作者:張學程

本文為 DM 源碼閱讀系列文章的第九篇,在 上篇文章 中我們詳細介紹了 DM 對 online schema change 方案的同步支持,對 online schema change 同步方案以及實現細節等邏輯進行了分析。

在本篇文章中,我們將對 shard DDL 同步機制以及 checkpoint 機制等進行詳細的介紹,內容包括 shard group 的定義、shard DDL 的同步協調處理流程、checkpoint 機制以及與之相關的 safe mode 機制。

shard DDL 機制的實現

DM 中通過 庫表路由與列值轉換 功能,實現了對分庫分表合併場景下 DML 的同步支持。但當需要同步的各分表存在 DDL 變更時,還需要對 DDL 的同步進行更多額外的處理。有關分表合併時 shard DDL 同步需要處理的問題以及 DM 中的同步支持原理,請先閱讀 TiDB Ecosystem Tools 原理解讀系列(三)TiDB-DM 架構設計與實現原理。

shard group

在 這篇文章 中,我們介紹了 DM 在處理 shard DDL 同步時引入了兩級 shard group 的概念,即用於執行分表合併同步任務的各 DM-worker 組成的 shard group、每個 DM-worker 內需要進行合表同步的各上游分表組成的 shard group。

DM-worker 組成的 shard group

由 DM-worker 組成的 shard group 是由集群部署拓撲及同步任務配置決定的,即任務配置文件中定義的需要進行合表同步的所有上游 MySQL 實例對應的所有 DM-worker 實例即組成了一個 shard group。為了表示同步過程中的相關動態信息,DM-master 內部引入了兩個概念:

  • Lock:對於每組需要進行合併的表,其中每一條需要進行同步協調的 shard DDL,由一個 Lock 實例進行表示;每個 Lock 實例在有 shard DDL 需要協調同步時被創建、在協調同步完成後被銷毀;在 dmctl 中使用 show-ddl-locks 命令查看到的每一個 Lock 信息即對應一個該實例
  • LockKeeper:維護所有的 Lock 實例信息並提供相關的操作介面

Lock 中各主要成員變數的作用如下:

DM-worker 內分表組成的 shard group

每個 DM-worker 內的 shard group 是由對應上游 MySQL 實例內分表及同步任務配置決定的,即任務配置文件中定義的對應 MySQL 實例內需要進行合併同步到同一個下游目標表的所有分表組成一個 shard group。在 DM-worker 內部,我們維護了下面兩個對象:

  • ShardingGroup:對於每一組需要進行合併的表,由一個 ShardingGroup 實例進行表示;每個 ShardGroup 實例在同步任務啟動階段被創建,在任務停止時被銷毀
  • ShardingGroupKeeper:維護所有的 ShardingGroup 實例信息並提供相關的操作介面

ShardingGroup 中各主要成員變數的作用如下:

shard DDL 同步流程

對於兩級 shard group,DM 內部在依次完成兩個級別的 相應的 shard DDL 同步協調。

  1. 對於 DM-worker 內由各分表組成的 shard group,其 shard DDL 的同步在對應 DM-worker 內部進行協調
  2. 對於由各 DM-worker 組成的 shard group,其 shard DDL 的同步由 DM-master 進行協調

DM-worker 間 shard DDL 協調流程

我們基於在 這篇文章 中展示過的僅包含兩個 DM-worker 的 shard DDL 協調流程示例(如下圖)來了解 DM 內部的具體實現。

  1. DM-worker-1 將 shard DDL 信息發送給 DM-mastera. 當 DM-worker-1 內部 shard DDL 協調完成時,DM-worker-1 將對應的 shard DDL 信息保存在 channel 中供 DM-master 通過 gRPC 獲取b. DM-master 在 fetchWorkerDDLInfo 方法中以 gRPC streaming 的方式讀取到 DM-worker-1 的 shard DDL 信息c. DM-master 調用 ShardingGroupKeeper 的 TrySync 方法創建對應的 lock 信息,並在 lock 中標記已收到 DM-worker-1 的 shard DDL 信息
  2. DM-master 將 lock 信息發回給 DM-worker-1a. DM-master 以 gRPC streaming 的方式將 lock 信息發送給 DM-worker-1b. DM-worker-1 將來自 DM-master 的 lock 信息保存在內存中用於在 DM-master 請求 DM-worker 執行/跳過 shard DDL 時進行驗證
  3. DM-worker-2 將 shard DDL 信息發送給 DM-master(流程與 step.1 一致)
  4. DM-master 將 lock 信息發回給 DM-worker-2(流程與 step.2 一致)
  5. DM-master 協調 DM-worker-1 向下游同步 shard DDLa. DM-master 根據 step.1 與 step.3 時收到的 shard DDL 信息判定已經收到 shard group 內所有 DM-worker 的 shard DDL 信息b. DM-master 在 resolveDDLLock 方法中向 DM-worker-1 發送向下游同步 shard DDL 的請求(Exec 參數為 true)
  6. DM-worker-1 向下游同步 shard DDL

    a. DM-worker-1 接收到來自 DM-master 的向下游執行 shard DDL 的請求

    b. DM-worker-1 構造 DDL job 並添加到 DDL 執行隊列中c. DM-worker-1 將 shard DDL 執行結果保存在 channel 中供 DM-master 通過 gRPC 獲取
  7. DM-worker-2 忽略向下游同步 shard DDLa. DM-master 獲取 DM-worker-1 向下游同步 shard DDL 的結果判斷得知 DM-worker-1 同步 shard DDL 成功b. DM-master 向 DM-worker-2 發送忽略向下游同步 shard DDL 的請求(Exec 參數為 false)c. DM-worker-2 根據 DM-master 請求忽略向下游同步 shard DDL

DM-worker 內 shard DDL 同步流程

我們基於在 實現原理文章 中展示過的一個 DM-worker 內僅包含兩個分表 (table_1,table_2) 的 shard DDL(僅一條 DDL)協調處理流程示例來了解 DM 內部的具體實現。

  1. DM-worker 收到 table_1 的 DDLa. 根據 DDL 及 binlog event position 等信息更新對應的 shard groupb. 確保 binlog replication 過程已進入 safe mode(後文介紹 checkpoint 機制時會再介紹 safe mode)

    c. href="github.com/pingcap/dm/b">更新 table_1 的 checkpoint(後文會詳細介紹 checkpoint 機制)

  2. DM-worker 繼續解析後續的 binlog event根據 step.1 時返回的更新後的 shard group 信息得知還未收到 shard group 內所有分表對應的 shard DDL,不向下游同步 shard DDL 並繼續後續解析
  3. 忽略 table_1 的 DML 並同步 table_2 的 DMLhref="github.com/pingcap/dm/b">由於 table_1 已收到 shard DDL 但 shard DDL 自身還未完成同步,ref="github.com/pingcap/dm/b">忽略對 table_1 相關 DML 的同步
  4. DM-worker 收到 table_2 的 DDL(流程與 step.1 一致)
  5. DM-worker 向下游同步 shard DDLa. 根據 step.4 時返回的更新後的 shard group 信息得知已經收到 shard group 內所有分表對應的 shard DDLb. 嘗試讓 binlog replication 過程退出 safe modec. 將當前 shard DDL 同步完成後 re-sync 時重新同步 step.3 忽略的 DML 所需的相關信息保存在 channel 中d. 等待已分發的所有 DML 同步完成(確保等待並發同步的 DML 都同步到下游後再對下游 schema 進行變更)e. 將 shard DDL 相關信息保存在 channel 中以進行 DM-worker 間的同步(見前文 DM-worker 間 shard DDL 協調流程)f. 待 DM-worker 間協調完成後,向下游同步 shard DDL
  6. 將 binlog 的解析位置重定向回 step.1 對應 DDL 後的 binlog event position 進入 re-sync 階段根據 step.5 中保存的信息,將 binlog 的解析位置重定向回 step.1 對應的 DDL 後的 binlog event position
  7. 重新解析 binlog event
  8. 對於不同表的 DML 做不同的處理

    a. 對於 table_1 在 step.3 時忽略的 DML,解析後向下游同步

    b. href="github.com/pingcap/dm/b">對於 table_2 的 DML,根據 checkpoint 信息忽略向下游同步
  9. 解析到達 step.4 時 DDL 對應的 binlog position,re-sync 階段完成a. 解析 binlog position 到達 step.4 的 DDLb. 結束 re-sync 過程
  10. 繼續進行後續的 DDL 與 DML 的同步

需要注意的是,在上述 step.1 與 step.4 之間,如果有收到 table_1 的其他 DDL,則對於該 shard group,需要協調同步由一組 shard DDL 組成的 ShardingSequence。當在 step.9 對其中某一條 shard DDL 同步完成後,如果有更多的未同步的 shard DDL 需要協調處理,則會重定向到待處理的下一條 shard DDL 對應的位置重新開始解析 binlog event。

checkpoint 機制的實現

DM 中通過 checkpoint 機制來實現同步任務中斷後恢復時的續傳功能。對於 load 階段,其 checkpoint 機制的實現在 DM 源碼閱讀系列文章(四)dump/load 全量同步的實現 文章中我們已經進行了介紹,本文不再贅述。在本文中,我們將介紹 binlog replication 增量同步階段的 checkpoint 機制的實現及與之相關的 safe mode 機制的實現。

checkpoint 機制

DM 在 binlog replication 階段以 binlog event 對應的 position 為 checkpoint,包括兩類:

  1. 全局 checkpiont:對應已成功解析並同步到下游的 binlog event 的 position,同步任務中斷恢復後將從該位置重新進行解析與同步
  2. 每個需要同步 table 的 checkpoint:對應該 table 已成功解析並同步到下游的 binlog event 的 position,主要用於在 re-sync 過程中避免對已同步的數據進行重複同步

DM 的 checkpoint 信息保存在下游資料庫中,通過 RemoteCheckPoint 對象進行讀寫,其主要成員變數包括:

  • globalPoint:用於保存全局 checkpoint
  • points:用於保存各 table 的 checkpoint

checkpoint 信息在下游資料庫中對應的 schema 通過 createTable 方法進行創建,其中各主要欄位的含義為:

對於全局 checkpoint,在以下情況下會更新內存中的信息:

  • 收到 XID event 時(表示一個 DML 事務的結束)
  • DDL 向下游同步成功後

對於各 table checkpoint,在以下情況下會更新內存中的信息:

  • DML 向下游同步成功後
  • DDL 向下游同步成功後
  • 收到 shard DDL 且成功更新了 shard group,但未向下游同步 shard DDL 時

對於全局與 table 的 checkpoint,會在以下情況下 flush 到下游資料庫中:

  • 收到 flush 通知(如同步任務將暫停或停止時)
  • 已分發的任務成功同步到下游(DDL 同步到下游,超過指定時間閾值 flush)

值得注意的是,在 shard DDL 未同步到下游之前,為確保中斷恢復後仍能繼續整個 shard DDL 的協調過程,DM 不會將全局 checkpoint 更新為比 shard DDL 起始 position 更大的 position,DM 也不會將 shard DDL 協調過程中對應 table 的 checkpoint flush 到下游。

safe mode 機制

當同步任務中斷恢復後,DM 在 binlog replication 階段通過 checkpoint 機制保證了重新開始同步的起始點前的數據都已經成功同步到了下游資料庫中,即保證了 at-least-once 語義。但由於 flush checkpoint 與同步 DDL、DML 到下游不是在同一個事務中完成的,因此從 checkpoint 開始重新同步時,可能存在部分數據被重複同步的可能,即不能保證 at-most-once 。

在 DM 的 binlog replication 階段,通過增加 safe mode 機制確保了重複同步數據時的可重入,即:

  • href="https://github.com/pingcap/dm/blob/369933f31b/syncer/dml.go#L132">將 INSERT 操作轉為 REPLACE 操作
  • href="https://github.com/pingcap/dm/blob/369933f31b/syncer/dml.go#L195">將 UPDATE 操作轉為 DELETE 操作和 ="https://github.com/pingcap/dm/blob/369933f31b/syncer/dml.go#L200">REPLACE 操作
  • href="github.com/pingcap/dm/blob/369933f31b/syncer/dml.go#L265">對 DELETE 操作不進行轉換仍保持為 DELETE

目前,safe mode 會在以下情況時啟用:

  • 啟動或恢復任務時的前 5 分鐘,確保從 checkpoint 位置開始被重複同步的部分數據最終一致
  • DM-worker 內進行 shard DDL 同步協調時(見前文 DM-worker 內 shard DDL 同步流程),確保即使 shard DDL 協調過程中異常重啟且 5 分鐘內無法重複同步完之前已同步數據也能最終一致
  • 用戶在同步任務配置文件中指定了啟用 safe mode,用於其他需要以 safe mode 同步超 5 分鐘的場景

小結

本篇文章詳細地介紹了 shard DDL 機制與 checkpoint 機制的實現,內容包括了兩級 shard group 的定義與 DM-worker 間及 DM-worker 內的 shard DDL 同步協調處理流程、checkpoint 機制及與之相關的 safe mode 機制。下一篇文章中,我們將介紹用於保證 DM 正確性與穩定性的測試框架的實現,敬請期待。

原文閱讀:pingcap.com/blog-cn/dm-

更多 DM 源碼閱讀:

博客 | PingCAP?

www.pingcap.com圖標
推薦閱讀:

相关文章