作者:屈鵬

在 《TiKV 源碼解析(二)raft-rs proposal 示例情景分析》 中,我們主要介紹了 raft-rs 的基本 API 使用,其中,與應用程序進行交互的主要 API 是:

  1. RawNode::propose 發起一次新的提交,嘗試在 Raft 日誌中追加一個新項;
  2. RawNode::ready_since 從 Raft 節點中獲取最近的更新,包括新近追加的日誌、新近確認的日誌,以及需要給其他節點發送的消息等;
  3. 在將一個 Ready 中的所有更新處理完畢之後,使用 RawNode::advance 在這個 Raft 節點中將這個 Ready 標記為完成狀態。

熟悉了以上 3 個 API,用戶就可以寫出基本的基於 Raft 的分散式應用的框架了,而 Raft 協議中將寫入同步到多個副本中的任務,則由 raft-rs 庫本身的內部實現來完成,無須應用程序進行額外幹預。本文將對數據冗餘複製的過程進行詳細展開,特別是關於 snapshot 及流量控制的機制,幫助讀者更深刻地理解 Raft 的原理。

一般 MsgAppend 及 MsgAppendResponse 的處理

在 Raft leader 上,應用程序通過 RawNode::propose 發起的寫入會被處理成一條 MsgPropose 類型的消息,然後調用 Raft::append_entry 和 Raft::bcast_append 將消息中的數據追加到 Raft 日誌中並廣播到其他副本上。整體流程如偽代碼所示:

fn Raft::step_leader(&mut self, mut m: Message) -> Result<()> {
if m.get_msg_type() == MessageType::MsgPropose {
// Propose with an empty entry list is not allowed.
assert!(!m.get_entries().is_empty());
self.append_entry(&mut m.mut_entries());
self.bcast_append();
}
}

這段代碼中 append_entry 的參數是一個可變引用,這是因為在 append_entry 函數中會為每一個 Entry 賦予正確的 term 和 index。term 由選舉產生,在一個 Raft 系統中,每選舉出一個新的 Leader,便會產生一個更高的 term。而 index 則是 Entry 在 Raft 日誌中的下標。Entry 需要帶上 term 和 index 的原因是,在其他副本上的 Raft 日誌是可能跟 Leader 不同的,例如一個舊 Leader 在相同的位置(即 Raft 日誌中具有相同 index 的地方)廣播了一條過期的 Entry,那麼當其他副本收到了重疊的、但是具有更高 term 的消息時,便可以用它們替換舊的消息,以便達成與最新的 Leader 一致的狀態。

在 Leader 將新的寫入追加到自己的 Raft log 中之後,便可以調用 bcast_append 將它們廣播到其他副本了。注意這個函數並沒有任何參數,那麼 Leader 如何知道應該給每一個副本從哪一個位置開始廣播呢?原來在 Leader 上對每一個副本,都關聯維護了一個 Progress,該結構體定義如下:

pub struct Progress {
pub matched: u64,
// 該副本期望接收的下一個 Entry 的 index
pub next_idx: u64,
// 未 commit 的消息的滑動窗口
pub ins: Inflights,
// ProgressState::Probe:Leader 每個心跳間隔中最多發送一條 MsgAppend
// ProgressState::Replicate:Leader 在每個心跳間隔中可以發送多個 MsgAppend
// ProgressState::Snapshot:Leader 無法再繼續發送 MsgAppend 給這個副本
pub state: ProgressState,
// 是否暫停給這個副本發送 MsgAppend 了
pub paused: bool,
// 一些其他欄位……
}

如代碼注釋中所說的那樣,Leader 在給副本廣播新的日誌時,會從對應的副本的 next_idx 開始。這就蘊含了兩個問題:

  1. 在剛開始啟動的時候,所有副本的 next_idx 應該如何設置?
  2. 在接收並處理完成 Leader 廣播的新寫入後,其他副本應該如何向 Leader 更新 next_idx

第一個問題的答案在 Raft::reset 函數中。這個函數會在 Raft 完成選舉之後選出的 Leader 上調用,會將 Leader 的所有其他副本的 next_idx 設置為跟 Leader 相同的值。之後,Leader 就可以會按照 Raft 論文裏的規定,廣播一條包含了自己的 term 的空 Entry 了。

第二個問題的答案在 Raft::handle_append_response 函數中。我們繼續考察上面的情景,Leader 的其他副本在收到 Leader 廣播的最新的日誌之後,可能會採取兩種動作:

fn Raft::handle_append_entries(&mut self, m: &Message) {
let mut to_send = Message::new_message_append_response();
match self.raft_log.maybe_append(...) {
// 追加日誌成功,將最新的 last index 上報給 Leader
Some(last_index) => to_send.set_index(last_index),
// 追加日誌失敗,設置 reject 標誌,並告訴 Leader 自己的 last index
None => {
to_send.set_reject(true);
to_send.set_reject_hint(self.raft_log.last_index());
}
}
}
self.send(to_send);

其他副本調用 maybe_append 失敗的原因可能是比 Leader 的日誌更少,但是 Leader 在剛選舉出來的時候將所有副本的 next_idx 設置為與自己相同的值了。這個時候這些副本就會在 MsgAppendResponse 中設置拒絕的標誌。在 Leader 接收到這樣的反饋之後,就可以將對應副本的 next_idx 設置為正確的值了。這個邏輯在 Raft::handle_append_response中:

fn Raft::handle_append_response(&mut self, m: &Message, …) {
if m.get_reject() {
let pr: &mut Progress = self.get_progress(m.get_from());
// 將副本對應的 `next_idx` 回退到一個合適的值
pr.maybe_decr_to(m.get_index(), m.get_reject_hint());
} else {
// 將副本對應的 `next_idx` 設置為 `m.get_index() + 1`
pr.maybe_update(m.get_index());
}
}

以上偽代碼中我們省略了一些丟棄亂序消息的代碼,避免過多的細節造成幹擾。

pipeline 優化和流量控制機制

上一節我們重點觀察了 MsgAppend 及 MsgAppendResponse 消息的處理流程,原理是非常簡單、清晰的。然而,這個未經任何優化的實現能夠工作的前提是在 Leader 收到某個副本的 MsgAppendResponse 之前,不再給它發送任何 MsgAppend。由於等待響應的時間取決於網路的 TTL,這在實際應用中是非常低效的,因此我們需要引入 pipeline 優化,以及配套的流量控制機制來避免「優化」帶來的網路壅塞。

Pipeline 在 Raft::prepare_send_entries 函數中被引入。這個函數在 Raft::send_append 中被調用,內部會直接修改對目標副本的 next_idex 值,這樣,後續的 MsgAppend 便可以在此基礎上繼續發送了。而一旦之前的 MsgAppend 被該目標副本拒絕掉了,也可以通過上一節中介紹的 maybe_decr_to 機制將 next_idx 重置為正確的值。我們來看一下這段代碼:

// 這個函數在 `Raft::prepare_send_entries` 中被調用
fn Progress::update_state(&mut self, last: u64) {
match self.state {
ProgressState::Replicate => {
self.next_idx = last + 1;
self.ins.add(last);
},
ProgressState::Probe => self.pause(),
_ => unreachable!(),
}
}

Progress 有 3 種不同的狀態,如這個結構體的定義的代碼片段所示。其中 Probe 狀態和 Snapshot 狀態會在下一節詳細介紹,現在只需要關注 Replicate 狀態。我們已經知道 Pipeline 機制是由更新 next_idx 的那一行引入的了,那麼下面更新 ins 的一行的作用是什麼呢?

從 Progress 的定義的代碼片段中我們知道,ins 欄位的類型是 Inflights,可以想像成一個類似 TCP 的滑動窗口:所有 Leader 發出了,但是尚未被目標副本響應的消息,都被框在該副本在 Leader 上對應的 Progress 的 ins 中。這樣,由於滑動窗口的大小是有限的,Raft 系統中任意時刻的消息數量也會是有限的,這就實現了流量控制的機制。更具體地,Leader 在給某一副本發送 MsgAppend 時,會檢查其對應的滑動窗口,這個邏輯在 Raft::send_append 函數中;在收到該副本的 MsgAppendResponse 之後,會適時調用 Inflights 的 free_to 函數,使窗口向前滑動,這個邏輯在 Raft::handle_append_response 中。

ProgressState 相關優化

我們已經在 Progress 結構體的定義以及上面一些代碼片段中見過了 ProgressState 這個枚舉類型。在 3 種可能的狀態中,Replicate 狀態是最容易理解的,Leader 可以給對應的副本發送多個 MsgAppend 消息(不超過滑動窗口的限制),並適時地將窗口向前滑動。然而,我們注意到,在 Leader 剛選舉出來時,Leader 上面的所有其他副本的狀態卻被設置成了 Probe。這是為什麼呢?

從 Progress 結構體的欄位注釋中,我們知道當某個副本處於 Probe 狀態時,Leader 只能給它發送 1 條 MsgAppend 消息。這是因為,在這個狀態下的 Progress 的 next_idx 是 Leader 猜出來的,而不是由這個副本明確的上報信息推算出來的。它有很大的概率是錯誤的,亦即 Leader 很可能會回退到某個地方重新發送;甚至有可能這個副本是不活躍的,那麼 Leader 發送的整個滑動窗口的消息都可能浪費掉。因此,我們引入 Probe 狀態,當 Leader 給處於這一狀態的副本發送了 MsgAppend 時,這個 Progress 會被暫停掉(源碼片段見上一節),這樣在下一次嘗試給這個副本發送 MsgAppend 時,會在 Raft::send_append 中跳過。而當 Leader 收到了這個副本上報的正確的 last index 之後,Leader 便知道下一次應該從什麼位置給這個副本發送日誌了,這一過程在 Progress::maybe_update 函數中:

fn Progress::maybe_update(&mut self, n: u64) {
if self.matched < n {
self.matched = n;
self.resume(); // 取消暫停的狀態
}
if self.next_idx < n + 1 {
self.next = n + 1;
}
}

ProgressState::Snapshot 狀態與 Progress 中的 pause 標誌十分相似,一個副本對應的 Progress 一旦處於這個狀態,Leader 便不會再給這個副本發送任何 MsgAppend 了。但是仍有細微的差別:事實上在 Leader 收到 MsgHeartbeatResponse 時,也會調用 Progress::resume 來將取消對該副本的暫停,然而對於 ProgressState::Snapshot 狀態的 Progress 則沒有這個邏輯。這個狀態會在 Leader 成功發送完成 Snapshot,或者收到了對應的副本的最新的 MsgAppendResponse 之後被改變,詳細的邏輯請參考源代碼,這裡就不作贅述了。

我們把篇幅留給在 Follower 上收到 Snapshot 之後的處理邏輯,主要是 Raft::restore_raftRaftLog::restore 兩個函數。前者中主要包含了對 Progress 的處理,因為 Snapshot 包含了 Leader 上最新的信息,而 Leader 上的 Configuration 是可能跟 Follower 不同的。後者的主要邏輯偽代碼如下所示:

fn RaftLog::restore(&mut self, snapshot: Snapshot) {
self.committed = snapshot.get_metadata().get_index();
self.unstable.restore(snapshot);
}

可以看到,內部僅更新了 committed,並沒有更新 applied。這是因為 raft-rs 僅關心 Raft 日誌的部分,至於如何把日誌中的內容更新到真正的狀態機中,是應用程序的任務。應用程序需要從上一篇文章中介紹的 Ready 介面中把 Snapshot 拿到,然後自行將其應用到狀態機中,最後再通過 RawNode::advance 介面將 applied 更新到正確的值。

總結

Raft 日誌複製及相關的流量控制、Snapshot 流程就介紹到這裡,代碼倉庫仍然在 github.com/pingcap/raft,source-code 分支。下一期 raft-rs 源碼解析我們會繼續為大家帶來 configuration change 相關的內容,敬請期待!

更多閱讀:

博客?

www.pingcap.com
圖標

推薦閱讀:
相關文章