本文是根據 Apache Flink 基礎篇系列直播整理而成,由 Apache Flink PMC 戴資力與阿里巴巴高級產品專家陳守元共同分享。Apache Flink 系列入門教程每周更新一期,持續推送。

▼ 預告 ▼

自 Flink 1.0 版本後變動最大又十分神秘的 Flink 1.9 版本即將發布,Flink 1.9 版本有哪些重大變更,會增加哪些新特性?本周六 Apache Flink Meetup 北京站為你詳細解讀,請鎖定直播,掃碼觀看,拿獎品,掃描下方二維碼即可參與!

直播二維碼

本文主要內容:

一. Apache Flink 的定義、架構及原理

  • 1. Flink Application
  • 2.Flink Architecture
  • 3.Flink Operation
  • 4.Flink 的應用場景

二.「有狀態的流式處理」概念解析

  • 1.傳統批次處理
  • 2.理想方法
  • 3.流式處理
  • 4.分散式流式處理
  • 5.有狀態分散式流式處理

三. Apache Flink 的優勢

  • 1.狀態容錯
  • 2.狀態維護
  • 3.Event - Time
  • 4.狀態保存與遷移

四. 總結

一. Apache Flink 的定義、架構及原理

Apache Flink 是一個分散式大數據處理引擎,可對有限數據流和無限數據流進行有狀態或無狀態的計算,能夠部署在各種集群環境,對各種規模大小的數據進行快速計算。

1. Flink Application

了解 Flink 應用開發需要先理解 Flink 的 Streams、State、Time 等基礎處理語義以及 Flink 兼顧靈活性和方便性的多層次 API。

  • Streams,流,分為有限數據流與無限數據流,unbounded stream 是有始無終的數據流,即無限數據流;而 bounded stream 是限定大小的有始有終的數據集合,即有限數據流,二者的區別在於無限數據流的數據會隨時間的推演而持續增加,計算持續進行且不存在結束的狀態,相對的有限數據流數據大小固定,計算最終會完成並處於結束的狀態。
  • State,狀態是計算過程中的數據信息,在容錯恢復和 Checkpoint 中有重要的作用,流計算在本質上是 Incremental Processing,因此需要不斷查詢保持狀態;另外,為了確保 Exactly- once 語義,需要數據能夠寫入到狀態中;而持久化存儲,能夠保證在整個分散式系統運行失敗或者掛掉的情況下做到 Exactly- once,這是狀態的另外一個價值。
  • Time,分為 Event time、Ingestion time、Processing time,Flink 的無限數據流是一個持續的過程,時間是我們判斷業務狀態是否滯後,數據處理是否及時的重要依據。
  • API,API 通常分為三層,由上而下可分為 SQL / Table API、DataStream API、ProcessFunction 三層,API 的表達能力及業務抽象能力都非常強大,但越接近 SQL 層,表達能力會逐步減弱,抽象能力會增強,反之,ProcessFunction 層 API 的表達能力非常強,可以進行多種靈活方便的操作,但抽象能力也相對越小。

2.Flink Architecture

在架構部分,主要分為以下四點:

第一, Flink 具備統一的框架處理有界和無界兩種數據流的能力

第二, 部署靈活,Flink 底層支持多種資源調度器,包括 Yarn、Kubernetes 等。Flink 自身帶的 Standalone 的調度器,在部署上也十分靈活。

第三, 極高的可伸縮性,可伸縮性對於分散式系統十分重要,阿里巴巴雙 11 大屏採用 Flink 處理海量數據,使用過程中測得 Flink 峰值可達 17 億 / 秒。

第四, 極致的流式處理性能。Flink 相對於 Storm 最大的特點是將狀態語義完全抽象到框架中,支持本地狀態讀取,避免了大量網路 IO,可以極大提升狀態存取的性能。

3.Flink Operation

後面會有專門課程講解,此處簡單分享 Flink 關於運維及業務監控的內容:

  • Flink 具備 7 X 24 小時高可用的 SOA(面向服務架構),原因是在實現上 Flink 提供了一致性的 Checkpoint。Checkpoint 是 Flink 實現容錯機制的核心,它周期性的記錄計算過程中 Operator 的狀態,並生成快照持久化存儲。當 Flink 作業發生故障崩潰時,可以有選擇的從 Checkpoint 中恢復,保證了計算的一致性。
  • Flink 本身提供監控、運維等功能或介面,並有內置的 WebUI,對運行的作業提供 DAG 圖以及各種 Metric 等,協助用戶管理作業狀態。

4.Flink 的應用場景

4.1 Flink 的應用場景:Data Pipeline

Data Pipeline 的核心場景類似於數據搬運並在搬運的過程中進行部分數據清洗或者處理,而整個業務架構圖的左邊是 Periodic ETL,它提供了流式 ETL 或者實時 ETL,能夠訂閱消息隊列的消息並進行處理,清洗完成後實時寫入到下游的 Database 或 File system 中。場景舉例:

  • 實時數倉

當下游要構建實時數倉時,上游則可能需要實時的 Stream ETL。這個過程會進行實時清洗或擴展數據,清洗完成後寫入到下游的實時數倉的整個鏈路中,可保證數據查詢的時效性,形成實時數據採集、實時數據處理以及下游的實時 Query。

  • 搜索引擎推薦

搜索引擎這塊以淘寶為例,當賣家上線新商品時,後台會實時產生消息流,該消息流經過 Flink 系統時會進行數據的處理、擴展。然後將處理及擴展後的數據生成實時索引,寫入到搜索引擎中。這樣當淘寶賣家上線新商品時,能在秒級或者分鐘級實現搜索引擎的搜索。

4.2 Flink 應用場景:Data Analytics

Data Analytics,如圖,左邊是 Batch Analytics,右邊是 Streaming Analytics。Batch Analysis 就是傳統意義上使用類似於 Map Reduce、Hive、Spark Batch 等,對作業進行分析、處理、生成離線報表,Streaming Analytics 使用流式分析引擎如 Storm,Flink 實時處理分析數據,應用較多的場景如實時大屏、實時報表。

4.3 Flink 應用場景:Data Driven

從某種程度上來說,所有的實時的數據處理或者是流式數據處理都是屬於 Data Driven,流計算本質上是 Data Driven 計算。應用較多的如風控系統,當風控系統需要處理各種各樣複雜的規則時,Data Driven 就會把處理的規則和邏輯寫入到 Datastream 的 API 或者是 ProcessFunction 的 API 中,然後將邏輯抽象到整個 Flink 引擎中,當外面的數據流或者是事件進入就會觸發相應的規則,這就是 Data Driven 的原理。在觸發某些規則後,Data Driven 會進行處理或者是進行預警,這些預警會發到下游產生業務通知,這是 Data Driven 的應用場景,Data Driven 在應用上更多應用於複雜事件的處理。

二.「有狀態的流式處理」概念解析

1. 傳統批處理

傳統批處理方法是持續收取數據,以時間作為劃分多個批次的依據,再周期性地執行批次運算。但假設需要計算每小時出現事件轉換的次數,如果事件轉換跨越了所定義的時間劃分,傳統批處理會將中介運算結果帶到下一個批次進行計算;除此之外,當出現接收到的事件順序顛倒情況下,傳統批處理仍會將中介狀態帶到下一批次的運算結果中,這種處理方式也不盡如人意。

2. 理想方法

第一,要有理想方法,這個理想方法是引擎必須要有能力可以累積狀態和維護狀態,累積狀態代表著過去歷史中接收過的所有事件,會影響到輸出。

第二,時間,時間意味著引擎對於數據完整性有機制可以操控,當所有數據都完全接受到後,輸出計算結果。

第三,理想方法模型需要實時產生結果,但更重要的是採用新的持續性數據處理模型來處理實時數據,這樣才最符合 continuous data 的特性。

3. 流式處理

流式處理簡單來講即有一個無窮無盡的數據源在持續收取數據,以代碼作為數據處理的基礎邏輯,數據源的數據經過代碼處理後產生出結果,然後輸出,這就是流式處理的基本原理。

4. 分散式流式處理

假設 Input Streams 有很多個使用者,每個使用者都有自己的 ID,如果計算每個使用者出現的次數,我們需要讓同一個使用者的出現事件流到同一運算代碼,這跟其他批次需要做 group by 是同樣的概念,所以跟 Stream 一樣需要做分區,設定相應的 key,然後讓同樣的 key 流到同一個 computation instance 做同樣的運算。

5. 有狀態分散式流式處理

如圖,上述代碼中定義了變數 X,X 在數據處理過程中會進行讀和寫,在最後輸出結果時,可以依據變數 X 決定輸出的內容,即狀態 X 會影響最終的輸出結果。這個過程中,第一個重點是先進行了狀態 co-partitioned key by,同樣的 key 都會流到 computation instance,與使用者出現次數的原理相同,次數即所謂的狀態,這個狀態一定會跟同一個 key 的事件累積在同一個 computation instance。

相當於根據輸入流的 key 重新分區的 狀態,當分區進入 stream 之後,這個 stream 會累積起來的狀態也變成 copartiton 了。第二個重點是 embeded local state backend。有狀態分散式流式處理的引擎,狀態可能會累積到非常大,當 key 非常多時,狀態可能就會超出單一節點的 memory 的負荷量,這時候狀態必須有狀態後端去維護它;在這個狀態後端在正常狀況下,用 in-memory 維護即可。

三. Apache Flink 的優勢

1. 狀態容錯

當我們考慮狀態容錯時難免會想到精確一次的狀態容錯,應用在運算時累積的狀態,每筆輸入的事件反映到狀態,更改狀態都是精確一次,如果修改超過一次的話也意味著數據引擎產生的結果是不可靠的。

  • 如何確保狀態擁有精確一次(Exactly-once guarantee)的容錯保證?
  • 如何在分散式場景下替多個擁有本地狀態的運運算元產生一個全域一致的快照(Global consistent snapshot)?
  • 更重要的是,如何在不中斷運算的前提下產生快照?

1.1 簡單場景的精確一次容錯方法

還是以使用者出現次數來看,如果某個使用者出現的次數計算不準確,不是精確一次,那麼產生的結果是無法作為參考的。在考慮精確的容錯保證前,我們先考慮最簡單的使用場景,如無限流的數據進入,後面單一的 Process 進行運算,每處理完一筆計算即會累積一次狀態,這種情況下如果要確保 Process 產生精確一次的狀態容錯,每處理完一筆數據,更改完狀態後進行一次快照,快照包含在隊列中並與相應的狀態進行對比,完成一致的快照,就能確保精確一次。

1.2 分散式狀態容錯

Flink 作為分散式的處理引擎,在分散式的場景下,進行多個本地狀態的運算,只產生一個全域一致的快照,如需要在不中斷運算值的前提下產生全域一致的快照,就涉及到分散式狀態容錯。

  • Global consistent snapshot

關於 Global consistent snapshot,當 Operator 在分散式的環境中,在各個節點做運算,首先產生 Global consistent snapshot 的方式就是處理每一筆數據的快照點是連續的,這筆運算流過所有的運算值,更改完所有的運算值後,能夠看到每一個運算值的狀態與該筆運算的位置,即可稱為 consistent snapshot,當然,Global consistent snapshot 也是簡易場景的延伸。

  • 容錯恢復

首先了解一下 Checkpoint,上面提到連續性快照每個 Operator 運算值本地的狀態後端都要維護狀態,也就是每次將產生檢查點時會將它們傳入共享的 DFS 中。當任何一個 Process 掛掉後,可以直接從三個完整的 Checkpoint 將所有的運算值的狀態恢復,重新設定到相應位置。Checkpoint 的存在使整個 Process 能夠實現分散式環境中的 Exactly-once。

1.3 分散式快照(Distributed Snapshots)方法

關於 Flink 如何在不中斷運算的狀況下持續產生 Global consistent snapshot,其方式是基於用 simple lamport 演演算法機制下延伸的。已知的一個點 Checkpoint barrier, Flink 在某個 Datastream 中會一直安插 Checkpoint barrier,Checkpoint barrier 也會 N — 1 等等,Checkpoint barrier N 代表著所有在這個範圍裡面的數據都是 Checkpoint barrier N。

舉例:假設現在需要產生 Checkpoint barrier N,但實際上在 Flink 中是由 job manager 觸發 Checkpoint,Checkpoint 被觸發後開始從數據源產生 Checkpoint barrier。當 job 開始做 Checkpoint barrier N 的時候,可以理解為 Checkpoint barrier N 需要逐步填充左下角的表格。

如圖,當部分事件標為紅色,Checkpoint barrier N 也是紅色時,代表著這些數據或事件都由 Checkpoint barrier N 負責。Checkpoint barrier N 後面白色部分的數據或事件則不屬於 Checkpoint barrier N。

在以上的基礎上,當數據源收到 Checkpoint barrier N 之後會先將自己的狀態保存,以讀取 Kafka 資料為例,數據源的狀態就是目前它在 Kafka 分區的位置,這個狀態也會寫入到上面提到的表格中。下游的 Operator 1 會開始運算屬於 Checkpoint barrier N 的數據,當 Checkpoint barrier N 跟著這些數據流動到 Operator 1 之後,Operator 1 也將屬於 Checkpoint barrier N 的所有數據都反映在狀態中,當收到 Checkpoint barrier N 時也會直接對 Checkpoint 去做快照。

當快照完成後繼續往下遊走,Operator 2 也會接收到所有數據,然後搜索 Checkpoint barrier N 的數據並直接反映到狀態,當狀態收到 Checkpoint barrier N 之後也會直接寫入到 Checkpoint N 中。以上過程到此可以看到 Checkpoint barrier N 已經完成了一個完整的表格,這個表格叫做 Distributed Snapshots,即分散式快照。分散式快照可以用來做狀態容錯,任何一個節點掛掉的時候可以在之前的 Checkpoint 中將其恢復。繼續以上 Process,當多個 Checkpoint 同時進行,Checkpoint barrier N 已經流到 job manager 2,Flink job manager 可以觸發其他的 Checkpoint,比如 Checkpoint N + 1,Checkpoint N + 2 等等也同步進行,利用這種機制,可以在不阻擋運算的狀況下持續地產生 Checkpoint。

2. 狀態維護

狀態維護即用一段代碼在本地維護狀態值,當狀態值非常大時需要本地的狀態後端來支持。

如圖,在 Flink 程序中,可以採用 getRuntimeContext().getState(desc); 這組 API 去註冊狀態。Flink 有多種狀態後端,採用 API 註冊狀態後,讀取狀態時都是通過狀態後端來讀取的。Flink 有兩種不同的狀態值,也有兩種不同的狀態後端:

  • JVM Heap 狀態後端,適合數量較小的狀態,當狀態量不大時就可以採用 JVM Heap 的狀態後端。JVM Heap 狀態後端會在每一次運算值需要讀取狀態時,用 Java object read / writes 進行讀或寫,不會產生較大代價,但當 Checkpoint 需要將每一個運算值的本地狀態放入 Distributed Snapshots 的時候,就需要進行序列化了。

  • RocksDB 狀態後端,它是一種 out of core 的狀態後端。在 Runtime 的本地狀態後端讓使用者去讀取狀態的時候會經過磁碟,相當於將狀態維護在磁碟里,與之對應的代價可能就是每次讀取狀態時,都需要經過序列化和反序列化的過程。當需要進行快照時只將應用序列化即可,序列化後的數據直接傳輸到中央的共享 DFS 中。

Flink 目前支持以上兩種狀態後端,一種是純 memory 的狀態後端,另一種是有資源磁碟的狀態後端,在維護狀態時可以根據狀態的數量選擇相應的狀態後端。

3.Event - Time

3.1 不同時間種類

在 Flink 及其他進階的流式處理引擎出現之前,大數據處理引擎一直只支持 Processing-time 的處理。假設定義一個運算 windows 的窗口,windows 運算設定每小時進行結算。以 Processing-time 進行運算時可以發現數據引擎將 3 點至 4 點間收到的數據做結算。實際上在做報表或者分析結果時是想了解真實世界中 3 點至 4 點之間實際產生數據的輸出結果,了解實際數據的輸出結果就必須採用 Event – Time 了。

如圖,Event - Time 相當於事件,它在數據最源頭產生時帶有時間戳,後面都需要用時間戳來進行運算。用圖來表示,最開始的隊列收到數據,每小時對數據劃分一個批次,這就是 Event - Time Process 在做的事情。

3.2 Event-Time 處理

Event-Time 是用事件真實產生的時間戳去做 Re-bucketing,把對應時間 3 點到 4 點的數據放在 3 點到 4 點的 Bucket,然後 Bucket 產生結果。所以 Event - Time 跟 Processing - time 的概念是這樣對比的存在。

Event - Time 的重要性在於記錄引擎輸出運算結果的時間。簡單來說,流式引擎連續 24 小時在運行、搜集資料,假設 Pipeline 里有一個 windows Operator 正在做運算,每小時能產生結果,何時輸出 windows 的運算值,這個時間點就是 Event - Time 處理的精髓,用來表示該收的數據已經收到。

3.3 Watermarks

Flink 實際上是用 watermarks 來實現 Event - Time 的功能。Watermarks 在 Flink 中也屬於特殊事件,其精髓在於當某個運算值收到帶有時間戳「 T 」的 watermarks 時就意味著它不會接收到新的數據了。使用 watermarks 的好處在於可以準確預估收到數據的截止時間。舉例,假設預期收到數據時間與輸出結果時間的時間差延遲 5 分鐘,那麼 Flink 中所有的 windows Operator 搜索 3 點至 4 點的數據,但因為存在延遲需要再多等 5 分鐘直至收集完 4:05 分的數據,此時方能判定 4 點鐘的資料收集完成了,然後才會產出 3 點至 4 點的數據結果。這個時間段的結果對應的就是 watermarks 的部分。

4. 狀態保存與遷移

流式處理應用無時無刻不在運行,運維上有幾個重要考量:

  • 更改應用邏輯 / 修 bug 等,如何將前一執行的狀態遷移到新的執行?
  • 如何重新定義運行的平行化程度?
  • 如何升級運算叢集的版本號?

Checkpoint 完美符合以上需求,不過 Flink 中還有另外一個名詞保存點(Savepoint),當手動產生一個 Checkpoint 的時候,就叫做一個 Savepoint。Savepoint 跟 Checkpoint 的差別在於檢查點是 Flink 對於一個有狀態應用在運行中利用分散式快照持續周期性的產生 Checkpoint,而 Savepoint 則是手動產生的 Checkpoint,Savepoint 記錄著流式應用中所有運算元的狀態。

如圖,Savepoint A 和 Savepoint B,無論是變更底層代碼邏輯、修 bug 或是升級 Flink 版本,重新定義應用、計算的平行化程度等,最先需要做的事情就是產生 Savepoint。

Savepoint 產生的原理是在 Checkpoint barrier 流動到所有的 Pipeline 中手動插入從而產生分散式快照,這些分散式快照點即 Savepoint。Savepoint 可以放在任何位置保存,當完成變更時,可以直接從 Savepoint 恢復、執行。

從 Savepoint 的恢復執行需要注意,在變更應用的過程中時間在持續,如 Kafka 在持續收集資料,當從 Savepoint 恢復時,Savepoint 保存著 Checkpoint 產生的時間以及 Kafka 的相應位置,因此它需要恢復到最新的數據。無論是任何運算,Event - Time 都可以確保產生的結果完全一致。

假設恢復後的重新運算用 Process Event - Time,將 windows 窗口設為 1 小時,重新運算能夠在 10 分鐘內將所有的運算結果都包含到單一的 windows 中。而如果使用 Event – Time,則類似於做 Bucketing。在 Bucketing 的狀況下,無論重新運算的數量多大,最終重新運算的時間以及 windows 產生的結果都一定能保證完全一致。

四. 總結

本文首先從 Apache Flink 的定義、架構、基本原理入手,對大數據流計算相關的基本概念進行辨析,在此基礎上簡單回顧了大數據處理方式的歷史演進以及有狀態的流式數據處理的原理,最後從目前有狀態的流式處理面臨的挑戰分析 Apache Flink 作為業界公認為最好的流計算引擎之一所具備的天然優勢。希望有助於大家釐清大數據流式處理引擎涉及的基本概念,能夠更加得心應手地使用 Flink。

Tips:

微信公眾號後台貼心小功能上線,回復以下關鍵詞,get 你想要的最新消息:

  • 回復「下載」,獲取 Apache Flink 社區專刊第一季和第二季專刊電子版下載鏈接;
  • 回復「活動」,一鍵了解最新社區Meetup嘉賓及活動信息;
  • 回復「直播」,直播課程表總覽,訂閱及回顧都超方便;

動動手指測試一下?


推薦閱讀:
相关文章