Raft算法实现之状态存储——基于etcd

本文出自:【InTheWorld的博客】 (欢迎留言、交流)

Paxos算法也许是最著名的分布式一致性算法,而Raft则大概是最流行的分布式一致性算法。由于经验和水平所限,单纯看论文感觉并不能达到更进一步的理解。前面听闻Kubernetes, Docker Swarm,  CockroachDB等等牛逼的项目都在用Raft。毕竟是经过大规模生产环境考验的技术,我觉得很有必要学习一下。而且etcd的Raft实现是开源的,毕竟“源码之前,了无秘密”。

image

无论是Paxos还是Raft,它们都是致力于维护一个RSM(Replicated State Machine),如上图所示。对于RSM来说,状态存储是非常关键的。在这篇博客里,我准备基于etcd的实现分析一下Raft的状态存储。Raft状态的存储主要靠Snapshot和WAL(write ahead log)实现。

  • 和很多数据库一样,为了保证数据的安全性(crash或者宕机下的恢复),都会使用WAL,etcd也不例外。etcd中的每一个事务操作(即写操作),都会预先写到事务文件中,这种文件就是WAL。
  • 此外,etcd作为一个高可用的KV存储系统,不可能只依靠log replay来实现数据恢复。因此,etcd还提供了snapshot(快照)功能。snapshot即是定期把整个数据库保存成一个单独的快照文件,这样一来,不但缩短了日志重放的时间,也减轻了WAL的存储量,过早的WAL可以删除掉。

etcd使用了protobuf来定义协议格式,snapshot和log也在其中。raft/raft.proto文件部分内容如下:

enum EntryType {
    EntryNormal     = 0;
    EntryConfChange = 1;
}

message Entry {
    optional uint64     Term  = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
    optional uint64     Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
    optional EntryType  Type  = 1 [(gogoproto.nullable) = false];
    optional bytes      Data  = 4;
}

message SnapshotMetadata {
    optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
    optional uint64    index      = 2 [(gogoproto.nullable) = false];
    optional uint64    term       = 3 [(gogoproto.nullable) = false];
}

message Snapshot {
    optional bytes            data     = 1;
    optional SnapshotMetadata metadata = 2 [(gogoproto.nullable) = false];
}

其中,entry即是logEntry,表示一条log。

1. Raft library提供的接口

etcd的Raft library其实也不是开箱即用,应用程序需要实现存储io和网络通信。存储io在Raft library被定义为一个Storage接口,这个Storage接口是Raft library用来读取log、snapshot等等数据的接口。Raft library本身提供了一个MemoryStorage的实现,这个实现是基于内存存储的,不能仅仅依靠它来保存持久化数据。

这个Storage的接口定义如下:

type Storage interface {
    // InitialState returns the saved HardState and ConfState information.
    InitialState() (pb.HardState, pb.ConfState, error)
    // Entries returns a slice of log entries in the range [lo,hi).
    // MaxSize limits the total size of the log entries returned, but
    // Entries returns at least one entry if any.
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    // Term returns the term of entry i, which must be in the range
    // [FirstIndex()-1, LastIndex()]. The term of the entry before
    // FirstIndex is retained for matching purposes even though the
    // rest of that entry may not be available.
    Term(i uint64) (uint64, error)
    // LastIndex returns the index of the last entry in the log.
    LastIndex() (uint64, error)
    // FirstIndex returns the index of the first log entry that is
    // possibly available via Entries (older entries have been incorporated
    // into the latest Snapshot; if storage only contains the dummy entry the
    // first log entry is not available).
    FirstIndex() (uint64, error)
    // Snapshot returns the most recent snapshot.
    // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
    // so raft state machine could know that Storage needs some time to prepare
    // snapshot and call Snapshot later.
    Snapshot() (pb.Snapshot, error)
}

既然仅仅依靠memoryStorage是不够用的,那么我们还是来看看etcd本身是如何使用Raft libray的。etcd的Storage接口其实也复用了memoryStorage,但是仅仅把它当做一层内存的cache。每一次事务性操作中,etcd都会事先将存储内容flush到持久化存储设备上,然后写入memoryStorage。正如前文所述,Storage仅仅是用做汇报内容给Raft library的,只要能保证它和持久化内容的一致即可。而这一点在单机上很容易保证。此外,Raft library是通过raftlog来操作Storage的,详情见 etcd/raft/raft.go

2. etcd的具体实现

etcd server是通过WAL和snapshot实现持久化存储的。etcd使用了一个包裹层,一个叫storage的struct。为了避免混淆,贴一点代码(etcd/etcdserver/storage.go)。

type Storage interface {
    // Save function saves ents and state to the underlying stable storage.
    // Save MUST block until st and ents are on stable storage.
    Save(st raftpb.HardState, ents []raftpb.Entry) error
    // SaveSnap function saves snapshot to the underlying stable storage.
    SaveSnap(snap raftpb.Snapshot) error
    // Close closes the Storage and performs finalization.
    Close() error
}

type storage struct {
    *wal.WAL
    *snap.Snapshotter
}

func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
    return &storage{w, s}
}

注意,这个Storage和之前的那个Storage并没有关系,千万不要搞混淆了。

由于golang的语言特性,storage struct可以直接使用WAL和Snapshotter的方法,因为没有声明成员变量的名字。那么etcd是如何结合使用Raft library的memoryStorage和这里的storage struct的呢?答案就在etcd/etcdserver/raft.go。etcd对Raft library进行了进一步的封装,称之为raftNode,raftNode包含了一个raftNodeConfig的匿名成员。raftNodeConfig的定义如下所示:

type raftNodeConfig struct {
    // to check if msg receiver is removed from cluster
    isIDRemoved func(id uint64) bool
    raft.Node
    raftStorage *raft.MemoryStorage
    storage     Storage
    heartbeat   time.Duration // for logging
    // transport specifies the transport to send and receive msgs to members.
    // Sending messages MUST NOT block. It is okay to drop messages, since
    // clients should timeout and reissue their messages.
    // If transport is nil, server will panic.
    transport rafthttp.Transporter
}

源码看起来就是一目了然,raftStorage就是提供给Raft library的,而storage则是etcd实现的持久化存贮。在使用中,etcd以连续调用的方式实现二者一致的逻辑。以etcd server重启为例,我们看看同步是如何实现的,且看restartNode()的实现。

func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
    var walsnap walpb.Snapshot
    if snapshot != nil {
        walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
    }
    w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

    plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
    cl := membership.NewCluster("")
    cl.SetID(cid)
    s := raft.NewMemoryStorage()
    if snapshot != nil {
        s.ApplySnapshot(*snapshot)
    }
    s.SetHardState(st)
    s.Append(ents)
    c := &raft.Config{
        ID:              uint64(id),
        ElectionTick:    cfg.ElectionTicks,
        HeartbeatTick:   1,
        Storage:         s,
        MaxSizePerMsg:   maxSizePerMsg,
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
    }

    n := raft.RestartNode(c)
    raftStatusMu.Lock()
    raftStatus = n.Status
    raftStatusMu.Unlock()
    advanceTicksForElection(n, c.ElectionTick)
    return id, cl, n, s, w
}

这个函数的主要逻辑就是通过读取snapshot和WAL,然后通过s.SetHardState()和s.Append()使得memoryStrorage的状态得到恢复。在etcd的工作过程中也是类似的形式,不信请看raft.go的start()方法:

    if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
        plog.Fatalf("raft save state and entries error: %v", err)
    }
    if !raft.IsEmptyHardState(rd.HardState) {
        proposalsCommitted.Set(float64(rd.HardState.Commit))
    }
    // gofail: var raftAfterSave struct{}
    r.raftStorage.Append(rd.Entries)

代码我删减了一部分,总体的逻辑可以看得更清楚。r.storage.Save()和r.raftStorage.Append()这种连续调用保证了storage和raftStorage的一致性。

好吧!状态存储就到这里了,但这仅仅是Raft的基本内容,后边继续探索Raft的日志复制、leader选举以及事务提交的实现,当然还有RPC。

 

 

参考资料:

【1】. Stanford大学Diego Ongaro博士论文. 《CONSENSUS: BRIDGING THEORY AND PRACTICE》

【2】. Raft github page. https://raft.github.io/

【3】. Raft可视化. http://thesecretlivesofdata.com/raft/

【4】. etcd github项目. https://github.com/coreos/etcd  (last but not least)

已有2条评论 发表评论

  1. 路人甲 /

    透彻,好评。

    1. lshw4320814 / 本文作者

      谢谢!本来想认真弄一下etcd的,现在看来估计要烂尾了,尴尬

发表评论