作者:周振靖

之前的 TiKV 源碼解析系列文章介紹了 TiKV 依賴的周邊庫,從本篇文章開始,我們將開始介紹 TiKV 自身的代碼。本文重點介紹 TiKV 最外面的一層——Service 層。

TiKV 的 Service 層的代碼位於 src/server 文件夾下,其職責包括提供 RPC 服務、將 store id 解析成地址、TiKV 之間的相互通信等。這一部分的代碼並不是特別複雜。本篇將會簡要地介紹 Service 層的整體結構和組成 Service 層的各個組件。

整體結構

位於 src/server/server.rs 文件中的 Server 是我們本次介紹的 Service 層的主體。它封裝了 TiKV 在網路上提供服務和 Raft group 成員之間相互通信的邏輯。Server 本身的代碼比較簡短,大部分代碼都被分離到 RaftClientTransportSnapRunner 和幾個 gRPC service 中。上述組件的層次關係如下圖所示:

接下來,我們將詳細介紹這些組件。

Resolver

在一個集羣中,每個 TiKV 實例都由一個唯一的 store id 進行標識。Resolver 的功能是將 store id 解析成 TiKV 的地址和埠,用於建立網路通信。

Resolver 是一個很簡單的組件,其介面僅包含一個函數:

pub trait StoreAddrResolver: Send + Clone {
fn resolve(&self, store_id: u64, cb: Callback) -> Result<()>;
}

其中 Callback 用於非同步地返回結果。PdStoreAddrResolver 實現了該 trait,它的 resolve 方法的實現則是簡單地將查詢任務通過其 sched 成員發送給 Runner。而 Runner 則實現了 Runnable<Task>,其意義是 Runner 可以在自己的一個線程裏運行,外界將會向 Runner 發送 Task 類型的消息,Runner 將對收到的 Task 進行處理。 這裡使用了由 TiKV 的 util 提供的一個單線程 worker 框架,在 TiKV 的很多處代碼中都有應用。Runnerstore_addrs 欄位是個 cache,它在執行任務時首先嘗試在這個 cache 中找,找不到則向 PD 發送 RPC 請求來進行查詢,並將查詢結果添加進 cache 裏。

RaftClient

TiKV 是一個 Multi Raft 的結構,Region 的副本之間,即 Raft group 的成員之間需要相互通信,RaftClient 的作用便是管理 TiKV 之間的連接,並用於向其它 TiKV 節點發送 Raft 消息。RaftClient 可以和另一個節點建立多個連接,並把不同 Region 的請求均攤到這些連接上。這部分代碼的主要的複雜性就在於連接的建立,也就是 Conn::new 這個函數。建立連接的代碼的關鍵部分如下:

let client1 = TikvClient::new(channel);

let (tx, rx) = batch::unbounded::<RaftMessage>(RAFT_MSG_NOTIFY_SIZE);
let rx = batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, |v, e| v.push(e));
let rx1 = Arc::new(Mutex::new(rx));

let (batch_sink, batch_receiver) = client1.batch_raft().unwrap();
let batch_send_or_fallback = batch_sink
.send_all(Reusable(rx1).map(move |v| {
let mut batch_msgs = BatchRaftMessage::new();
batch_msgs.set_msgs(RepeatedField::from(v));
(batch_msgs, WriteFlags::default().buffer_hint(false))
})).then(/*...*/);

client1.spawn(batch_send_or_fallback.map_err(/*...*/));

上述代碼向指定地址調用了 batch_raft 這個 gRPC 介面。batch_raftraft 都是 stream 介面。對 RaftClient 調用 send 方法會將消息發送到對應的 Connstream 成員,即上述代碼的 tx 中,而在 gRPC 的線程中則會從 rx 中取出這些消息(這些消息被 BatchReceiver 這一層 batch 起來以提升性能),並通過網路發送出去。

如果對方不支持 batch,則會 fallback 到 raft 介面。這種情況通常僅在從舊版本升級的過程中發生。

RaftStoreRouter 與 Transport

RaftStoreRouter 負責將收到的 Raft 消息轉發給 raftstore 中對應的 Region,而 Transport 負責將 Raft 消息發送到指定的 store。

ServerRaftStoreRouter 是在 TiKV 實際運行時將會使用的 RaftStoreRouter 的實現,它包含一個內層的、由 raftstore 提供的 RaftRouter 對象和一個 LocalReader 對象。收到的請求如果是一個只讀的請求,則會由 LocalReader 處理;其它情況則是交給內層的 router 來處理。

ServerTransport 則是 TiKV 實際運行時使用的 Transport 的實現(Transport trait 的定義在 raftstore 中),其內部包含一個 RaftClient 用於進行 RPC 通信。發送消息時,ServerTransport 通過上面說到的 Resolver 將消息中的 store id 解析為地址,並將解析的結果存入 raft_client.addrs 中;下次向同一個 store 發送消息時便不再需要再次解析。接下來,再通過 RaftClient 進行 RPC 請求,將消息發送出去。

Node

Node 可以認為是將 raftstore 的複雜的創建、啟動和停止邏輯進行封裝的一層,其內部的 RaftBatchSystem 便是 raftstore 的核心。在啟動過程中(即 Nodestart 函數中),如果該節點是一個新建的節點,那麼會進行 bootstrap 的過程,包括分配 store id、分配第一個 Region 等操作。

Node 並沒有直接包含在 Server 之內,但是 raftstore 的運行需要有用於向其它 TiKV 發送消息的 Transport,而 Transport 作為提供網路通信功能的一部分,則是包含在 Server 內。所以我們可以看到,在 src/binutil/server.rs文件的 run_raft_server 中(被 tikv-server 的 main 函數調用),啟動過程中需要先創建 Server,然後創建並啟動 Node 並把 Server 所創建的 Transport 傳給 Node,最後再啟動 Node

Service

TiKV 包含多個 gRPC service。其中,最重要的一個是 KvService,位於 src/server/service/kv.rs 文件中。

KvService 定義了 TiKV 的 kv_getkv_scankv_prewritekv_commit 等事務操作的 API,用於執行 TiDB 下推下來的複雜查詢和計算的 coprocessor API,以及 raw_getraw_put 等 Raw KV API。batch_commands 介面則是用於將上述的介面 batch 起來,以優化高吞吐量的場景。當我們要為 TiKV 添加一個新的 API 時,首先就要在 kvproto 項目中添加相關消息體的定義,並在這裡添加相關代碼。另外,TiKV 的 Raft group 各成員之間通信用到的 raftbatch_raft 介面也是在這裡提供的。

下面以 kv_prewrite 為例,介紹 TiKV 處理一個請求的流程。首先,無論是直接調用還是通過 batch_commands 介面調用,都會調用 future_prewrite 函數,並在該函數返回的 future 附加上根據結果發送響應的操作,再將得到的 future spawn 到 RpcContext,也就是一個線程池裡。future_prewrite 的邏輯如下:

// 從請求體中取出調用 prewrite 所需的參數

let (cb, f) = paired_future_callback();
let res = storage.async_prewrite(/*其它參數*/, cb);

AndThenWith::new(res, f.map_err(Error::from)).map(|v| {
let mut resp = PrewriteResponse::new();
if let Some(err) = extract_region_error(&v) {
resp.set_region_error(err);
} else {
resp.set_errors(RepeatedField::from_vec(extract_key_errors(v)));
}
resp
})

這裡的 paired_future_callback 是一個 util 函數,它返回一個閉包 cb 和一個 future f,當 cb 被調用時 f 就會返回被傳入 cb 的值。上述代碼會立刻返回,但 future 中的邏輯在 async_prewrite 中的非同步操作完成之後才會執行。一旦 prewrite 操作完成,cb 便會被調用,將結果傳給 f,接下來,我們寫在 future 中的創建和發送 Response 的邏輯便會繼續執行。

總結

以上就是 TiKV 的 Service 層的代碼解析。大家可以看到這些代碼大量使用 trait 和泛型,這是為了方便將其中一些組件替換成另外一些實現,方便編寫測試代碼。另外,在 src/server/snap.rs 中,我們還有一個專門用於處理 Snapshot 的模塊,由於 Snapshot 消息的特殊性,在其它模塊中也有一些針對 snapshot 的代碼。關於 Snapshot,我們將在另一篇文章裏進行詳細講解,敬請期待。

原文閱讀:pingcap.com/blog-cn/tik

更多 TiKV 源碼閱讀:

博客 | PingCAP?

www.pingcap.com
圖標

推薦閱讀:
相關文章