Exactly-once語義

Flink自1.4.0開始實現exactly-once的數據保證,即在任何情況下都能保證數據對應用產生的效果只有一次,不會多也不會少。具體實現依賴於抽象類TwoPhaseCommitSinkFunction,用戶只需要實現類介面就可以自己定義對外部系統的exactly-once。

Flink的checkpoint可以保證作業失敗的情況下從最近一次快照進行恢復,也就是可以保證系統內部的exactly-once。但是,flink有很多外接系統,比如將數據寫到kafka,一旦作業失敗重啟,offset重置,會消費舊數據,從而將重複的結果寫到kafka。如下圖:

這個時候,僅靠系統本身是無法保證exactly-once的。系統之間的數據一致性一般要靠2PC協議來保證,flink的TwoPhaseCommitSinkFunction也是基於此實現的。exactly-once的語義如下圖:

Exactly-once VS At-least-once

運算元做快照時,如果等所有輸入端的barrier都到了才開始做快照,那麼就可以保證運算元的exactly-once;如果為了降低延時而跳過對其,從而繼續處理數據,那麼等barrier都到齊後做快照就是at-least-once了,因為這次的快照摻雜了下一次快照的數據,當作業失敗恢復的時候,這些數據會重複作用系統,就好像這些數據被消費了兩遍。

註:對齊只會發生在運算元的上端是join操作以及上游存在partition或者shuffle的情況,對於直連操作類似map、flatMap、filter等還是會保證exactly-once的語義。

端到端的Exactly once實現

下面以一個簡單的flink讀寫kafka作為例子來說明(kafka0.11版本開始支持exactly-once語義)。如圖所示:

上圖由kafka source、window操作和kafka sink組成。

為了保證exactly-once語義,作業必須在一次事務中將緩存的數據全部寫入kafka。一次commit會提交兩個checkpoint之間所有的數據。

pre-commit階段起始於一次快照的開始,即master節點將checkpoint的barrier注入source端,barrier隨著數據向下流動直到sink端。barrier每到一個運算元,都會出發運算元做本地快照。如下圖所示:

當狀態涉及到外部系統時,需要外部系統支持事務操作來配合flink實現2PC協議,從而保證數據的exatly-once。這個時候,sink運算元出了將自己的state寫到後段,還必須準備好事務提交。

當所有的運算元都做完了本地快照並且回復到master節點時,pre-commit階段才算結束。這個時候,checkpoint已經成功,並且包含了外部系統的狀態。如果作業失敗,可以進行恢復。

接下來是通知所有的運算元這次checkpoint成功了,即2PC的commit階段。source節點和window節點沒有外部狀態,所以這時它們不需要做任何操作。而對於sink節點,需要commit這次事務,將數據寫到外部系統。

總的來說,流程如下:

  • 一旦所有的運算元完成了它們的pre-commit,它們會要求一個commit。
  • 如果存在一個運算元pre-commit失敗了,本次事務失敗,我們回滾到上次的checkpoint。
  • 一旦master做出了commit的決定,那麼這個commit必須得到執行,就算宕機恢復也有繼續執行。

實現flink的2PC

由於2PC協議比較複雜,所以flink對它做了抽象,即TwoPhaseCommitSinkFunction。可以通過以下四步實現:

  • beginTransaction。開始一次事務,在目的文件系統創建一個臨時文件。接下來我們就可以將數據寫到這個文件。
  • preCommit。在這個階段,將文件flush掉,同時重起一個文件寫入,作為下一次事務的開始。
  • commit。這個階段,將文件寫到真正的目的目錄。值得注意的是,這會增加數據可視的延時。
  • abort。如果回滾,那麼刪除臨時文件。

如果pre-commit成功了但是commit沒有到達運算元舊宕機了,flink會將運算元恢復到pre-commit時的狀態,然後繼續commit。我們需要做的還有就是保證commit的冪等性,這可以通過檢查臨時文件是否還在來實現。

總結

  • Flink依託checkpoint來實現端到端的一致性語義。
  • 這種方法的優勢是不需要持久化傳輸中的數據,沒有必要將每個階段的計算都寫到磁碟。
  • Flink抽象了TwoPhaseCommitSinkFunction來幫助用戶更好地實現exactly-once語義。
  • 自Flink 1.4.0,Pravega和Kafka 0.11都支持了exactly-once語義。
  • Kafka 0.11在TwoPhaseCommitSinkFunction實現了事務支持,並且開銷很小。

推薦閱讀:

相关文章