之前的 TiKV 源碼解析系列文章介紹了 TiKV 依賴的周邊庫,從本篇文章開始,我們將開始介紹 TiKV 自身的代碼。本文重點介紹 TiKV 最外面的一層——Service 層。
接下來,我們將詳細介紹這些組件。
Resolver
在一個集羣中,每個 TiKV 實例都由一個唯一的 store id 進行標識。Resolver 的功能是將 store id 解析成 TiKV 的地址和埠,用於建立網路通信。
Resolver 是一個很簡單的組件,其介面僅包含一個函數:
pub trait StoreAddrResolver: Send + Clone {
fn resolve(&self, store_id: u64, cb: Callback) -> Result<()>;
}
其中 Callback
用於非同步地返回結果。PdStoreAddrResolver
實現了該 trait,它的 resolve
方法的實現則是簡單地將查詢任務通過其 sched
成員發送給 Runner
。而 Runner
則實現了 Runnable<Task>
,其意義是 Runner
可以在自己的一個線程裏運行,外界將會向 Runner
發送 Task
類型的消息,Runner
將對收到的 Task
進行處理。 這裡使用了由 TiKV 的 util 提供的一個單線程 worker 框架,在 TiKV 的很多處代碼中都有應用。Runner
的 store_addrs
欄位是個 cache,它在執行任務時首先嘗試在這個 cache 中找,找不到則向 PD 發送 RPC 請求來進行查詢,並將查詢結果添加進 cache 裏。
RaftClient
TiKV 是一個 Multi Raft 的結構,Region 的副本之間,即 Raft group 的成員之間需要相互通信,RaftClient
的作用便是管理 TiKV 之間的連接,並用於向其它 TiKV 節點發送 Raft 消息。RaftClient
可以和另一個節點建立多個連接,並把不同 Region 的請求均攤到這些連接上。這部分代碼的主要的複雜性就在於連接的建立,也就是 Conn::new
這個函數。建立連接的代碼的關鍵部分如下:
let client1 = TikvClient::new(channel);
let (tx, rx) = batch::unbounded::<RaftMessage>(RAFT_MSG_NOTIFY_SIZE);
let rx = batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, |v, e| v.push(e));
let rx1 = Arc::new(Mutex::new(rx));
let (batch_sink, batch_receiver) = client1.batch_raft().unwrap();
let batch_send_or_fallback = batch_sink
.send_all(Reusable(rx1).map(move |v| {
let mut batch_msgs = BatchRaftMessage::new();
batch_msgs.set_msgs(RepeatedField::from(v));
(batch_msgs, WriteFlags::default().buffer_hint(false))
})).then(/*...*/);
client1.spawn(batch_send_or_fallback.map_err(/*...*/));
上述代碼向指定地址調用了 batch_raft
這個 gRPC 介面。batch_raft
和 raft
都是 stream 介面。對 RaftClient
調用 send
方法會將消息發送到對應的 Conn
的 stream
成員,即上述代碼的 tx
中,而在 gRPC 的線程中則會從 rx
中取出這些消息(這些消息被 BatchReceiver
這一層 batch 起來以提升性能),並通過網路發送出去。
如果對方不支持 batch,則會 fallback 到 raft
介面。這種情況通常僅在從舊版本升級的過程中發生。
RaftStoreRouter 與 Transport
RaftStoreRouter
負責將收到的 Raft 消息轉發給 raftstore 中對應的 Region,而 Transport
負責將 Raft 消息發送到指定的 store。
ServerRaftStoreRouter
是在 TiKV 實際運行時將會使用的 RaftStoreRouter
的實現,它包含一個內層的、由 raftstore 提供的 RaftRouter
對象和一個 LocalReader
對象。收到的請求如果是一個只讀的請求,則會由 LocalReader
處理;其它情況則是交給內層的 router 來處理。
ServerTransport
則是 TiKV 實際運行時使用的 Transport
的實現(Transport
trait 的定義在 raftstore 中),其內部包含一個 RaftClient
用於進行 RPC 通信。發送消息時,ServerTransport
通過上面說到的 Resolver 將消息中的 store id 解析為地址,並將解析的結果存入 raft_client.addrs
中;下次向同一個 store 發送消息時便不再需要再次解析。接下來,再通過 RaftClient
進行 RPC 請求,將消息發送出去。
Node
Node
可以認為是將 raftstore 的複雜的創建、啟動和停止邏輯進行封裝的一層,其內部的 RaftBatchSystem
便是 raftstore 的核心。在啟動過程中(即 Node
的 start
函數中),如果該節點是一個新建的節點,那麼會進行 bootstrap 的過程,包括分配 store id、分配第一個 Region 等操作。
Node
並沒有直接包含在 Server
之內,但是 raftstore 的運行需要有用於向其它 TiKV 發送消息的 Transport
,而 Transport
作為提供網路通信功能的一部分,則是包含在 Server
內。所以我們可以看到,在 src/binutil/server.rs
文件的 run_raft_server
中(被 tikv-server 的 main
函數調用),啟動過程中需要先創建 Server
,然後創建並啟動 Node
並把 Server
所創建的 Transport
傳給 Node
,最後再啟動 Node
。
Service
TiKV 包含多個 gRPC service。其中,最重要的一個是 KvService
,位於 src/server/service/kv.rs
文件中。
KvService
定義了 TiKV 的 kv_get
,kv_scan
,kv_prewrite
,kv_commit
等事務操作的 API,用於執行 TiDB 下推下來的複雜查詢和計算的 coprocessor
API,以及 raw_get
,raw_put
等 Raw KV API。batch_commands
介面則是用於將上述的介面 batch 起來,以優化高吞吐量的場景。當我們要為 TiKV 添加一個新的 API 時,首先就要在 kvproto 項目中添加相關消息體的定義,並在這裡添加相關代碼。另外,TiKV 的 Raft group 各成員之間通信用到的 raft
和 batch_raft
介面也是在這裡提供的。
下面以 kv_prewrite
為例,介紹 TiKV 處理一個請求的流程。首先,無論是直接調用還是通過 batch_commands
介面調用,都會調用 future_prewrite
函數,並在該函數返回的 future 附加上根據結果發送響應的操作,再將得到的 future spawn 到 RpcContext
,也就是一個線程池裡。future_prewrite
的邏輯如下:
// 從請求體中取出調用 prewrite 所需的參數
let (cb, f) = paired_future_callback();
let res = storage.async_prewrite(/*其它參數*/, cb);
AndThenWith::new(res, f.map_err(Error::from)).map(|v| {
let mut resp = PrewriteResponse::new();
if let Some(err) = extract_region_error(&v) {
resp.set_region_error(err);
} else {
resp.set_errors(RepeatedField::from_vec(extract_key_errors(v)));
}
resp
})
這裡的 paired_future_callback
是一個 util 函數,它返回一個閉包 cb
和一個 future f
,當 cb
被調用時 f
就會返回被傳入 cb
的值。上述代碼會立刻返回,但 future 中的邏輯在 async_prewrite
中的非同步操作完成之後才會執行。一旦 prewrite 操作完成,cb
便會被調用,將結果傳給 f
,接下來,我們寫在 future
中的創建和發送 Response 的邏輯便會繼續執行。
總結
以上就是 TiKV 的 Service 層的代碼解析。大家可以看到這些代碼大量使用 trait 和泛型,這是為了方便將其中一些組件替換成另外一些實現,方便編寫測試代碼。另外,在 src/server/snap.rs
中,我們還有一個專門用於處理 Snapshot 的模塊,由於 Snapshot 消息的特殊性,在其它模塊中也有一些針對 snapshot 的代碼。關於 Snapshot,我們將在另一篇文章裏進行詳細講解,敬請期待。
原文閱讀: https://www. pingcap.com/blog-cn/tik v-source-code-reading-9/
更多 TiKV 源碼閱讀:
博客 | PingCAP ?
www.pingcap.com 推薦閱讀: