1. Architecture

flink是一個架構和分散式處理引擎,設計目的是有狀態的處理有界流和無界流。flink可以運行與所有通用的集羣管理器,以內存的速度進行計算並且支持任何規模部署。

下面,我們解釋一下Flink架構的重要方面。

1.1 處理無界數據和有界數據

任何種類的數據都是以事件流的形式產生。信用卡交易,感測器測量,機器日誌或網站或移動應用程序上的用戶交互,所有這些數據都作為流生成。

數據可以作為有界流和無界流被處理。

  1. 無界流意思很明顯,只有開始沒有結束。必須連續的處理無界流數據,也即是在事件注入之後立即要對其進行處理。不能等待數據到達了再去全部處理,因為數據是無界的並且永遠不會結束數據注入。 處理無界流數據往往要求事件注入的時候有一定的順序性,例如可以以事件產生的順序注入,這樣會使得處理結果完整。
  2. 有界流,也即是有明確的開始和結束的定義。有界流可以等待數據全部注入完成了再開始處理。注入的順序不是必須的了,因為對於一個靜態的數據集,我們是可以對其進行排序的。有界流的處理也可以稱為批處理。

flink擅長處理有界流和無界流數據集。精確的事件和狀態控制可以使得flink可以運行任何針對無界流處理的應用。有界流數據通過為固定數據集特殊設計的運算元和數據結構的處理,也表現出很好的性能。

1.2 可以部署與任何地方

Flink是分散式系統,為了處理應用程序需要計算資源。Flink可以整合所有通用的集羣資源管理器,比如yarn,mesos,kubernetes,同時也可以單獨運行。

當部署flink應用的時候,flink會根據應用程序配置的並行度自動識別需要的資源並且向資源管理器申請相應的資源。如過發生故障,Flink會通過申請新的容器來替換掉失敗的容器。無論是提交app或者是控制app進行的通訊都是經過rest調用的形式進行的。這使得flink可以很輕鬆的整合到很多環境。

1.3 運行任意規模的應用

Flink可以運行任意規模的流式應用程序。應用會並發成數以千計的task,這些task在集羣中分散式並行運行。因此,應用程序可以利用幾乎無限量的CPU,主內存,磁碟和網路IO。 另外,flink可以保存非常大規模的應用狀態。

其非同步和增量檢查點演算法確保對處理延遲的影響最小,同時保證恰一次的狀態一致性。

1.4 利用內存性能

有狀態Flink應用程序針對本地狀態訪問進行了優化。任務狀態始終保留在內存中,或者,如果狀態大小超過可用內存,則保存在訪問高效的磁碟上。

因此,任務通過訪問本地(通常是內存中)狀態來執行所有計算,從而產生非常低的處理延遲。 Flink通過定期和非同步checkpoint本地狀態到持久存儲來保證在出現故障時的恰一次的狀態一致性。

狀態訪問和存儲的過程如下圖:

2. application

2.1 flink應用組成block

Apache Flink是一個用於對無界和有界數據流進行有狀態計算的框架。 Flink提供在不同抽象級別的API,並為常見用例提供專用庫。

在這裡,我們介紹Flink易於使用和富有表現力的API和庫。

可以用流處理框架構建和執行的應用程序的類型由框架控制流,狀態和時間的成度來決定。下文中,我們會對流程序的組成部分進行介紹,並講解flink處理他們的方法。

2.1.1 Streams

顯然,流是流處理的根本。但是,流可以具有不同的特徵,這些特徵會影響流的處理方式。Flink是一個多功能的處理框架,可以處理任何類型的流。

  • 有界和無界流:流可以是無界的,也可以是有界的。Flink具有處理無界流的複雜功能,但也有專門的操作運算元來有效地處理有界流。
  • 實時處理和離線處理: 所有的數據都是按照流的形式產生。有兩種處理數據的方式,也即是實時處理和緩存下來在進行離線處理。

2.1.2 state

很多流都是由狀態的,當然也有些流僅僅是單獨的處理事件,這些流是無狀態的。運行基本業務邏輯的任何應用程序都需要記住事件或中間結果,以便在以後的時間點訪問它們,例如在收到下一個事件時或在特定持續時間之後。

應用的狀態是flink的一等公民。您可以通過觀察Flink在狀態處理環境中提供的所有功能來查看。

  • Multiple State Primitives(多狀態原語):Flink為不同的數據結構提供狀態原語,例如atomic values, lists, or maps. 開發人員可以根據函數的訪問模式選擇最有效的狀態原語。
  • Pluggable State Backends(可插拔狀態後端):應用程序狀態由可插拔狀態後端管理和checkpoint。 Flink具有不同的狀態後端,可以在內存或RocksDB中存儲狀態,RocksDB是一種高效的嵌入式磁碟數據存儲。 也可以插入自定義狀態後端。
  • Exactly-once state consistency(恰一次狀態一致性):flink的checkpoint和recovery演算法保證了應用狀態在失敗的情況下的一致性。因此,故障是透明處理的,不會影響應用程序的正確性。
  • Very Large State(非常大的狀態):由於其非同步和增量檢查點演算法,Flink能夠維持幾TB的應用程序狀態。
  • Scalable Applications(可擴展的應用程序):Flink通過將狀態重新分配給更多或更少的工作人員來支持有狀態應用程序的擴展。

2.1.3 Time

事件是流程序的另一個重要組成部分。大多數事件流都具有固有的時間語義,因為每個事件都是在特定時間點生成的。此外,許多常見的流計算基於時間,例如窗口聚合,會話化(sessionization),模式檢測(pattern detection)和基於時間的join。

Flink提供了一組豐富的與時間相關的特徵。

  • 事件時間模式:使用事件時間語義處理流的應用程序根據事件的時間戳計算結果。因此,無論是否處理記錄的或實時的事件,事件時間處理都允許準確和一致的結果。
  • 支持watermark:Flink使用watermark來推斷基於事件時間的應用中的時間。watermark也是一種靈活的機制,可以權衡結果的延遲和完整性。
  • 延遲數據處理:

當使用watermark在事件時間模式下處理流時,可能會發生在所有相關事件到達之前已完成計算。這類事件被稱為遲發事件。 Flink具有多種處理延遲事件的選項,例如通過側輸出重新路由它們以及更新以前完成的結果。

  • 處理時間模式:

除了事件時間模式之外,Flink還支持處理時間語義,該處理時間語義執行由處理機器的系統時間觸發計算。處理時間模式適用於具有嚴格的低延遲要求的某些應用,這些要求可以容忍近似結果。

2.2 分層API

Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,並針對不同的用例。

我們簡要介紹每個API,討論它的應用程序,並展示一個代碼示例。

2.2.1 ProcessFunctions

ProcessFunctions是Flink提供的最具表現力的功能介面。 Flink提供ProcessFunctions來處理來自窗口中分組的單個事件 亦或者一個或兩個輸入流的單個事件。 ProcessFunctions提供對時間和狀態的細粒度控制。 ProcessFunction可以任意修改其狀態並註冊將在未來觸發回調函數的定時器。 因此,ProcessFunctions可以實現許多有狀態事件驅動應用程序所需的複雜的單事件業務邏輯。

以下示例顯示了一個KeyedProcessFunction,它對KeyedStream進行操作並匹配START和END事件。 收到START事件時,該函數會記住其狀態的時間戳,並內註冊一個在四小時內的計時器。 如果在計時器觸發之前收到END事件,則該函數計算END和START事件之間的持續時間,清除狀態並返回該值。 否則,計時器只會觸發並清除狀態。

/**
* Matches keyed START and END events and computes the difference between
* both elements timestamps. The first String field is the key attribute,
* the second String attribute marks START and END events.
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {

private ValueState<Long> startTime;

@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}

/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {

switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}

/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {

// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}
}

該示例說明瞭KeyedProcessFunction的表達能力,但也強調了它是一個相當冗長的介面。

2.2.2 The DataStream API

DataStream API為許多常見的流處理操作提供原語,例如窗口化,一次記錄轉換以及通過查詢外部數據存儲來豐富事件。 DataStream API可用於Java和Scala,它基於函數編程,例如map(),reduce()和aggregate()。 可以通過擴展介面或 作為Java或Scala lambda函數來定義函數。

以下示例顯示如何對點擊流進行會話並計算每個會話的點擊次數。

// a stream of website clicks
DataStream<Click> clicks = ...

DataStream<Tuple2<String, Long>> result = clicks
// project clicks to userId and add a 1 for counting
.map(
// define function by implementing the MapFunction interface.
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// key by userId (field 0)
.keyBy(0)
// define session window with 30 minute gap
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// count clicks per session. Define function as lambda function.
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
SQL & Table API

2.2.3 SQL & Table API

Flink有兩個關係API,Table API和SQL。 兩個API都是用於批處理和流處理的統一API,即,在無界的實時流或有界的記錄流上以相同的語義執行查詢,併產生相同的結果。 Table API和SQL利用Apache Calcite進行解析,驗證和查詢優化。 它們可以與DataStream和DataSet API無縫集成,並支持用戶定義的標量,聚合和表值函數。

Flink的關係API旨在簡化數據分析,數據pipeline和ETL應用程序的定義。

以下示例顯示了用於對點擊流進行會話並計算每個會話的點擊次數的SQL查詢。 這與DataStream API示例中的用例相同。

SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL 30 MINUTE), userId

3. Libraries

Flink具有幾個用於常見數據處理用例的庫。這些庫通常嵌入在API中,而不是完全獨立的。因此,他們可以從API的所有功能中受益,並與其他庫集成。

  • 複雜事件處理(CEP):模式檢測是事件流處理的一個非常常見的用例。 Flink的CEP庫提供了一個API來指定事件模式(想想正則表達式或狀態機)。 CEP庫與Flink的DataStream API集成,以便在DataStream上評估模式。 CEP庫的應用包括網路入侵檢測,業務流程監控和欺詐檢測。
  • DataSet API:DataSet API是Flink用於批處理應用程序的核心API。 DataSet API的原語包括map,reduce,(outer)join,co-group和iterate。 所有操作均由演算法和數據結構支持,這些演算法和數據結構對內存中的序列化數據進行操作,並在數據大小超過內存預算時溢出到磁碟。 Flink的DataSet API的數據處理演算法受到傳統資料庫運算符的啟發,例如混合散列 hash-join 或外部合併排序。
  • Gelly:Gelly是一個可擴展的圖形處理和分析庫。 Gelly是在DataSet API之上實現的,並與DataSet API集成在一起。因此,它受益於其可擴展且強大的操作運算元。 Gelly具有內置演算法,如標籤傳播,三角枚舉和頁面排名,但也提供了一個簡化自定義圖演算法實現的Graph API。

3. operations

Apache Flink是一個用於對無界和有界數據流進行有狀態計算的框架。由於許多流應用程序設計為以最短的停機時間連續運行,因此流處理器必須提供出色的故障恢復,以及在應用程序運行時監視和維護應用程序的工具。

Apache Flink非常關注流處理的操作方面。 在這裡,我們將解釋Flink的故障恢復機制,並介紹其管理和監督正在運行的應用程序的功能。

3.1 7*24小時運行

機器和過程故障在分散式系統中無處不在。 像Flink這樣的分散式流處理器必須從故障中恢復,以便能夠24/7全天候運行流應用程序。 顯然,這不僅意味著在故障後重新啟動應用程序,而且還要確保其內部狀態保持一致,以便應用程序可以繼續處理,就像從未發生過故障一樣。

Flink提供了多種功能,以確保應用程序報紙運行並保持一致:

  • 一致的CheckpointFlink的恢復機制基於應用程序狀態的一致檢查點。如果發生故障,將重新啟動應用程序並從最新檢查點載入其狀態。結合可重置流源,此功能可以保證一次性狀態一致性。 高效的檢查點:如果應用程序保持TB級的狀態,則檢查應用程序的狀態可能非常昂貴。 Flink可以執行非同步和增量檢查點,以便將檢查點對應用程序的延遲SLA的影響保持在非常小的水平。
  • 端到端地恰一次:Flink為特定存儲系統提供事務接收器,保證數據只寫出一次,即使出現故障。
  • 與集羣管理器集成:Flink與集羣管理器緊密集成,例如Hadoop YARN,Mesos或Kubernetes。當進程失敗時,將自動啟動一個新進程來接管其工作。
  • 高可用性設置:Flink具有高可用性模式,可消除所有單點故障。 HA模式基於Apache ZooKeeper,這是一種經過驗證的可靠分散式協調服務。

3.2 更新,遷移,暫停和恢復您的應用程序

需要維護為關鍵業務服務提供支持的流應用程序。 需要修復錯誤,並且需要改進或實現新功能。 但是,更新有狀態流應用程序並非易事。 通常,人們不能簡單地停止應用程序並重新啟動固定版本或改進版本,因為人們無法承受丟失應用程序的狀態。

Flink的Savepoints是一個獨特而強大的功能,可以解決更新有狀態應用程序和許多其他相關挑戰的問題。 Savepoints是應用程序狀態的一致快照,因此與檢查點非常相似。 但是,與檢查點相比,需要手動觸發Savepoints,並且在應用程序停止時不會自動刪除Savepoints。 Savepoints可用於啟動狀態兼容的應用程序並初始化其狀態。 Savepoints可啟用以下功能:

  • 應用程序的演變Savepoints可用於發展應用程序。可以從從先前版本的應用程序中獲取的Savepoints重新啟動應用程序的固定或改進版本。也可以從較早的時間點(假設存在這樣的保存點)啟動應用程序,以修復由有缺陷的版本產生的錯誤結果。
  • 羣集遷移:使用Savepoints,可以將應用程序遷移(或克隆)到不同的羣集。
  • Flink版本更新:可以使用Savepoints遷移應用程序以在新的Flink版本上運行。
  • 應用程序擴展:

    Savepoints可用於增加或減少應用程序的並行性。

  • A/B測試和What-If情景:可以通過啟動同一Savepoints的所有版本來比較兩個(或更多)不同版本的應用程序的性能或質量。
  • 暫停和恢復:可以通過獲取Savepoints並停止它來暫停應用程序。在以後的任何時間點,都可以從Savepoints恢復應用程序。
  • 存檔:Savepoints可以存檔,以便能夠將應用程序的狀態重置為較早的時間點。

4. 監控和控制您的應用程序

與任何其他服務一樣,需要監視連續運行的流應用程序並將其集成到企業的運營架構(即,監視和日誌記錄服務)中。 監控有助於預測問題並提前做出反應。日誌記錄可以根本原因分析來調查故障。最後,控制運行應用程序的易於訪問的界面是一個重要特性。

Flink可以與許多常見的日誌系統和監視服務很好地集成,並提供REST API來控制應用程序和查詢信息。

  • Web UI:Flink具有Web UI,可以檢查,監視和調試正在運行的應用程序。 它還可用於提交執行執行或取消執行。
  • 日誌記錄:Flink實現了流行的slf4j日誌記錄界面,並與日誌框架log4j或logback集成。
  • 度量標準:Flink具有複雜的度量標準系統,用於收集和報告系統和用戶定義的度量標準。 度量標準可以導出到reporter,包括JMX,Ganglia,Graphite,Prometheus,StatsD,Datadog和Slf4j。
  • REST API:

    Flink暴露REST API以提交新應用程序,獲取正在運行的應用程序的Savepoints或取消應用程序。 REST API還暴露元數據和收集的運行或已完成應用程序的指標。

推薦閱讀:

相關文章