WAL模块主要方法简述

Method---wal.go Description
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) 初次启动raftNode时调用WAL.Create方法。创建WAL对象用于记录追加 :判断是否存在dirpath路径,如果已存在则不是初次启动raftNode,返回os.ErrExist。创建临时目录和初始上锁的wal文件—walName(seq=0 & index=0),seek到文件末尾(why?),预分配该wal文件大小(SegmentSizeBytes=64MB,优化追加速度),创建WAL对象并设定路径、 metadata(NodeID和ClusterID)、编码器,将上锁的WAL文件追加到锁表内,然后依次写入crc、metadata和空snapshot,重命名临时目录,同步临时目录的父目录(fsync)使得重命名持久化。
func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) 移除w.dir目录及目录下所有文件和文件夹,调用os.Rename(tmpdirpath, w.dir)将Create方法内创建的临时目录重命名,创建FilePipeline和dirFile *os.File,dirFile is a fd for the wal directory for syncing on Rename
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error 检验snapshot是否合法(Since etcd>=3.5.0),pb序列化snapshot得到data字段,加锁,调用w.encoder.encode方法写入record,更新w.enti如果snapshot index > 原w.enti,解锁。
func (w *WAL) saveCrc(prevCrc uint32) error 写入crcType的记录
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error 加锁,如果hardstate和entries为空,返回。判断是否需要sync(entries长度不为0||vote改变||term改变),写入entries和hardstate,判断文件当前位置是否小于SegmentSizeBytes(默认64M),如果小于判断是否需要sync数据,如果不小于,返回cut操作结果,解锁。
func (w *WAL) saveEntry(e *raftpb.Entry) error 写入一条entry记录并更新WAL对象的enti值
func (w *WAL) saveState(s *raftpb.HardState) error 判断是否为空,不为空写入HardState并更新WAL对象的state值
func (w *WAL) cut() error 关闭当前文件并创建一个新的文件用于追加记录:首先移动到锁表最后一个wal文件的当前位置截断文件然后执行sync,调用FilePipeline对象的Open方法创建一个新文件并加入锁表,首先保存旧的encoder的crc,然后创建新的encoder对象替换旧的encoder对象,保存头信息crc、metadata和hardstate,原子重命名文件之前先执行sync和保存当前位置偏移,重命名后对WAL对象的dirFile执行fsync持久化wal目录的变化。关闭文件重新以LockFile方式打开文件并seek到文件末尾,替换锁表尾文件,再次进行新旧encoder替换。
func (w *WAL) tail() *fileutil.LockedFile WAL对象的锁表如果不为空,返回最后一个上锁文件,否则返回空。
func (w *WAL) sync() error 如果encoder存在,则将encoder pageWriter缓冲区的数据写入,锁表尾文件执行fdatasync,将fdatasync延时上报监控prometheus。
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) 寻找所有wal文件中snapshot条目,有效的snapshot条目index必须小于等于最新的hardstate。步骤:找到目录下所有带有合法名称的wal文件名,以只读模式打开这些wal文件,根据以读模式打开的这些wal文件创建decoder,循环解码每个文件的record:若为snapshotType,追加到snaps中;若为stateType,更新hardstate;若为crcType(wal文件开头),验证是否和decoder.crc相同(上一个文件末尾的crc)。返回所有index小于最新hardstate.Commit的walpb.snap条目。
func readWALNames(lg *zap.Logger, dirpath string) ([]string, error) 从指定目录读取所有wal文件name,并检查name合法性(.wal结尾)
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) 根据write标志选择以读模式还是写模式打开文件,步骤:从nameIndex指定的索引开始打开文件。若写模式:打开上锁的wal文件,将该文件添加到锁表、文件关闭表、读文件表;若读模式:以os.O_RDONLY打开文件,将该文件添加到文件关闭表、读文件表,添加nil到锁表(锁表只在写模式下用到)。
func Open(lg zap.Logger, dirpath string, snap walpb.Snapshot) (WAL, error) 写模式调用openAtIndex。Open opens the WAL at the given snap,The returned WAL is ready to read and the first record will be the one after the given snap. The WAL cannot be appended to before reading out all of its previous records.
func OpenForRead(lg zap.Logger, dirpath string, snap walpb.Snapshot) (WAL, error) 读模式调用openAtIndex。
func openAtIndex(lg zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (WAL, error) 遍历wal目录,找到snap所在位置index,依次打开index后续所有的wal文件并加锁(调用openWALFiles),创建WAL对象并设定解码器、readClose(调用closeAll关闭已打开文件)和锁表,若写模式:readClose置空(写模式下还要继续对wal文件进行append操作,等到读完后不用进行关闭操作),测试锁表最后一个上锁文件是否是合法wal文件(通过是否符合命名规范判断),如果不合法,关闭所有文件返回错误,否则,设定FilePipeline对象(大小超过64M时用于截断并切换到新文件),返回WAL对象 。
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) 读取WAL对象的所有记录 ,对于不同类型记录做不同处理: 判断锁表尾文件如为空(读模式):如果不是读到EOF或ErrUnexpectedEOF则重置state返回;对于写模式,如果err不是EOF,重置状态返回,然后锁表尾文件Seek到lastOffset位置, 将后续内容清零(目的是处理遇到0记录后接非0记录时,非0记录又没有被全部重写,再次打开的时候会出现 CRC错误,由于数据从不会一开始就完全同步到磁盘,因此进行清零操作是安全的 ?暂时没懂),然后判断snapshot是否匹配,关闭decoder实现禁读,重置WAL对象的start为一个空snapshot对象,创建encoder并将decoder设空,返回metadata,state,ents和err。



Method---encoder.go Description
func (e *encoder) encode(rec *walpb.Record) error 加锁,根据record.data计算crc,序列化record得data,根据len(data)计算出lenField和padBytes(需要填充字节数使得8字节对齐),写入lenField到文件,如果padBytes!=0,则给data填充padBytes字节,写入到pageWriter缓冲区。
func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) 计算出lenField, padBytes
func (e *encoder) flush() error 加锁,将pageWriter缓冲区内数据写入文件,解锁
func writeUint64(w io.Writer, n uint64, buf []byte) error 将uint64类型n写入到[]byte内,再写入文件



Method---pagewriter.go Description
func (pw *PageWriter) Write(p []byte) (n int, err error) 若未超出缓冲区最大容量,则将数据复制到缓冲区并更新当前缓冲字节数返回。若超出,计算出一页还空闲多少字节数slack,若还有空闲:
func (pw *PageWriter) flush() (int, error) 将缓冲区内的所有数据写入文件,重新计算页内偏移量(pageBytes=4KB),将已缓存字节数重置为0



Method---decoder.go Description
func (d *decoder) decodeRecord(rec *walpb.Record) error 首先读取一个int64变量,算出recBytes,和padBytes,读取recBytes+padBytes大小的数据,将前recBytes大小的数据反序列化,如果不是crcType,验证crc是否一致,更新lastOffset = 8 + recBytes + padBytes。
func decodeFrameSize(lenField int64) (recBytes int64, padBytes int64) 根据lenField,算出实际数据大小和填充数据大小
func (d *decoder) isTornEntry(data []byte) bool 还没看
func readInt64(r io.Reader) (int64, error) 从文件中读取一个int64变量

热门相关:斗神战帝   薄先生,情不由己   刺客之王   豪门闪婚:帝少的神秘冷妻   寂静王冠