本小节将详细的分析 raft 层的实现,包括核心的模块介面、数据结构、模块交互和 coroutine 模型等。
既然是 StateMachine,那么首先看看 raft StateMachine 的状态转换,实际上就是 raft 演算法中各种角色的转换,
etcd-raft StateMachine 封装在 raft struct 中,其状态转换如下图:
func (r *raft) becomeFollower(term uint64, lead uint64) func (r *raft) becomePreCandidate() func (r *raft) becomeCandidate() func (r *raft) becomeLeader()
etcd 将 raft 相关的所以处理都抽象为了 Msg,通过 Step 介面处理
func (r *raft) Step(m pb.Message) error { r.step(r, m) }
其中 step 是一个回调函数,在不同的 state 会设置不同的回调函数来驱动 raft,这个回调函数 stepFunc 就是在 become**** 函数完成的设置
type raft struct { ....... step stepFunc }
step 回调函数有如下几个值,其中 stepCandidate 会处理 PreCandidate 和 Candidate 两种状态
func stepFollower(r *raft, m pb.Message) error func stepCandidate(r *raft, m pb.Message) error func stepLeader(r *raft, m pb.Message) error
这里以 stepCandidate 为例说明:
func stepCandidate(r *raft, m pb.Message) error { ...... switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return ErrProposalDropped case pb.MsgApp: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleAppendEntries(m) case pb.MsgHeartbeat: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleHeartbeat(m) case pb.MsgSnap: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleSnapshot(m) case myVoteRespType: ...... case pb.MsgTimeoutNow: r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) } return nil }
即对各种 Msg 进行处理,这里就不展开详细展开。
所有的外部处理请求经过 raft StateMachine 处理都会首先被转换成统一抽象的输入 Message(Msg),Msg 会通过 raft.Step(m) 介面完成 raft StateMachine 的处理,Msg 分两类:
所有的 Msg 在 bp.Message 中详细定义,下面给出所有的 message 类型并且依次介绍:
const ( MsgHup MessageType = 0 // 本地消息:选举,可能会触发 pre-vote 或者 vote MsgBeat MessageType = 1 // 本地消息:心跳,触发放给 peers 的 Msgheartbeat MsgProp MessageType = 2 // 本地消息:Propose,触发 MsgApp MsgApp MessageType = 3 // 非本地:Op log 复制/配置变更 request MsgAppResp MessageType = 4 // 非本地:Op log 复制 response MsgVote MessageType = 5 // 非本地:vote request MsgVoteResp MessageType = 6 // 非本地:vote response MsgSnap MessageType = 7 // 非本地:Leader 向 Follower 拷贝 Snapshot,response Message 就是 MsgAppResp,通过这个值告诉 Leader 继续复制后面的日志 MsgHeartbeat MessageType = 8 // 非本地:心跳 request MsgHeartbeatResp MessageType = 9 // 非本地:心跳 response MsgUnreachable MessageType = 10 // 本地消息:EtcdServer 通过这个消息告诉 raft 状态某个 Follower 不可达,让其发送 message方式由 pipeline 切成 ping-pong 模式 MsgSnapStatus MessageType = 11 // 本地消息:EtcdServer 通过这个消息告诉 raft 状态机 snapshot 发送成功还是失败 MsgCheckQuorum MessageType = 12 // 本地消息:CheckQuorum,用于 Lease read,Leader lease MsgTransferLeader MessageType = 13 // 本地消息:可能会触发一个空的 MsgApp 尽快完成日志复制,也有可能是 MsgTimeoutNow 出 Transferee 立即进入选举 MsgTimeoutNow MessageType = 14 // 非本地:触发 Transferee 立即进行选举 MsgReadIndex MessageType = 15 // 非本地:Read only ReadIndex MsgReadIndexResp MessageType = 16 // 非本地:Read only ReadIndex response MsgPreVote MessageType = 17 // 非本地:pre vote request MsgPreVoteResp MessageType = 18 // 非本地:pre vote response )
需要注意的是并没有单独的配置变更的 Msg,而是 MsgApp 不同的 entry
type EntryType int32 ? const ( EntryNormal EntryType = 0 EntryConfChange EntryType = 1 )
由于 etcd 的网路、持久化模块和 raft 核心是分离的,所以当 raft 处理到某一些阶段的时候,需要输出一些东西,给外部处理,例如 Op log entries 持久化,Op log entries 复制的 Msg 等;以 heartbeat 为例,输入是 MsgBeat Msg,经过状态机状态化之后,就变成了给复制组所有的 Peer 发送心跳的 MsgHeartbeat Msg;在 ectd 中就是通过一个 Ready 的数据结构来封装当前 Raft state machine 已经准备好的数据和 Msg 供外部处理。下面是 Ready 的数据结构
// Ready encapsulates the entries and messages that are ready to read, // be saved to stable storage, committed or sent to other peers. // All fields in Ready are read-only. type Ready struct { // The current volatile state of a Node. // SoftState will be nil if there is no update. // It is not required to consume or store SoftState. *SoftState ? // The current state of a Node to be saved to stable storage BEFORE // Messages are sent. // HardState will be equal to empty state if there is no update. pb.HardState ? // ReadStates can be used for node to serve linearizable read requests locally // when its applied index is greater than the index in ReadState. // Note that the readState will be returned when raft receives msgReadIndex. // The returned is only valid for the request that requested to read. ReadStates []ReadState ? // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. // 写入 WAL Entries []pb.Entry ? // Snapshot specifies the snapshot to be saved to stable storage. Snapshot pb.Snapshot ? // CommittedEntries specifies entries to be committed to a // store/state-machine. These have previously been committed to stable // store. CommittedEntries []pb.Entry ? // Messages specifies outbound messages to be sent AFTER Entries are // committed to stable storage. // If it contains a MsgSnap message, the application MUST report back to raft // when the snapshot has been received or has failed by calling ReportSnapshot. Messages []pb.Message ? // MustSync indicates whether the HardState and Entries must be synchronously // written to disk or if an asynchronous write is permissible. MustSync bool }
Ready 是 raft 状态机和外面交互传递的核心数据结构,其包含了一批更新操作
// SoftState provides state that is useful for logging and debugging. // The state is volatile and does not need to be persisted to the WAL. type SoftState struct { Lead uint64 // must use atomic operations to access; keep 64-bit aligned. RaftState StateType } ? type StateType uint64 ? var stmap = [...]string{ "StateFollower", "StateCandidate", "StateLeader", "StatePreCandidate", }
type HardState struct { Term uint64 protobuf:"varint,1,opt,name=term" json:"term" Vote uint64 protobuf:"varint,2,opt,name=vote" json:"vote" Commit uint64 protobuf:"varint,3,opt,name=commit" json:"commit" XXX_unrecognized []byte json:"-" }
也就是上面和图中的 node 模块,其中实现了 Node interface 定义的所有介面,其主要用于raftNode 、外部和 raft StateMachine 状态机交互,其核心介面分类描述如下:
(1)提供输入介面:向 raft StateMachine 提交 msg
(2)驱动状态机运转介面:
(3)获取输出介面:
返回准备好的待处理的状态和数据: Ready,介面 Ready() <-chan Ready,raftNode 模块会监听这个 Channel 来获取 raft StateMachine 的输出结构 Ready
node struct 实现了 Node interface 的所有介面,详细的介面定义如下:
type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election // timeouts and heartbeat timeouts are in units of ticks. Tick() // Campaign causes the Node to transition to candidate state and start campaigning to become leader. Campaign(ctx context.Context) error // 向 raft leader node propse 一个 key-value Op,提交 MsgProp,Entry type:EntryNormal Propose(ctx context.Context, data []byte) error // 向 raft leader node propse 一个 conf change Op,提交 MsgProp Entry type:EntryConfChange,负责和正常 Op 的 log 复制相同 ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. // 当节点收到其他节点发过来的 message,主动调用驱动 Raft Step(ctx context.Context, msg pb.Message) error ? // Ready returns a channel that returns the current point-in-time state. // Users of the Node must call Advance after retrieving the state returned by Ready. // // NOTE: No committed entries from the next Ready may be applied until all committed entries // and snapshots from the previous one have finished. // 得到当前节点的 ready 状态,我们会在之前用 has_ready 来判断一个 RawNode 是否 ready Ready() <-chan Ready ? // 告诉 Raft 已经处理完 ready,开始后续的迭代 Advance() // ApplyConfChange applies config change to the local node. // Returns an opaque ConfState protobuf which must be recorded // in snapshots. Will never return nil; it returns a pointer only // to match MemoryStorage.Compact. ApplyConfChange(cc pb.ConfChange) *pb.ConfState ? // TransferLeadership attempts to transfer leadership to the given transferee. TransferLeadership(ctx context.Context, lead, transferee uint64) ? // ReadIndex request a read state. The read state will be set in the ready. // Read state has a read index. Once the application advances further than the read // index, any linearizable read requests issued before the read request can be // processed safely. The read state will have the same rctx attached. ReadIndex(ctx context.Context, rctx []byte) error ? // Status returns the current status of the raft state machine. Status() Status // ReportUnreachable reports the given node is not reachable for the last send. ReportUnreachable(id uint64) // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure. // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a // snapshot (for e.g., while streaming it from leader to follower), should be reported to the // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft // log probes until the follower can apply the snapshot and advance its state. If the follower // cant do that, for e.g., due to a crash, it could end up in a limbo, never getting any // updates from the leader. Therefore, it is crucial that the application ensures that any // failure in snapshot sending is caught and reported back to the leader; so it can resume raft // log probing in the follower. ReportSnapshot(id uint64, status SnapshotStatus) // Stop performs any necessary termination of the Node. Stop() }
除此之外 node 模块还会有一个 coroutine 负责接收外部的各种 Msg,然后驱动 raft StateMachine 运转,在 go 中 coroutine 之间的通信都是通过 Channel 来进行的,所以 node 模块也通过监听相应的 Channel 发现输入 Msg ,并且驱动 raft StateMachine 运转,或者通过往 Channel 中写入数据来传递输出。
type node struct { // 向 raft StateMachine 提交一个 Op Propose(normal op/ conf change) propc chan msgWithResult // 向 raft StateMachine 提交 Peer 发送过来的一些 Message,例如一些 Response,或者对 Follower 来说各种 request message recvc chan pb.Message confc chan pb.ConfChange confstatec chan pb.ConfState // 向上层应用 raftNode 输出 Ready 好的数据和状态 readyc chan Ready // 用于 raftNode 通知 raft StateMachine 当前 Ready 处理完了,准备下一个 advancec chan struct{} // 用于 raftNode 通知 raft StateMachine,滴答逻辑时钟推进 tickc chan struct{} done chan struct{} stop chan struct{} // 向上层应用输出 raft state machine 状态 status chan chan Status ..... }
如上是 node 模块的 node struct 的定义,这里以 Node interface 的 Propose 介面为例,其会生成一个本地的 MsgProp Msg 并通过 node.propc Channel 写入,而这个时候 node 模块的 coroutine 已经监听在这个 node.proc Channel 上了,当收到 MsgProp Msg,就会提交 raft StateMachine,并运转 raft StateMachine,一旦产生输出,就会 new 一个 Ready struct 来包含这次输出,然后写入 node.readyc Channel,这个时候 raftNode 模块的 coroutine 监听到 node.readyc 有输入,然后其就会读取处理处理,处理完了,就会通过 node.advance Channel 通知 node 模块 coroutine,已经处理完了当前 raft StateMachine Ready 输出,可以发送下一个准备好的待处理数据 Ready
运转指的是整个 etcd-raft 的运转,其核心是由两个 coroutine 驱动,分别上文和图中提到的 raft 层中的 raftNode 模块和 node 模块各一个 coroutine:
(1)node 模块,对应一个 coroutine,其对应的处理逻辑代码框架如下。主要负责监听几个 Channel 接收输入,然后运行 raft StateMachine 处理输出,并打包成 Ready 给 raftNode 模块处理。:
func (n *node) run(r *raft) { ? for { if advancec != nil { readyc = nil } else { // 生成 Ready rd = newReady(r, prevSoftSt, prevHardSt) ...... } ....... select { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. // 从 propc 拿 client 发过来的 Propose 交给 case pm := <-propc: ....... err := r.Step(m) ....... case m := <-n.recvc: // filter out response message from unknown From. // (1) 如果是 Leader,那么收到的 Msg 必须有对应的 Progress // (2) 如果是 Follower,那么收到的 Msg 必定不是 ResponseMsg ....... case <-n.tickc: r.tick() case readyc <- rd: ....... case <-advancec: ...... } } }
(2)raftNode 模块:也会有一个 coroutine 对应,其核心的代码逻辑如下,主要完成的工作是把 raft StateMachine 处理的阶段性输出 Ready 拿来处理,该持久化的通过持久化介面写入盘中,该发送给 Peer 的通过网路层发送给 Peers 等。
func (r *raftNode) start(rh *raftReadyHandler) { go func() { defer r.onStop() islead := false ? for { select { // 监听 Ticker 事件,并通知 raft StateMachine case <-r.ticker.C: r.tick() // 监听待处理的 Ready,并处理 case rd := <-r.Ready(): ...... // 这部分处理 Ready 的逻辑下面单独文字描述 ...... // 通知 raft StateMachine 运转,返回新的待处理的 Ready r.Advance() case <-r.stopped: return } } }() }
raftNode 模块的 cortoutine 核心就是处理 raft StateMachine 的 Ready,下面将用文字单独描述,这里仅考虑Leader 分支,Follower 分支省略:
因为是 go 实现的,所以实际上是 coroutine 模型,如下图,注意因为是 coroutine,所以 coroutine 间的通信都是通过 Channel 完成的,这点注意和多线程模型区别开来,下图将给出整个 etcd server 和 raft 相关的所有 coroutine 和相关交互的 Channel 之间的关系图,这里不会详细介绍所有的交互流程和细节,感兴趣的读者可以结合代码来看。
其中红色虚线框起来的代表一个 coroutine,下面将对各个协程的作用基本的描述
通过上面的线程模型分析以及 3.5 小节关于 raftNode 对于 raft StateMachine 输出 Ready 的处理,可以总结 etcd-raft 在性能上做了如下的优化:
为了更好的将整个 etcd-raft 流程串起来,下面将以一个 put kv 请求为例,描述各个模块是如何协作来完成 request 的处理。如下图给出了 etcd server 收到一个 put kv 请求的详细流程步骤图。
OK,整个 Put kv request 的处理请求流程大致介绍完。需要注意的是,上面尽管每个步骤都有严格的序号,但是很多操作是非同步,并发甚至并行的发生的,序号并不是严格的发生先后顺序,例如上面的 11 步 和 12,分别在不同 coroutine 并行处理,严格的发生时间序列并没有。
etcd-raft 最大设计亮点就是抽离了网路、持久化、协程等逻辑,用一个纯粹的 raft StateMachine 来实现 raft 演算法逻辑,充分的解耦,有助于 raft 演算法本身的正确实现和,而且更容易纯粹的去测试 raft 演算法最本质的逻辑,而不需要考虑引入其他因素(各种异常),这一点在 raft StateMachine 的单元测试中就能够体现。希望通过本文能让大家从整体上快速的了解 etcd-raft 设计和实现思路,限于篇幅未能涉及,很多 etcd-raft 的实现细节未能详细描述,例如 Ticker 驱动逻辑时钟推进,Read 的详细交互流程,Pipeline 复制等,感兴趣的可以阅读相关源代码,时间仓促,难免有理解疏漏或者错误的地方,欢迎指出。
Notes
如有理解和描述上有疏漏或者错误的地方,欢迎共同交流;参考已经在参考文献中注明,但仍有可能有疏漏的地方,有任何侵权或者不明确的地方,欢迎指出,必定及时更正或者删除;文章供于学习交流,转载注明出处。
参考文献
[1]. Ongaro D, Ousterhout J. In search of an understandable consensus algorithm[J]. Draft of October, 2014.
[2]. ONGARO, D. Consensus: Bridging theory and practice. Tech. Rep. Ph.D. thesis, Stanford University, August 2014.
[3]. etcd. https://github.com/etcd-io/etcd
[4]. raft home. https://raft.github.io/
[5]. Paxos、Raft演算法当前阶段比较稳定的,经过生产环境验证的开源实现有哪些?. https://www.zhihu.com/question/53344734
推荐阅读: