作者:lan

本文為 DM 源碼閱讀系列文章的第五篇。上篇文章 介紹了 dump 和 load 兩個數據同步處理單元的設計實現,對核心 interface 實現、數據導入並發模型、數據導入暫停或中斷的恢復進行了分析。本篇文章將詳細地介紹 DM 核心處理單元 Binlog replication,內容包含 binlog 讀取、過濾、路由、轉換,以及執行等邏輯。文內涉及到 shard merge 相關邏輯功能,如 column mapping、shard DDL 同步處理,會在 shard merge 篇單獨詳細講解,這裡就不贅述了。

Binlog replication 處理流程

從上圖可以大致了解到 Binlog replication 的邏輯處理流程,對應的 邏輯入口代碼。

1.從 relay log 或者 MySQL/MariaDB 讀取 binlog events。

2.對 binlog events 進行處理轉換(transformation),這裡可以做三類操作:

3.executor 對 job 進行衝突檢測,然後根據固定規則分發給對應的 worker 執行。

4.定期保存 binlog position/gtid 到 checkpoint。

Binlog 讀取

Binlog replication 支持兩種方式讀取 binlog events:

  • 從遠程的 MySQL/MariaDB
  • 從 DM-worker 的本地 relay log

兩種方式都提供了同樣的讀取方法,處理核心都是 go-mysql。該庫主要提供了兩個功能:

  • 註冊為 MySQL/MariaDB 的 slave server ,從 MySQL/MariaDB 順序讀取 raw binlog events。
  • 解析 raw binlog events。

更多的處理細節會在下篇關於 relay log 的文章中進行介紹,迫不及待的小夥伴可以先翻閱一下相關代碼實現。

Binlog 轉換

處理程序拿到解析好的 binlog event 後,根據 binlog 的類型來對 binlog 進行分類處理。Binlog replication 主要關心以下類型的 binlog event :

Binlog replication 數據處理單元會對每一類 binlog event 進行以下的處理步驟,具體實現的處理順序可能略有差異,以代碼實現為準。

過濾

Binlog replication 會從兩個維度對 binlog event 來進行過濾:

  • 根據 同步庫/表黑白名單,過濾掉對應庫/表的所有 binlog event。
  • 根據 binlog event 過濾規則,過濾掉對應庫/表指定的 binlog event。

row event過濾處理 query event過濾處理 的實現在邏輯上面存在一些差異:

  • row event 包含 庫名和表名 信息;query event 需要通過 tidb parser 解析 event 裡面包含的 query statement 來獲取需要的庫名,表名以及其他信息。
  • tidb parser 不是完全 100% 兼容 MySQL 語法,當遇到 parser 不支持的 query statement 時候,解析就會報錯,從而無法獲取到對應的庫名和表名信息。Binlog replication 提供了一些 內置的不支持的 query statement 正則表達式,配合 href="github.com/pingcap/dm/b">使用 [schema-pattern: *, table-pattern: *]的 binlog event 過濾規則,來跳過 parser 不支持的 query statement。
  • query event 裡面也會包含 statement format binlog event,此時 Binlog replication 就可以利用 parser 解析出來具體的 statement 類型,對不支持的 statement format binlog event 作出相應的處理: 對於需要同步的表,進行報錯處理;不需要同步的表,忽略繼續同步。

路由

binlog 過濾完成之後,對於需要同步的表就會根據過濾步驟獲得的庫名和表名,通過 路由規則 轉換得到需要同步到的目標庫名和表名,在接下來的轉換步驟來使用目標庫名和表名來轉換出正確的 DML 和 DDL statement。

轉換

row event 轉換處理和 query event 轉換處理的實現存在一些差異,這裡分開來講述。

row event 轉換處理通過三個轉換函數生成對應的 statements:

  • generate insert sqls :將 write rows event 轉換為 replace into statements
  • generate update sqls
    • safe mode = true,將 update rows event 轉換為 delete + replace statements。
    • safe mode = false,將 update row event 轉換為 update statements。
  • generate delete sqls:將 delete rows event 轉換為 delete statements。

query event 轉換處理:

  • 因為 TiDB 目前不支持一條 DDL 語句包含多個 DDL 操作,query event 轉換處理會首先嘗試將 包含多個 DDL 變更操作的單條 DDL 語句 拆分成 只包含一個 DDL 操作的多條 DDL 語句(具體代碼實現)。
  • 使用 parser 將 DDL statement 對應的 ast 結構裡面的庫名和表名替換成對應的目標庫名和表名(具體代碼實現)。

通過轉換處理之後,將不同的 binlog event 包裝成不同的 job 發送到 executor 執行:

  • row event -> insert/update/delete job
  • query event -> ddl job
  • xid event -> xid job

Job 執行

衝突檢測

binlog 順序同步模型要求按照 binlog 順序一個一個來同步 binlog event,這樣的順序同步勢必不能滿足高 QPS 低同步延遲的同步需求,並且不是所有的 binlog 涉及到的操作都存在衝突。Binlog replication 採用衝突檢測機制,鑒別出來需要順序執行的 jobs,在確保這些 jobs 的順序執行的基礎上,最大程度地保持其他 job 的並發執行來滿足性能方面的要求。

衝突檢測流程如下:

  • 遇到 DDL job,等待前面已經分發出來的所有 DML jobs 執行完成後,然後單獨執行該 DDL job,執行完成之後保存 checkpoint 信息。
  • 遇到 DML job,會 先檢測並且嘗試解決衝突。如果檢測到衝突(即存在兩個 executor 的 worker 的 jobs 都需要與當前的 job 保持順序執行),會發送一個 flush job 來等待已經分發的所有 DML jobs 執行完成,然後再將 job 分發到對應的 worker,並且記錄該分發信息到內存。在沒有衝突的情況下,如果不需要與已經分發出去的 job 保持順序的話,發送 job 到任意 worker 上;如果需要保持順序的話,那麼根據內存儲存的歷史分發信息,發送 job 到對應的 worker 上。

衝突檢測實現比較簡單,根據轉換步驟獲得每條 statement 對應的 primary/unique key 信息,來進行交集檢測,如果存在交集那麼認定是需要順序的執行兩條 statement,請參考 具體實現代碼。

執行

job 分發到對應的 worker 後,worker 根據一定的規則來批量執行這些 job,如下:

  • 遇到 DDL 立即執行。
  • 遇到 flush 或者積累的 job 數量超過 配置的 batch 數量 立即執行。
  • 沒有新的 job 分發進來,清空當前已經積累的 jobs 或者 sleep 10 ms。

根據上面三個規則可以很快地將已經分發的 jobs 應用到下游 TiDB。

小結

本篇文章詳細地介紹 DM 核心處理單元 Binlog replication,內容包含 binlog 讀取、過濾、路由、轉換,以及執行等邏輯。下一篇我們會對 relay log 數據處理單元的設計進行詳細的講解。

更多閱讀:

博客?

www.pingcap.com
圖標

推薦閱讀:
相关文章