Kafka Stream 分佈式流式處理的新貴

《Kafka設計解析》系列上一篇《Kafka高性能架構之道——Kafka設計解析(六)》從宏觀架構到具體實現分析了Kafka實現高性能的原理。本文介紹了Kafka Stream的架構和併發模型,同時分析了Kafka Stream如何解決流式計算的關鍵問題。

什麼是流式計算。。。。。。。。。。。

一般流式計算會與批量計算相比較。。。。。。。。。。。。

在流式計算模型中,輸入是持續的,在時間上是無界的。這也就意味着,永遠拿不到全量數據集進行計算。同時,計算結果會持續輸出,也即計算結果在時間上也是無界的。

流式計算一般對實時性要求比較高,同時一般是先定義目標計算,然後數據到達後將計算邏輯應用於數據之上。同時爲了提高計算效率,一般儘可能(對於可合併的計算)採用增量計算代替全量計算。

Kafka Stream 分佈式流式處理Kafka設計解析七

批量處理模型中,一般先有全量數據集,然後將計算邏輯應用於該全量數據集。特點是全量計算,並且計算結果一次性全量輸出,在時間上是有界的。

Kafka Stream 分佈式流式處理Kafka設計解析七

Kafka Stream是什麼

Kafka Stream是Kafka從0.10.*引入的一個新的特性。它提供了對存於Kafka內的數據進行分佈式流式處理以和分析的能力。

。。。。。。。。。。。。。

Kafka Stream的特點如下:

  • 除了Kafka外,不依賴於任何外部系統
  • Kafka Stream是一個非常簡單並且輕量級的類庫,可以非常方便地將它嵌入任意Java程序中,也可以任意方式進行打包以及部署
  • 同時提供底層的處理單元Processor(類似於Storm提供的bolt和spout),以及高層抽象的DSL(類似於Spark的group/reduce/map)
  • 通過具有容錯性的state store實現可靠的狀態操作(如windowed join和aggregation)
  • 支持Exactly Once(正好一次)處理語義
  • 具備記錄級(也即行級)的數據處理能力,從而將延遲降低到毫秒級別
  • 充分利用Kafka分區機制以實現Scale Out(水平擴展)並提供順序性保證
  • 支持基於事件時間的窗口操作(Spark Streaming暫不支持事件時間),並且可處理晚到的數據(late arrival of records)

Kafka Stream定位及優勢

當前已經有多種分佈式流式處理系統,最知名且應用最多的開源流式處理系統當屬Twitter開源的Apache Storm和UC berkeley的Spark Streaming。。。。

Apache Storm經過多年發展,應用廣泛,並且同樣提供記錄級(行級)的處理能力,延遲也在毫秒級。目前已支持SQL on Stream。

Spark Streaming基於Apache Spark,且非常便於與SQL處理和圖計算等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻非常低。。。。。。。。

另外,目前主流的Hadoop發行版,如Cloudera,Hortonworks和MapR,都集成了Spark和Storm,使得部署與運維這些系統非常方便。。。。。。。

既然Apache Storm與Apache Spark擁用如此多優點,那爲何還需要Kafka Stream呢?筆者認爲主要有如下原因。。。。。。。

第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照指定的方式去開發邏輯部分,並按照指定的方法部署。開發者很難瞭解框架的內部處理方式,從而使得調試和運維成本較高,且使用受限。而Kafka Stream作爲流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行和部署方式完成由開發者決定,極大地方便了使用和調試。應用程序與類庫及框架的關係如下圖所示。

Kafka Stream 分佈式流式處理Kafka設計解析七

第二,主流的分佈式流式處理系統,基本都支持以Kafka作爲其數據源。例如Spark提供專門的spark-streaming-kafka模塊,而Storm也具有專門的kafka-spout。事實上,Kafka可以說是當前業界主流的分佈式流式處理系統的標準數據源,大部分典型的流式系統中都已部署了Kafka,此時使用Kafka Stream的使用和維護成本非常低。。。。。。。

第三,雖然Hortonworks與Cloudera方便了Spark和Storm的部署,但這些框架的部署和運維仍相對複雜。相反,Kafka Stream作爲類庫,可以非常方便地被嵌入到已有的應用程序中,它對應用的打包方式及部署方式基本上沒有任何要求。。。。。。

第四,由於Kafka本身提供數據持久化,因此Kafka Stream具有在線滾動升級和滾動部署及重新計算的能力。。

第五,Kafka Stream充分利用了Consumer的Rebalance機制和Kafka的分區機制,使得Kafka Stream可以非常方便地進行水平擴展。具體來說,每個運行Kafka Stream的應用實例都包含了一個Kafka Consumer實例,多個同一應用的不同實例間並行處理目標數據集。而不同實例之間的部署方式並不必完全一致,比如部分實例運行在Web容器中,部分實例可以運行在Docker或Kubernetes等虛擬化容器中。

第六,Kafka具有Consumer Rebalance機制,因此可在線動態調整並行度而不需要重啓。。。。。。。。

第六,使用Spark Streaming或Storm時,需要爲框架本身的進程預留資源,如Spark on YARN的node manager和Storm的supervisor。對應用程序,框架本身也會佔用部分資源,如Spark Streaming需要爲shuffle和storage預留內存。。。。。。。

Kafka Stream整體架構

Kafka Stream的整體架構圖如下。

Kafka Stream 分佈式流式處理Kafka設計解析七

目前(Kafka 0.11.0.0)Kafka Stream的數據源只能是Kafka(如上圖所示)。但是處理結果並不一定要如上圖所示輸出到Kafka。實際上GlobalKTable和KTable及KStream的實例化都須指定Topic(如下所示)。

Kafka Stream 分佈式流式處理Kafka設計解析七

另外,上圖中的Producer和Consumer不需由開發者在Kafka Stream應用中顯示地實例化,而是由Kafka Stream根據參數隱式實例化,從而降低了使用Kafka的門檻。開發者只需專注於開發核心業務邏輯,也即上圖中Task內的部分。

Processor Topology

基於Kafka Stream的流式應用的業務邏輯全部由一個被稱爲Processor Topology的組件執行。它與Spark Streaming的DAG和Storm的Topology類似,都定義了數據在各個處理單元(在Kafka Stream中被稱作Processor)間的流動方式,也即定義了數據的處理邏輯。。。。。。。。

下面是一個Processor的示例,該Processor實現了Word Count功能,且每秒輸出一次計算結果。

Kafka Stream 分佈式流式處理Kafka設計解析七

由上述代碼可見

  • context.getStateStore提供的狀態存儲爲有狀態計算(如聚合操作,窗口操作)提供了可能性
  • process定義了對每條記錄的處理邏輯,也印證了Kafka具有記錄級的數據處理能力
  • context.scheduler定義了punctuate被執行的週期,從而提供了實現窗口操作的能力

Kafka Stream並行模型

Kafka Stream的並行模型中,最小粒度爲Task,每個Task包含一個特定子Topology的所有Processor。因此每個Task所執行的代碼完全一樣,唯一的不同在於所處理的數據集互補。

這一點跟Storm的Topology完全不一樣。Storm的Topology的每一個Task只包含一個Spout或Bolt的實例。因此Storm的一個Topology內的不同Task之間需要通過網絡通信傳遞數據,而Kafka Stream的Task包含了完整的子Topology,所以Task之間不需要傳遞數據,也就不需要網絡通信。這一點降低了系統複雜度,也提高了處理效率。

如果某個Stream的輸入Topic有多個(比如2個Topic,1個Partition數爲5,另一個Partition數爲6),則總的Task數等於Partition數最多的那個Topic的Partition數(max(5,6)=6)。這是因爲Kafka Stream使用了Consumer的Rebalance機制,每個Partition對應一個Task。

下圖展示了在一個進程(Instance)中以2個Topic(Partition數均爲4)爲數據源的Kafka Stream應用的並行模型。從圖中可以看到,由於Kafka Stream應用的默認線程數爲1,所以4個Task全部在一個線程中運行。

Kafka Stream 分佈式流式處理Kafka設計解析七

爲了充分利用多線程的並行處理優勢,Kafka Stream應用程序可設置線程數(默認爲1)。下圖展示了線程數爲2時的並行模型。

Kafka Stream 分佈式流式處理Kafka設計解析七

從上圖可見,每個線程分別負責執行兩個Task。

前文有提到,Kafka Stream可被嵌入到任意Java應用(理論上基於JVM的應用都可以)中。下圖展示了在同一臺機器的不同進程中同時啓動同一個Kafka Stream應用時的並行模型。。。

Kafka Stream 分佈式流式處理Kafka設計解析七

。。。。。。

注意,這裏要保證兩個進程的StreamsConfig.APPLICATION_ID_CONFIG完全一樣。因爲Kafka Stream將APPLICATION_ID_CONFIG作爲隱式啓動的Consumer的Group ID。只有保證APPLICATION_ID_CONFIG相同,才能保證這兩個進程的Consumer屬於同一個Group,從而可以通過Consumer Rebalance機制拿到互補的數據集。

既然實現了多進程部署,可以以同樣的方式實現多機器部署。該部署方式也要求所有進程的APPLICATION_ID_CONFIG完全一樣。從圖上也可以看到,每個實例中的線程數並不要求一樣。但是無論如何部署,Task總數總會保證一致。

Kafka Stream 分佈式流式處理Kafka設計解析七

注意:Kafka Stream的並行模型,非常依賴於《Kafka設計解析(四)- Kafka Consumer設計解析》一文中介紹的Consumer的Rebalance機制和《Kafka設計解析(一)- Kafka背景及架構介紹》一文中介紹的Kafka分區機制。強烈建議不太熟悉這兩種機制的朋友,先行閱讀這兩篇文章。

這裏對比一下Storm的Topology和Kafka Stream的Processor Topology。

  • Storm的Topology由Spout和Bolt組成,Spout提供數據源,而Bolt提供計算和數據導出。Kafka Stream的Processor Topology完全由Processor組成,因爲它的數據固定由Kafka的Consumer從Kafka的一個或多個Topic中獲取
  • Storm的Topology可以同時包含Shuffle部分和非Shuffle部分,並且往往一個Topology就是一個完整的應用。而Kafka Stream的一個物理Topology只包含非Shuffle部分,而Shuffle部分需要通過through操作顯示完成,該操作將一個大的Topology分成了2個子Topology
  • Storm的不同Bolt運行在不同的Executor中,很可能位於不同的機器,需要通過網絡通信傳輸數據。而Kafka Stream的Processor Topology的不同Processor完全運行於同一個Task中,也就完全處於同一個線程,無需網絡通信
  • Storm的一個Task只包含一個Spout或者Bolt的實例,而Kafka Stream的一個Task包含了一個子Topology的所有Processor
  • Storm的Topology內,不同Bolt/Spout的並行度可以不一樣,而Kafka Stream的子Topology內,所有Processor的並行度完全一樣
  • Storm如果要修改某個Spout或Bolt的並行度,需要重啓Topology。而Kafka Stream可利用Consumer Rebalance機制非常方便地在線動態調整並行度

State store

流式處理中,部分操作是無狀態的,例如過濾操作(Kafka Stream DSL中用filer方法實現)。而部分操作是有狀態的,需要記錄中間狀態,如Window操作和聚合計算。State store被用來存儲中間狀態。它可以是一個持久化的Key-Value存儲,也可以是內存中的HashMap,或者是數據庫。Kafka提供了基於Topic的狀態存儲。

Topic中存儲的數據記錄本身是Key-Value形式的,同時Kafka的log compaction機制可對歷史數據做compact操作,保留每個Key對應的最後一個Value,從而在保證Key不丟失的前提下,減少總數據量,從而提高查詢效率。

構造KTable時,需要指定其state store name。默認情況下,該名字也即用於存儲該KTable的狀態的Topic的名字,遍歷KTable的過程,實際就是遍歷它對應的state store,或者說遍歷Topic的所有key,並取每個Key最新值的過程。爲了使得該過程更加高效,默認情況下會對該Topic進行compact操作。

另外,除了KTable,所有狀態計算,都需要指定state store name,從而記錄中間狀態。

KStream vs. KTable

KTable和KStream是Kafka Stream中非常重要的兩個概念,它們是Kafka實現各種語義的基礎。因此這裏有必要分析下二者的區別。

KStream是一個數據流,可以認爲所有記錄都通過Insert only的方式插入進這個數據流裏。而KTable代表一個完整的數據集,可以理解爲數據庫中的表。由於每條記錄都是Key-Value對,這裏可以將Key理解爲數據庫中的Primary Key,而Value可以理解爲一行記錄。可以認爲KTable中的數據都是通過Update only的方式進入的。也就意味着,如果KTable對應的Topic中新進入的數據的Key已經存在,那麼從KTable只會取出同一Key對應的最後一條數據,相當於新的數據更新了舊的數據。

以下圖爲例,假設有一個KStream和KTable,基於同一個Topic創建,並且該Topic中包含如下圖所示5條數據。此時遍歷KStream將得到與Topic內數據完全一樣的所有5條數據,且順序不變。而此時遍歷KTable時,因爲這5條記錄中有3個不同的Key,所以將得到3條記錄,每個Key對應最新的值,並且這三條數據之間的順序與原來在Topic中的順序保持一致。這一點與Kafka的日誌compact相同。

Kafka Stream 分佈式流式處理Kafka設計解析七

此時如果對該KStream和KTable分別基於key做Group,對Value進行Sum,得到的結果將會不同。對Ktable的計算結果是。而對KStream的計算結果將是

時間

在流式數據處理中,時間是數據的一個非常重要的屬性。從Kafka 0.10開始,每條記錄除了Key和Value外,還增加了timestamp屬性。目前Kafka Stream支持三種時間

  • 事件發生時間。事件發生的時間,包含在數據記錄中。發生時間由Producer在構造ProducerRecord時指定。並且需要Broker或者Topic將message.timestamp.type設置爲CreateTime(默認值)才能生效。
  • 消息接收時間,也即消息存入Broker的時間。當Broker或Topic將message.timestamp.type設置爲LogAppendTime時生效。此時Broker會在接收到消息後,存入磁盤前,將其timestamp屬性值設置爲當前機器時間。一般消息接收時間比較接近於事件發生時間,部分場景下可代替事件發生時間。
  • 消息處理時間,也即Kafka Stream處理消息時的時間。

注:Kafka Stream允許通過實現org.apache.kafka.streams.processor.TimestampExtractor接口自定義記錄時間。

窗口

前文提到,流式數據是在時間上無界的數據。而聚合操作只能作用在特定的數據集,也即有界的數據集上。因此需要通過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種非常常用的設定計算邊界的方式。不同的流式處理系統支持的窗口類似,但不盡相同。

Kafka Stream支持的窗口如下。

  1. Hopping Time Window 該窗口定義如下圖所示。它有兩個屬性,一個是Advance interval,一個是Window size。Advance interval定義輸出的時間間隔,而Window size指定了窗口的大小,也即每次計算的數據集的大小。而。一個典型的應用場景是,每隔5秒鐘輸出一次過去1個小時內網站的PV或者UV。。。。。。。。
Kafka Stream 分佈式流式處理Kafka設計解析七

  1. Tumbling Time Window 該窗口定義如下圖所示。可以認爲它是Hopping Time Window的一種特例,也即Window size和Advance interval相等。它的特點是各個Window之間完全不相交。
Kafka Stream 分佈式流式處理Kafka設計解析七

  1. Session Window 該窗口用於對Key做Group後的聚合操作中。它需要對Key做分組,然後對組內的數據根據業務需求定義一個窗口的起始點和結束點。一個典型的案例是,希望通過Session Window計算某個用戶訪問網站的時間。對於一個特定的用戶(用Key表示)而言,當發生登錄操作時,該用戶(Key)的窗口即開始,當發生退出操作或者超時時,該用戶(Key)的窗口即結束。窗口結束時,可計算該用戶的訪問時間或者點擊次數等。
  2. Sliding Window 該窗口只用於2個KStream進行Join計算時。該窗口的大小定義了Join兩側KStream的數據記錄被認爲在同一個窗口的最大時間差。假設該窗口的大小爲5秒,則參與Join的2個KStream中,記錄時間差小於5的記錄被認爲在同一個窗口中,可以進行Join計算。

Join

Kafka Stream由於包含Ktable和KStream兩種數據集,因此提供如下Join計算

  • KTable Join KTable 結果仍爲KTable。任意一邊有更新,結果KTable都會更新。
  • KStream Join KStream 結果爲KStream。必須帶窗口操作,否則會造成Join操作一直不結束。
  • KStream Join KTable / GlobalKTable 結果爲KStream。只有當KStream中有新數據時,纔會觸發Join計算並輸出結果。KStream無新數據時,KTable的更新並不會觸發Join計算,也不會輸出數據。並且該更新只對下次Join生效。一個典型的使用場景是,KStream中的訂單信息與KTable中的用戶信息做關聯計算。

對於Join操作,如果要得到正確的計算結果,需要保證參與Join的KTable或KStream中Key相同的數據被分配到同一個Task。具體方法是

  • 參與Join的KTable或KStream對應的Topic的Partition數相同
  • 參與Join的KTable或KStream的Key類型相同(實際上,業務含意也應該相同)
  • Partitioner策略的最終結果等效(實現不需要完全一樣,但效果必須一致),也即Key相同的情況下,被分配到ID相同的Partition內

如果上述條件不滿足,可通過調用如下方法使得它滿足上述條件。

KStream through(Serde keySerde, Serde valSerde, StreamPartitioner partitioner, String topic)

通過through方法,進行Join操作的過程如下圖所示

Kafka Stream 分佈式流式處理Kafka設計解析七

從上圖可以看出,爲了滿足Join條件,需要通過through等方法對參與Join的某一方進行重新分區,相當於Storm的Field Grouping和Spark的Shuffle。

爲了提高Join的效率,0.10.2.0中引入了GlobalKTable。當KStream與一個GlobalKTable Join時,GlobalKTable的所有數據會被複制到所有Kafka Stream應用實例,因此KStream可在Task內直接與其所在實例中的GlobalKTable副本進行Join,不需要通過through等方法進行重新分區,極大提高了Join時的效率。與GlobalKTable的Join過程如下圖所示。

Kafka Stream 分佈式流式處理Kafka設計解析七

一個典型的適用場景是,在類數據倉庫的應用中,將包含大量增量數據的Topic通過KStream引用,而將包含少量,可能更新的數據,置於GlobalKTable中。充分利用GlobalKTable的數據複製特性,降低Join開銷,提高性能。

但需要注意的是,GlobalKTable需要將所有數據複製到每一個實例,因此必須保證實例內存至少足夠保存該GlobalKTable內全部數據。

聚合與亂序處理

聚合操作可應用於KStream和KTable。當聚合發生在KStream上時必須指定窗口,從而限定計算的目標數據集。

需要說明的是,聚合操作的結果肯定是KTable。因爲KTable是可更新的,可以在晚到的數據到來時(也即發生數據亂序時)更新結果KTable。

這裏舉例說明。假設對KStream以5秒爲窗口大小,進行Tumbling Time Window上的Count操作。並且KStream先後出現時間爲1秒, 3秒, 5秒的數據,此時5秒的窗口已達上限,Kafka Stream關閉該窗口,觸發Count操作並將結果3輸出到KTable中(假設該結果表示爲<1-5,3>)。若1秒後,又收到了時間爲2秒的記錄,由於1-5秒的窗口已關閉,若直接拋棄該數據,則可認爲之前的結果<1-5,3>不準確。而如果直接將完整的結果<1-5,4>輸出到KStream中,則KStream中將會包含該窗口的2條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。因此Kafka Stream選擇將聚合結果存於KTable中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可得到完整的正確的結果。

這種方式保證了數據準確性,同時也提高了容錯性。

但需要說明的是,Kafka Stream並不會對所有晚到的數據都重新計算並更新結果集,而是讓用戶設置一個retention period,將每個窗口的結果集在內存中保留一定時間,該窗口內的數據晚到時,直接合並計算,並更新結果KTable。超過retention period後,該窗口結果將從內存中刪除,並且晚到的數據即使落入窗口,也會被直接丟棄。

容錯

Kafka Stream從如下幾個方面進行容錯

  • 高可用的Partition保證無數據丟失。每個Task計算一個Partition,而Kafka數據複製機制保證了Partition內數據的高可用性,故無數據丟失風險。同時由於數據是持久化的,即使任務失敗,依然可以重新計算。
  • 狀態存儲實現快速故障恢復和從故障點繼續處理。對於Join和聚合及窗口等有狀態計算,狀態存儲可保存中間狀態。即使發生Failover或Consumer Rebalance,仍然可以通過狀態存儲恢復中間狀態,從而可以繼續從Failover或Consumer Rebalance前的點繼續計算。
  • KTable與retention period提供了對亂序數據的處理能力。

Kafka Stream應用示例

下面結合一個案例來講解如何開發Kafka Stream應用。本例完整代碼可從作者Github獲取。https://github.com/habren/KafkaExample

訂單KStream(名爲orderStream),底層Topic的Partition數爲3,Key爲用戶名,Value包含用戶名,商品名,訂單時間,數量。用戶KTable(名爲userTable),底層Topic的Partition數爲3,Key爲用戶名,Value包含性別,地址和年齡。商品KTable(名爲itemTable),底層Topic的Partition數爲6,Key爲商品名,價格,種類和產地。現在希望計算每小時購買產地與自己所在地相同的用戶總數。

首先由於希望使用訂單時間,而它包含在orderStream的Value中,需要通過提供一個實現TimestampExtractor接口的類從orderStream對應的Topic中抽取出訂單時間。

Kafka Stream 分佈式流式處理Kafka設計解析七

接着通過將orderStream與userTable進行Join,來獲取訂單用戶所在地。由於二者對應的Topic的Partition數相同,且Key都爲用戶名,再假設Producer往這兩個Topic寫數據時所用的Partitioner實現相同,則此時上文所述Join條件滿足,可直接進行Join。

Kafka Stream 分佈式流式處理Kafka設計解析七

從上述代碼中,可以看到,Join時需要指定如何從參與Join雙方的記錄生成結果記錄的Value。Key不需要指定,因爲結果記錄的Key與Join Key相同,故無須指定。Join結果存於名爲orderUserStream的KStream中。

接下來需要將orderUserStream與itemTable進行Join,從而獲取商品產地。此時orderUserStream的Key仍爲用戶名,而itemTable對應的Topic的Key爲產品名,並且二者的Partition數不一樣,因此無法直接Join。此時需要通過through方法,對其中一方或雙方進行重新分區,使得二者滿足Join條件。這一過程相當於Spark的Shuffle過程和Storm的FieldGrouping。

Kafka Stream 分佈式流式處理Kafka設計解析七

從上述代碼可見,through時需要指定Key的序列化器,Value的序列化器,以及分區方式和結果集所在的Topic。這裏要注意,該Topic(orderuser-repartition-by-item)的Partition數必須與itemTable對應Topic的Partition數相同,並且through使用的分區方法必須與iteamTable對應Topic的分區方式一樣。經過這種through操作,orderUserStream與itemTable滿足了Join條件,可直接進行Join。

總結

  • Kafka Stream的並行模型完全基於Kafka的分區機制和Rebalance機制,實現了在線動態調整並行度
  • 同一Task包含了一個子Topology的所有Processor,使得所有處理邏輯都在同一線程內完成,避免了不必的網絡通信開銷,從而提高了效率。
  • through方法提供了類似Spark的Shuffle機制,爲使用不同分區策略的數據提供了Join的可能
  • log compact提高了基於Kafka的state store的加載效率
  • state store爲狀態計算提供了可能
  • 基於offset的計算進度管理以及基於state store的中間狀態管理爲發生Consumer rebalance或Failover時從斷點處繼續處理提供了可能,併爲系統容錯性提供了保障
  • KTable的引入,使得聚合計算擁用了處理亂序問題的能力

(配套視頻資料獲取轉發關注私信小編“學習”)

Kafka Stream 分佈式流式處理Kafka設計解析七
相關文章