TiKV 源碼解析系列文章(十)Snapshot 的發送和接收
作者:黃夢龍
背景知識
TiKV 使用 Raft 演算法來提供高可用且具有強一致性的存儲服務。在 Raft 中,Snapshot 指的是整個 State Machine 數據的一份快照,大體上有以下這幾種情況需要用到 Snapshot:
- 正常情況下 leader 與 follower/learner 之間是通過 append log 的方式進行同步的,出於空間和效率的考慮,leader 會定期清理過老的 log。假如 follower/learner 出現宕機或者網路隔離,恢復以後可能所缺的 log 已經在 leader 節點被清理掉了,此時只能通過 Snapshot 的方式進行同步。
- Raft 加入新的節點的,由於新節點沒同步過任何日誌,只能通過接收 Snapshot 的方式來同步。實際上這也可以認為是 1 的一種特殊情形。
- 出於備份/恢復等需求,應用層需要 dump 一份 State Machine 的完整數據。
TiKV 涉及到的是 1 和 2 這兩種情況。在我們的實現中,Snapshot 總是由 Region leader 所在的 TiKV 生成,通過網路發送給 Region follower/learner 所在的 TiKV。
理論上講,我們完全可以把 Snapshot 當作普通的 RaftMessage
來發送,但這樣做實踐上會產生一些問題,主要是因為 Snapshot 消息的尺寸遠大於其他 RaftMessage
:
- Snapshot 消息需要花費更長的時間來發送,如果共用網路連接容易導致網路擁塞,進而引起其他 Region 出現 Raft 選舉超時等問題。
- 構建待發送 Snapshot 消息需要消耗更多的內存。
- 過大的消息可能導致超出 gRPC 的 Message Size 限制等問題。
基於上面的原因,TiKV 對 Snapshot 的發送和接收進行了特殊處理,為每個 Snapshot 創建單獨的網路連接,並將 Snapshot 拆分成 1M 大小的多個 Chunk 進行傳輸。
源碼解讀
下面我們分別從 RPC 協議、發送 Snapshot、收取 Snapshot 三個方面來解讀相關源代碼。本文的所有內容都基於 v3.0.0-rc.2 版本。
Snapshot RPC call 的定義
與普通的 raft message 類似,Snapshot 消息也是使用 gRPC 遠程調用的方式來傳輸的。在 pingcap/kvproto 項目中可以找到相關 RPC Call 的定義,具體在 tikvpb.proto 和 raft_serverpb.proto 文件中。
rpc Snapshot(stream raft_serverpb.SnapshotChunk) returns (raft_serverpb.Done) {}
...
message SnapshotChunk {
RaftMessage message = 1;
bytes data = 2;
}
message Done {}
可以看出,Snapshot 被定義成 client streaming 調用,即對於每個 Call,客戶端依次向伺服器發送多個相同類型的請求,伺服器接收並處理完所有請求後,向客戶端返回處理結果。具體在這裡,每個請求的類型是 SnapshotChunk
,其中包含了 Snapshot 對應的 RaftMessage
,或者攜帶一段 Snapshot 數據;回復消息是一個簡單的空消息 Done
,因為我們在這裡實際不需要返回任何信息給客戶端,只需要關閉對應的 stream。
Snapshot 的發送流程
Snapshot 的發送過程的處理比較簡單粗暴,直接在將要發送 RaftMessage
的地方截獲 Snapshot 類型的消息,轉而通過特殊的方式進行發送。相關代碼可以在 server/transport.rs 中找到:
fn write_data(&self, store_id: u64, addr: &str, msg: RaftMessage) {
if msg.get_message().has_snapshot() {
return self.send_snapshot_sock(addr, msg);
}
if let Err(e) = self.raft_client.wl().send(store_id, addr, msg) {
error!("send raft msg err"; "err" => ?e);
}
}
fn send_snapshot_sock(&self, addr: &str, msg: RaftMessage) {
...
if let Err(e) = self.snap_scheduler.schedule(SnapTask::Send {
addr: addr.to_owned(),
msg,
cb,
}) {
...
}
}
從代碼中可以看出,這裡簡單地把對應的 RaftMessage
包裝成一個 SnapTask::Send
任務,並將其交給獨立的 snap-worker
去處理。值得注意的是,這裡的 RaftMessage
只包含 Snapshot 的元信息,而不包括真正的快照數據。TiKV 中有一個單獨的模塊叫做 SnapManager
,用來專門處理數據快照的生成與轉存,稍後我們將會看到從 SnapManager
模塊讀取 Snapshot 數據塊並進行發送的相關代碼。
我們不妨順藤摸瓜來看看 snap-worker
是如何處理這個任務的,相關代碼在 server/snap.rs,精簡掉非核心邏輯後的代碼引用如下:
fn run(&mut self, task: Task) {
match task {
Task::Recv { stream, sink } => {
...
let f = recv_snap(stream, sink, ...).then(move |result| {
...
});
self.pool.spawn(f).forget();
}
Task::Send { addr, msg, cb } => {
...
let f = future::result(send_snap(..., &addr, msg))
.flatten()
.then(move |res| {
...
});
self.pool.spawn(f).forget();
}
}
}
snap-worker
使用了 future
來完成收發 Snapshot 任務:通過調用 send_snap()
或 recv_snap()
生成一個 future 對象,並將其交給 FuturePool
非同步執行。
現在我們暫且只關注 send_snap()
的 實現:
fn send_snap(
...
addr: &str,
msg: RaftMessage,
) -> Result<impl Future<Item = SendStat, Error = Error>> {
...
let key = {
let snap = msg.get_message().get_snapshot();
SnapKey::from_snap(snap)?
};
...
let s = box_try!(mgr.get_snapshot_for_sending(&key));
if !s.exists() {
return Err(box_err!("missing snap file: {:?}", s.path()));
}
let total_size = s.total_size()?;
let chunks = {
let mut first_chunk = SnapshotChunk::new();
first_chunk.set_message(msg);
SnapChunk {
first: Some(first_chunk),
snap: s,
remain_bytes: total_size as usize,
}
};
let cb = ChannelBuilder::new(env);
let channel = security_mgr.connect(cb, addr);
let client = TikvClient::new(channel);
let (sink, receiver) = client.snapshot()?;
let send = chunks.forward(sink).map_err(Error::from);
let send = send
.and_then(|(s, _)| receiver.map_err(Error::from).map(|_| s))
.then(move |result| {
...
});
Ok(send)
}
這一段流程還是比較清晰的:先是用 Snapshot 元信息從 SnapManager
取到待發送的快照數據,然後將 RaftMessage
和 Snap
一起封裝進 SnapChunk
結構,最後創建全新的 gRPC 連接及一個 Snapshot stream 並將 SnapChunk
寫入。這裡引入 SnapChunk
是為了避免將整塊 Snapshot 快照一次性載入進內存,它 impl 了 futures::Stream
這個 trait 來達成按需載入流式發送的效果。如果感興趣可以參考它的 具體實現,本文就暫不展開了。
Snapshot 的收取流程
最後我們來簡單看一下 Snapshot 的收取流程,其實也就是 gRPC Call 的 server 端對應的處理,整個流程的入口我們可以在 server/service/kv.rs 中找到:
fn snapshot(
&mut self,
ctx: RpcContext<_>,
stream: RequestStream<SnapshotChunk>,
sink: ClientStreamingSink<Done>,
) {
let task = SnapTask::Recv { stream, sink };
if let Err(e) = self.snap_scheduler.schedule(task) {
...
}
}
與發送過程類似,也是直接構建 SnapTask::Recv
任務並轉發給 snap-worker
了,這裡會調用上面出現過的 recv_snap()
函數,具體實現 如下:
fn recv_snap<R: RaftStoreRouter + static>(
stream: RequestStream<SnapshotChunk>,
sink: ClientStreamingSink<Done>,
...
) -> impl Future<Item = (), Error = Error> {
...
let f = stream.into_future().map_err(|(e, _)| e).and_then(
move |(head, chunks)| -> Box<dyn Future<Item = (), Error = Error> + Send> {
let context = match RecvSnapContext::new(head, &snap_mgr) {
Ok(context) => context,
Err(e) => return Box::new(future::err(e)),
};
...
let recv_chunks = chunks.fold(context, |mut context, mut chunk| -> Result<_> {
let data = chunk.take_data();
...
if let Err(e) = context.file.as_mut().unwrap().write_all(&data) {
...
}
Ok(context)
});
Box::new(
recv_chunks
.and_then(move |context| context.finish(raft_router))
.then(move |r| {
snap_mgr.deregister(&context_key, &SnapEntry::Receiving);
r
}),
)
},
);
f.then(move |res| match res {
...
})
.map_err(Error::from)
}
值得留意的是 stream 中的第一個消息(其中包含有 RaftMessage
)被用來創建 RecvSnapContext
對象,其後的每個 chunk 收取後都依次寫入文件,最後調用 context.finish()
把之前保存的 RaftMessage
發送給 raftstore
完成整個接收過程。
總結
以上就是 TiKV 發送和接收 Snapshot 相關的代碼解析了。這是 TiKV 代碼庫中較小的一個模塊,它很好地解決了由於 Snapshot 消息特殊性所帶來的一系列問題,充分應用了 grpc-rs
組件及 futures
/FuturePool
模型,大家可以結合本系列文章的 第七篇 和 第八篇 進一步拓展學習。
原文閱讀:
TiKV 源碼解析系列文章(十)Snapshot 的發送和接收更多 TiKV 源碼閱讀:
博客 | PingCAP推薦閱讀: