背景

基於 Stream & Table relativity(參見前文),《Streaming Systems》將 declarative的編程方式往前推進到數據系統中最常用的SQL表達,即Streaming SQL。在《Streaming Systems》中,Streaming SQL 並不像 StreamCQL(基於Storm)屬於 SQL-like,而是作為 Classic SQL 的擴展,兼容 Classic SQL 的所有規則。

目前而言,Streaming SQL 還是一個正在不斷發展研究的的領域,還沒有一個框架實現了《Streaming Systems》書中提到的所有擴展特性;在開源框架中,Apache Calcite 也只是實現了一部分(Apache Flink集成了Apache Calcite,Apache Spark 在2.2後也實現了部分特性)。

因此下面的討論更多的是理論層面,而不是具體實現。

定義 Streaming SQL

關係型資料庫,或者是 SQL-on-Hadoop 的 SQL 實現都算是 Classic SQL;它的數學基礎是 relational algebra ,核心是 relation(a set of these tuples)。用傳統資料庫的辭彙表達就是一個 relation 就是一張 table、或者是資料庫的物理單元 table、或者是 SQL 查詢的 result、或者是 view 等等。relational algebra 的一個非常重要的性質就是 closure property:

applying any operator from the relational algebra to any valid relation always yields another relation.

許多嘗試在 streaming 中引入 SQL 特性的系統都沒有實現closure property(因此是 SQL-like );它們將 stream 和 SQL 視為不同的東西,並提供特殊的 operators 兩者中轉換。而這並不完美,使用者需要學習新的 operators,Streaming SQL 不應該這麼複雜。

因此《Streaming Systems》將時間的概念或者說是 stream 的概念視為 relational algebra 的第一等公民,使 SQL 也能應用到 streaming 中。從這個角度看待 relation,Classic Relation 是 point-in-time relations ,而新的 relation 是 time-varying relations,即TVR。準確來講,TVR 是evolution of a classic relation over time。

換句話說,Classic Relation 類似於二維的表結構,行代表著X軸,列代表著Y軸;time-varying relations在此基礎之上引入了Z軸,即時間軸,變成了三維表結構。隨著relation的變化,新的 relation snapshot 被不斷加入Z軸。

基於此,《Streaming Systems》通過一系列精彩的論證,證明了基於 TVR 的 Streaming SQL 可以全盤繼承 Classic SQL 的所有規則,這意味著:

classic sql的所有operators(例如Where,Having)都可以應用到 Streaming SQL 中,且結果一致。

closure property 依然成立。

一個偉大的突破!我們只需要將Classic SQL進行一個簡單的擴展就能應用到 Streaming 系統中去,從而免去了大部分接觸底層的編程。

再次審視Stream & Table Theory

《Streaming Systems》引入了STREAMTABLE兩個關鍵字用以區分 Stream 和 Table ,並認為 Table 是the point-in time relation snapshot ,而 Stream 是 capture the sequence of changes。這有點類似於業務處理過程中的 OLTP Table,OLTP Table 可以認為是一系列INSERTs、UPDATEs和DELETEs的操作隨著時間不斷發生變化的 TVR。

就像Stream & Table Theory提到的 Stream 和 Table 是一枚硬幣的兩面,單純從 Stream 或者是 Table 角度去看待這枚硬幣,都是不全面的。使用Kafka做類比的話,如果我們將數據的每一個變化發送到 Kafka ,Stream 關注的是這些數據的變化,而 Table 則是數據的變化過程中形成的一個結果;其實這兩者最終都是 Kafka 中記錄的 Log。

因此 Beam Model 從 Stream 角度去看待數據,Classic SQL Model 從 Table 的角度去看待數據,都有失偏頗。Beam Model的核心數據抽象 PCollection 是 Stream -> Stream ,Table 被隱性的忽略了。而 Classic SQL Model 在處理單純的 Table 是沒有問題的,但是遇上類似於下面SQL的情況,就有些問題了。

SELECT team, score
INTO TeamAndScore
FROM UserScores;

SELECT team, SUM(score) as total
INTO TeamTotals
FROM TeamAndScore
GROUP BY team

此時 Classic SQL Model 必須將 TeamAndScore 轉換成靜態的 Table 進行處理,而不是使用 Stream 增量地處理數據。

因此我們需要對 SQL 進行簡單地擴展,使其能良好地處理這兩種情況,而不是割裂開來。

理想中的Streaming SQL

前文僅僅是論證了Streaming SQL的可行性,而沒有談到具體的擴展特性,《Streaming Systeams》花了很大的篇幅去描述理想中的 Streaming SQL 是什麼樣的。建議讀者花時間去閱讀相應章節,下文只是一些簡單的概括和總結。

stream/table selection

可以使用STREAMTABLE關鍵字顯性區分 Stream 和 Table:

SElECT STREAM * FROM X;

SElECT TABLE * FROM X;

且應該默認下面情況:

  • 如果所有的輸入是 Table ,那麼輸出也是 Table 。
  • 如果所有的輸入是 Stream ,那麼輸出的也是 Stream 。

Temporal Operators

Temporal Operators 是為了提供時間推理工具以能夠處理事件時間,也就是回答Where、When、How三個問題。在SQL World,我們大可將事件時間作為表中的一列看待(這也是Spark 2.X的做法),同時在系統中引入 Sys.MTime 虛擬列作為數據的處理時間。

  • Where 問題對應的是 windowing,沿用 GROUP BY 語句即可。為了處理各種不同的window類型,可以像 Calcite 一樣在 Group By 後面使用內置的函數 SESSION 之類的。
  • When 問題對應的是 trigger 和 watermark 。默認的SQL處理方式是使用類似 materialized views 一樣的 per-record triggers,但是計算的過於頻繁,性能會有問題。因此可以使用 Watermark triggers(類似於 EMIT <when> 語句,等待輸入到達滿足某種完整性就進行計算)、Repeated delay triggers(每隔一定時間段就進行計算)、Data-driven triggers(滿足某個特別的數據要求時就進行計算),作為 trade-off 的一種選項。
  • How 問題對應的是 accumulation 。使用 Sys.Undo 和 Sys.Redo 虛擬列(類似於 Delete 和 Insert )來判定某一行數據是否刪除或者是新插入。

結尾

Streaming SQL 相對於 Classic SQL,它將時間作為第一等公民進行對待,使得 Streaming 領域也可以使用 SQL 進行處理數據;並且對於 SQL 如何翻譯成底層的運行模型,書中舉了具體的例子和給出了相應的代碼,為後續的開源框架指出了一種可能的實現方式。

PS:Beam模型和對應的Streaming SQL 實現確實很優秀;不過對於Apache Beam的發展,筆者並不看好,畢竟 Flink 和 Spark 在市場上已經佔據了這麼多份額,不可能甘心僅僅作為 Beam 的底層 runner,即使 Beam 有 Google 的背書。


參考文章:

  1. calcite.apache.org/docs
  2. 《Streaming Systems》第八章
  3. wso2.com/library/articl
  4. infoq.com/presentations

推薦閱讀:

相关文章