正如在之前的那篇文章中 Spark Streaming 設計原理 中說到 Spark 團隊之後對 Spark Streaming 的維護可能越來越少,Spark 2.4 版本的 Release Note 裡面果然一個 Spark Streaming 相關的 ticket 都沒有。相比之下,Structured Streaming 有將近十個 ticket 說明。所以各位同學,是時候捨棄 Spark Streaming 轉向 Structured Streaming 了,當然理由並不止於此。我們這篇文章就來分析一下 Spark Streaming 的不足,以及Structured Streaming 的設計初衷和思想是怎麼樣的。文章主要參考今年(2018 年)sigmod 上面的這篇論文:Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark

首先可以注意到的了論文標題中的 Declarative API,中文一般叫做聲明式編程 API。一般直接看到這個詞可能不知道什麼意思,但是當我們列出他的對立面:Imperative API,中文一般叫命令式編程 API,彷彿一切都明了了。是的,沒錯,Declarative 只是表達出我們想要什麼,而 Imperative 則是說為了得到什麼我們需要做哪些東西一個個說明。舉個例子,我們要一個糕點,去糕點店直接去定做告訴店員我們要什麼樣式的糕點,然後店員去給我們做出來,這就是 Declarative。而 Imperative 對應的就是麵粉店了。

0. Spark Streaming 不足

在開始正式介紹 Structured Streaming 之前有一個問題還需要說清楚,就是 Spark Streaming 存在哪些不足?總結一下主要有下面幾點:

使用 Processing Time 而不是 Event Time。首先解釋一下,Processing Time 是數據到達 Spark 被處理的時間,而 Event Time 是數據自帶的屬性,一般表示數據產生於數據源的時間。比如 IoT 中,感測器在 12:00:00 產生一條數據,然後在 12:00:05 數據傳送到 Spark,那麼 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。我們知道 Spark Streaming 是基於 DStream 模型的 micro-batch 模式,簡單來說就是將一個微小時間段,比如說 1s,的流數據當前批數據來處理。如果我們要統計某個時間段的一些數據統計,毫無疑問應該使用 Event Time,但是因為 Spark Streaming 的數據切割是基於 Processing Time,這樣就導致使用 Event Time 特別的困難。

Complex, low-level api。這點比較好理解,DStream (Spark Streaming 的數據模型)提供的 API 類似 RDD 的 API 的,非常的 low level。當我們編寫 Spark Streaming 程序的時候,本質上就是要去構造 RDD 的 DAG 執行圖,然後通過 Spark Engine 運行。這樣導致一個問題是,DAG 可能會因為開發者的水平參差不齊而導致執行效率上的天壤之別。這樣導致開發者的體驗非常不好,也是任何一個基礎框架不想看到的(基礎框架的口號一般都是:你們專註於自己的業務邏輯就好,其他的交給我)。這也是很多基礎系統強調 Declarative 的一個原因。

reason about end-to-end application。這裡的 end-to-end 指的是直接 input 到 out,比如 Kafka 接入 Spark Streaming 然後再導出到 HDFS 中。DStream 只能保證自己的一致性語義是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Straming 輸出到外部存儲的語義往往需要用戶自己來保證。而這個語義保證寫起來也是非常有挑戰性,比如為了保證 output 的語義是 exactly-once 語義需要 output 的存儲系統具有冪等的特性,或者支持事務性寫入,這個對於開發者來說都不是一件容易的事情。

批流代碼不統一。儘管批流本是兩套系統,但是這兩套系統統一起來確實很有必要,我們有時候確實需要將我們的流處理邏輯運行到批數據上面。關於這一點,最早在 2014 年 Google 提出 Dataflow 計算服務的時候就批判了 streaming/batch 這種叫法,而是提出了 unbounded/bounded data 的說法。DStream 儘管是對 RDD 的封裝,但是我們要將 DStream 代碼完全轉換成 RDD 還是有一點工作量的,更何況現在 Spark 的批處理都用 DataSet/DataFrame API 了。

1. Structured Streaming 介紹

Structured Streaming 在 Spark 2.0 版本於 2016 年引入,設計思想參考很多其他系統的思想,比如區分 processing time 和 event time,使用 relational 執行引擎提高性能等。同時也考慮了和 Spark 其他組件更好的集成。Structured Streaming 和其他系統的顯著區別主要如下:

  • Incremental query model: Structured Streaming 將會在新增的流式數據上不斷執行增量查詢,同時代碼的寫法和批處理 API (基於 Dataframe 和 Dataset API)完全一樣,而且這些 API 非常的簡單。
  • Support for end-to-end application: Structured Streaming 和內置的 connector 使的 end-to-end 程序寫起來非常的簡單,而且 "correct by default"。數據源和 sink 滿足 "exactly-once" 語義,這樣我們就可以在此基礎上更好地和外部系統集成。
  • 復用 Spark SQL 執行引擎:我們知道 Spark SQL 執行引擎做了非常多的優化工作,比如執行計劃優化、codegen、內存管理等。這也是 Structured Streaming 取得高性能和高吞吐的一個原因。

2. Structured Streaming 核心設計

下面我們看一下 Structured Streaming 的核心設計。

  • Input and Output: Structured Streaming 內置了很多 connector 來保證 input 數據源和 output sink 保證 exactly-once 語義。而實現 exactly-once 語義的前提是:
    • Input 數據源必須是可以 replay 的,比如 Kafka,這樣節點 crash 的時候就可以重新讀取 input 數據。常見的數據源包括 Amazon Kinesis, Apache Kafka 和文件系統。
    • Output sink 必須要支持寫入是冪等的。這個很好理解,如果 output 不支持冪等寫入,那麼一致性語義就是 at-least-once 了。另外對於某些 sink, Structured Streaming 還提供了原子寫入來保證 exactly-once 語義。
  • API: Structured Streaming 代碼編寫完全復用 Spark SQL 的 batch API,也就是對一個或者多個 stream 或者 table 進行 query。query 的結果是 result table,可以以多種不同的模式(append, update, complete)輸出到外部存儲中。另外,Structured Streaming 還提供了一些 Streaming 處理特有的 API:Trigger, watermark, stateful operator。
  • Execution: 復用 Spark SQL 的執行引擎。Structured Streaming 默認使用類似 Spark Streaming 的 micro-batch 模式,有很多好處,比如動態負載均衡、再擴展、錯誤恢復以及 straggler (straggler 指的是哪些執行明顯慢於其他 task 的 task)重試。除了 micro-batch 模式,Structured Streaming 還提供了基於傳統的 long-running operator 的 continuous 處理模式。
  • Operational Features: 利用 wal 和狀態存儲,開發者可以做到集中形式的 rollback 和錯誤恢復。還有一些其他 Operational 上的 feature,這裡就不細說了。

3. Structured Streaming 編程模型

可能是受到 Google Dataflow 的批流統一的思想的影響,Structured Streaming 將流式數據當成一個不斷增長的 table,然後使用和批處理同一套 API,都是基於 DataSet/DataFrame 的。如下圖所示,通過將流式數據理解成一張不斷增長的表,從而就可以像操作批的靜態數據一樣來操作流數據了。

在這個模型中,主要存在下面幾個組成部分:

  • Input Unbounded Table: 流式數據的抽象表示
  • Query: 對 input table 的增量式查詢
  • Result Table: Query 產生的結果表
  • Output: Result Table 的輸出

下面舉一個具體的例子,NetworkWordCount,代碼如下:

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()

代碼實際執行流程可以用下圖來表示。把流式數據當成一張不斷增長的 table,也就是圖中的 Unbounded table of all input。然後每秒 trigger 一次,在 trigger 的時候將 query 應用到 input table 中新增的數據上,有時候還需要和之前的靜態數據一起組合成結果。query 產生的結果成為 Result Table,我們可以選擇將 Result Table 輸出到外部存儲。輸出模式有三種:

  • Complete mode: Result Table 全量輸出
  • Append mode (default): 只有 Result Table 中新增的行才會被輸出,所謂新增是指自上一次 trigger 的時候。因為只是輸出新增的行,所以如果老數據有改動就不適合使用這種模式。
  • Update mode: 只要更新的 Row 都會被輸出,相當於 Append mode 的加強版。

和 batch 模式相比,streaming 模式還提供了一些特有的運算元操作,比如 window, watermark, statefaul oprator 等。

window,下圖是一個基於 event-time 統計 window 內事件的例子。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window("eventTime", "10 minutes", "5 minutes"),
$"word"
).count()

如下圖所示,窗口大小為 10 分鐘,每 5 分鐘 trigger 一次。在 12:11 時候收到了一條 12:04 的數據,也就是 late data (什麼叫 late data 呢?就是 Processing Time 比 Event Time 要晚),然後去更新其對應的 Result Table 的記錄。

watermark,是也為了處理 ,很多情況下對於這種 late data 的時效數據並沒有必要一直保留太久。比如說,數據晚了 10 分鐘或者還有點有,但是晚了 1 個小時就沒有用了,另外這樣設計還有一個好處就是中間狀態沒有必要維護那麼多。watermark 的形式化定義為 max(eventTime) - threshold,早於 watermark 的數據直接丟棄。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("eventTime", "10 minutes")
.groupBy(
window("eventTime", "10 minutes", "5 minutes"),
$"word")
.count()

用下圖表示更加形象。在 12:15 trigger 時 watermark 為 12:14 - 10m = 12:04,所以 late date (12:08, dog; 12:13, owl) 都被接收了。在 12:20 trigger 時 watermark 為 12:21 - 10m = 12:11,所以 late data (12:04, donkey) 都丟棄了。

除此之後 Structured Streaming 還提供了用戶可以自定義狀態計算邏輯的運算元:

  • mapGroupsWithState
  • flatMapGroupsWithState

看名字大概也能看出來 mapGroupsWithState 是 one -> one,flatMapGroupsWithState 是 one -> multi。這兩個運算元的底層都是基於 Spark Streaming 的 updateStateByKey。

4. Continuous Processing Mode

好,終於要介紹到「真正」的流處理了,我之所以說「真正」是因為 continuous mode 是傳統的流處理模式,通過運行一個 long-running 的 operator 用來處理數據。之前 Spark 是基於 micro-batch 模式的,就被很多人詬病不是「真正的」流式處理。continuous mode 這種處理模式只要一有數據可用就會進行處理,如下圖所示。epoch 是 input 中數據被發送給 operator 處理的最小單位,在處理過程中,epoch 的 offset 會被記錄到 wal 中。另外 continuous 模式下的 snapshot 存儲使用的一致性演算法是 Chandy-Lamport 演算法。

這種模式相比與 micro-batch 模式缺點和優點都很明顯。

  • 缺點是不容易做擴展
  • 優點是延遲更低

關於為什麼延遲更低,下面兩幅圖可以做到一目了然。

5. 一致性語義

對於 Structured Streaming 來說,因為有兩種模式,所以我們分開討論。

micro-batch 模式可以提供 end-to-end 的 exactly-once 語義。原因是因為在 input 端和 output 端都做了很多工作來進行保證,比如 input 端 replayable + wal,output 端寫入冪等。

continuous mode 只能提供 at-least-once 語義。關於 continuous mode 的官方討論的實在太少,甚至只是提了一下。在和 @李呈祥 討論之後覺得應該還是 continuous mode 由於要儘可能保證低延遲,所以在 sink 端沒有做一致性保證。

6. Benchmark

Structured Streming 的官方論文裡面給出了 Yahoo! Streaming Benchmark 的結果,Structured Streaming 的 throughput 大概是 Flink 的 2 倍和 Kafka Streaming 的 90 多倍。

7. 總結

總結一下,Structured Streaming 通過提供一套 high-level 的 declarative api 使得流式計算的編寫相比 Spark Streaming 簡單容易不少,同時通過提供 end-to-end 的 exactly-once 語義使用 Structured Streaming 可以和其他系統更好的集成。性能方面,通過復用 Spark SQL Engine 來保證高性能。

上面的這些特定無論是出於哪個方面的考慮,對開發者都是有足夠的說服力轉向 Structured Streaming。

8. 閑扯

最後,閑扯一點別的。Spark 在多年前推出基於 micro-batch 模式的 Spark Streaming 必然是基於當時 Spark Engine 最快的方式,儘管不是「真正」的流處理,但是在吞吐量更重要的年代,還是嘗盡了甜頭。Spark 的真正基於 continuous 處理模式的 Structured Streaming 直到 Spark 2.3 版本才真正推出,而近兩年 Flink 在實時計算領域嘗盡了甜頭(當然和 Flink 的優秀的語義模型存在很大的關係)。在實時計算領域,由 Spark 的卓越核心 SQL Engine 助力的 Structured Streaming,還是風頭正勁的 Flink,亦或是其他流處理引擎,究竟誰將佔領統治地位,還是值得期待一下的。

Ps: 本人本周二在由阿里巴巴 EMR 團隊主導的 Apache Spark 社群釘釘群做了一場 《從 Spark Streaming 到 Structured Streaming》的直播,直播 ppt 可以在 slidetalk 網站上面查看,地址:slidestalk.com/s/FromSp

如果對 Spark 感興趣,也歡迎加入上面的釘釘群。

qr.dingtalk.com/action/ (二維碼自動識別)

9. Reference

  1. Zaharia M, Das T, Li H, et al. Discretized streams: Fault-tolerant streaming computation at scale[C]//Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems Principles. ACM, 2013: 423-438.
  2. Akidau T, Bradshaw R, Chambers C, et al. The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing[J]. Proceedings of the VLDB Endowment, 2015, 8(12): 1792-1803.
  3. Armbrust M, Das T, Torres J, et al. Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark[C]//Proceedings of the 2018 International Conference on Management of Data. ACM, 2018: 601-613.
  4. The world beyond batch: Streaming 101
  5. The world beyond batch: Streaming 102
  6. Streaming Systems
  7. databricks.com/blog/201
  8. en.wikipedia.org/wiki/C
  9. A Deep Dive Into Structured Streaming: databricks.com/session/
  10. Continuous Applications: Evolving Streaming in Apache Spark 2.0: databricks.com/blog/201
  11. Spark Structured Streaming:A new high-level API for streaming: databricks.com/blog/201
  12. Event-time Aggregation and Watermarking in Apache Spark』s Structured Streaming: databricks.com/blog/201
  13. spark.apache.org/docs/l
  14. Benchmarking Structured Streaming on Databricks Runtime Against State-of-the-Art Streaming Systems: databricks.com/blog/201

推薦閱讀:

相关文章