本文出自:【InTheWorld的博客】 (欢迎留言、交流)
Paxos算法也许是最著名的分布式一致性算法,而Raft则大概是最流行的分布式一致性算法。由于经验和水平所限,单纯看论文感觉并不能达到更进一步的理解。前面听闻Kubernetes, Docker Swarm, CockroachDB等等牛逼的项目都在用Raft。毕竟是经过大规模生产环境考验的技术,我觉得很有必要学习一下。而且etcd的Raft实现是开源的,毕竟“源码之前,了无秘密”。
无论是Paxos还是Raft,它们都是致力于维护一个RSM(Replicated State Machine),如上图所示。对于RSM来说,状态存储是非常关键的。在这篇博客里,我准备基于etcd的实现分析一下Raft的状态存储。Raft状态的存储主要靠Snapshot和WAL(write ahead log)实现。
- 和很多数据库一样,为了保证数据的安全性(crash或者宕机下的恢复),都会使用WAL,etcd也不例外。etcd中的每一个事务操作(即写操作),都会预先写到事务文件中,这种文件就是WAL。
- 此外,etcd作为一个高可用的KV存储系统,不可能只依靠log replay来实现数据恢复。因此,etcd还提供了snapshot(快照)功能。snapshot即是定期把整个数据库保存成一个单独的快照文件,这样一来,不但缩短了日志重放的时间,也减轻了WAL的存储量,过早的WAL可以删除掉。
etcd使用了protobuf来定义协议格式,snapshot和log也在其中。raft/raft.proto文件部分内容如下:
enum EntryType { EntryNormal = 0; EntryConfChange = 1; } message Entry { optional uint64 Term = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations optional uint64 Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations optional EntryType Type = 1 [(gogoproto.nullable) = false]; optional bytes Data = 4; } message SnapshotMetadata { optional ConfState conf_state = 1 [(gogoproto.nullable) = false]; optional uint64 index = 2 [(gogoproto.nullable) = false]; optional uint64 term = 3 [(gogoproto.nullable) = false]; } message Snapshot { optional bytes data = 1; optional SnapshotMetadata metadata = 2 [(gogoproto.nullable) = false]; }
其中,entry即是logEntry,表示一条log。
1. Raft library提供的接口
etcd的Raft library其实也不是开箱即用,应用程序需要实现存储io和网络通信。存储io在Raft library被定义为一个Storage接口,这个Storage接口是Raft library用来读取log、snapshot等等数据的接口。Raft library本身提供了一个MemoryStorage的实现,这个实现是基于内存存储的,不能仅仅依靠它来保存持久化数据。
这个Storage的接口定义如下:
type Storage interface { // InitialState returns the saved HardState and ConfState information. InitialState() (pb.HardState, pb.ConfState, error) // Entries returns a slice of log entries in the range [lo,hi). // MaxSize limits the total size of the log entries returned, but // Entries returns at least one entry if any. Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) // Term returns the term of entry i, which must be in the range // [FirstIndex()-1, LastIndex()]. The term of the entry before // FirstIndex is retained for matching purposes even though the // rest of that entry may not be available. Term(i uint64) (uint64, error) // LastIndex returns the index of the last entry in the log. LastIndex() (uint64, error) // FirstIndex returns the index of the first log entry that is // possibly available via Entries (older entries have been incorporated // into the latest Snapshot; if storage only contains the dummy entry the // first log entry is not available). FirstIndex() (uint64, error) // Snapshot returns the most recent snapshot. // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable, // so raft state machine could know that Storage needs some time to prepare // snapshot and call Snapshot later. Snapshot() (pb.Snapshot, error) }
既然仅仅依靠memoryStorage是不够用的,那么我们还是来看看etcd本身是如何使用Raft libray的。etcd的Storage接口其实也复用了memoryStorage,但是仅仅把它当做一层内存的cache。每一次事务性操作中,etcd都会事先将存储内容flush到持久化存储设备上,然后写入memoryStorage。正如前文所述,Storage仅仅是用做汇报内容给Raft library的,只要能保证它和持久化内容的一致即可。而这一点在单机上很容易保证。此外,Raft library是通过raftlog来操作Storage的,详情见 etcd/raft/raft.go 。
2. etcd的具体实现
etcd server是通过WAL和snapshot实现持久化存储的。etcd使用了一个包裹层,一个叫storage的struct。为了避免混淆,贴一点代码(etcd/etcdserver/storage.go)。
type Storage interface { // Save function saves ents and state to the underlying stable storage. // Save MUST block until st and ents are on stable storage. Save(st raftpb.HardState, ents []raftpb.Entry) error // SaveSnap function saves snapshot to the underlying stable storage. SaveSnap(snap raftpb.Snapshot) error // Close closes the Storage and performs finalization. Close() error } type storage struct { *wal.WAL *snap.Snapshotter } func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { return &storage{w, s} }
注意,这个Storage和之前的那个Storage并没有关系,千万不要搞混淆了。
由于golang的语言特性,storage struct可以直接使用WAL和Snapshotter的方法,因为没有声明成员变量的名字。那么etcd是如何结合使用Raft library的memoryStorage和这里的storage struct的呢?答案就在etcd/etcdserver/raft.go。etcd对Raft library进行了进一步的封装,称之为raftNode,raftNode包含了一个raftNodeConfig的匿名成员。raftNodeConfig的定义如下所示:
type raftNodeConfig struct { // to check if msg receiver is removed from cluster isIDRemoved func(id uint64) bool raft.Node raftStorage *raft.MemoryStorage storage Storage heartbeat time.Duration // for logging // transport specifies the transport to send and receive msgs to members. // Sending messages MUST NOT block. It is okay to drop messages, since // clients should timeout and reissue their messages. // If transport is nil, server will panic. transport rafthttp.Transporter }
源码看起来就是一目了然,raftStorage就是提供给Raft library的,而storage则是etcd实现的持久化存贮。在使用中,etcd以连续调用的方式实现二者一致的逻辑。以etcd server重启为例,我们看看同步是如何实现的,且看restartNode()的实现。
func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) cl := membership.NewCluster("") cl.SetID(cid) s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } s.SetHardState(st) s.Append(ents) c := &raft.Config{ ID: uint64(id), ElectionTick: cfg.ElectionTicks, HeartbeatTick: 1, Storage: s, MaxSizePerMsg: maxSizePerMsg, MaxInflightMsgs: maxInflightMsgs, CheckQuorum: true, } n := raft.RestartNode(c) raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() advanceTicksForElection(n, c.ElectionTick) return id, cl, n, s, w }
这个函数的主要逻辑就是通过读取snapshot和WAL,然后通过s.SetHardState()和s.Append()使得memoryStrorage的状态得到恢复。在etcd的工作过程中也是类似的形式,不信请看raft.go的start()方法:
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { plog.Fatalf("raft save state and entries error: %v", err) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } // gofail: var raftAfterSave struct{} r.raftStorage.Append(rd.Entries)
代码我删减了一部分,总体的逻辑可以看得更清楚。r.storage.Save()和r.raftStorage.Append()这种连续调用保证了storage和raftStorage的一致性。
好吧!状态存储就到这里了,但这仅仅是Raft的基本内容,后边继续探索Raft的日志复制、leader选举以及事务提交的实现,当然还有RPC。
参考资料:
【1】. Stanford大学Diego Ongaro博士论文. 《CONSENSUS: BRIDGING THEORY AND PRACTICE》
【2】. Raft github page. https://raft.github.io/
【3】. Raft可视化. http://thesecretlivesofdata.com/raft/
【4】. etcd github项目. https://github.com/coreos/etcd (last but not least)
透彻,好评。
谢谢!本来想认真弄一下etcd的,现在看来估计要烂尾了,尴尬