傳統的資料庫SQL和實時SQL處理的差別還是很大的,這裡簡單列出一些區別:

儘管存在這些差異,但使用關係查詢和SQL處理流並非不可能。高級關係資料庫系統提供稱為物化視圖的功能。物化視圖定義為SQL查詢,就像常規虛擬視圖一樣。與虛擬視圖相比,物化視圖緩存查詢的結果,使得在訪問視圖時不需要執行查詢。緩存的一個常見挑戰是避免緩存提供過時的結果。物化視圖在修改其定義查詢的基表時會過時。Eager View Maintenance是一種在更新基表後立即更新實例化視圖的技術。

如果我們考慮以下內容,Eager View Maintenance和流上的SQL查詢之間的聯繫就變得很明顯:

  • 資料庫表是INSERT,UPDATE和DELETEDML語句流的結果,通常被稱為更新日誌流。
  • 物化視圖定義為SQL查詢。為了更新視圖,查詢需要持續處理視圖源表的更改日誌流。
  • 物化視圖是流式SQL查詢的結果。

有了上面的基礎,下面可以介紹一下動態表的概念了。

動態表和持續不斷查詢

動態表flink table api和SQL處理流數據的核心概念。與靜態表相比,動態表隨時間而變化,但可以像靜態表一樣查詢動態表,只不過查詢動態表需要產生連續查詢。連續查詢永遠不會終止,會生成動態表作為結果表。查詢不斷更新其(動態)結果表以反映其(動態)輸入表的更改。最終,動態表上的連續查詢與定義物化視圖的查詢非常相似。

值得注意的是,連續查詢的結果始終在語義上等同於在輸入表的快照上執行批處理的到的相同查詢結果。

下圖顯示了流,動態表和連續查詢的關係:

  1. 數據流被轉化為動態表
  2. 在產生的動態表上執行連續不斷的查詢,產生一個動態結果表。
  3. 結果動態表再次被轉化為數據流。

注意:動態表最重要的是邏輯概念。在查詢執行期間,動態表不一定(完全)物化。

在下文中,會以schema如下的點擊事件流來解釋動態表和連續不斷的查詢。

[
user: VARCHAR, // the name of the user
cTime: TIMESTAMP, // the time when the URL was accessed
url: VARCHAR // the URL that was accessed by the user
]

stream轉化成表

當然,想要用經典的sql去分析流數據,肯定要先將其轉化為表。從概念上講,流的每個新增記錄都被解釋為對結果表的Insert操作。最終,可以理解為是在從一個INSERT-only changelog流上構建一個表。

下圖顯示了click事件流(左側)如何轉換為表(右側)。隨著更多點擊流記錄的插入,生成的表不斷增長。

注意:stream轉化的表內部並沒有被物化。

連續查詢

在動態表上執行連續查詢,並生成新的動態表作為結果表。與批處理查詢不同,連續查詢絕不會終止,而且會根據輸入表的更新來更新它的結果表。在任何時間點,連續查詢的結果在語義上等同於在輸入表的快照上以批處理模式得到的查詢的結果。

在下文中,我們將在用點擊事件流定義的clicks表上展示兩個示例查詢。

第一個查詢是一個簡單的GROUP-BY COUNT聚合查詢。主要是對clicks表按照user分組,然後統計url得到訪問次數。下圖展示了clicks表在數據增加期間查詢是如何執行的。

假設當查詢啟動的事以後,clicks表為空。當第一行數據插入clicks表的時候,查詢開始計算產生結果表。當[Mary, ./home]插入的時候,查詢會在結果表上產生一行[Mary, 1]。當[Bob, ./cart]插入clicks表之後,查詢會再次更新結果表,增加一行[Bob, 1]。當第三行,[Mary, ./prod?id=1]插入clicks表後,查詢會更新結果表的[Mary, 1]為[Mary, 2]。最後,第四行數據插入clicks後,查詢會給結果表增加一行[Liz, 1].

第二個查詢僅僅是在上個查詢的基礎上增加了一個1小時的滾動窗口。下圖展示了整個流水過程。

這個就類似批處理了,每個小時產生一次計算結果然後更新結果表。cTime的時間範圍在12:00:00 ~12:59:59的時候總共有四行數據,查詢計算出了兩行結果,並將其追加到結果表。Ctime窗口在13:00:00 and 13:59:59的時候,總共有三行數據,查詢再次產生兩行結果追加到結果表。隨著時間的推移,click數據會被追加到clicks表,結果表也會不斷有新的結果產生。

Update 和 append 查詢

儘管兩個示例查詢看起來非常相似(都計算了分組計數聚合),但是內部邏輯還是區別較大:

  • 第一個查詢更新以前發出的結果,即結果表的更改日誌流包含INSERT和UPDATE更改。
  • 第二個查詢僅append到結果表,即結果表的更改日誌流僅包含INSERT更改。

查詢是生成僅append表還是update表有一些區別:

  • 產生update變化的查詢通常必須維護更多狀態。
  • 將僅append錶轉換為流與將update表的轉換為流,方式不同。

查詢限制

並不是所有的查詢都能以流查詢的格式執行的。因為有些查詢計算起來成本比較高,要麼就是要維護的狀態比較大,要麼就是計算更新成本高。

狀態大小:連續查詢在無界流上執行,通常應該運行數周或數月,甚至7*24小時。因此,連續查詢處理的數據總量可能非常大。為了更新先前生成的結果,可能需要維護所有輸出的行。例如,第一個示例查詢需要存儲每個用戶的URL計數,以便能夠增加計數,並在輸入表收到新行時發出新結果。如果僅統計註冊用戶,則要維護的計數可能不會太高。但是,如果未註冊的用戶分配了唯一的用戶名,則要維護的計數數將隨著時間的推移而增長,最終可能導致查詢失敗。

SELECT user, COUNT(url)
FROM clicks
GROUP BY user;

計算更新:有時即使只添加或更新了單個輸入記錄,某些查詢也需要重新計算和更新大部分發出的結果行。顯然,這樣的查詢不適合作為連續查詢執行。下面sql是一個示例查詢,該查詢基於最後一次點擊的時間為每個用戶計算RANK 。一旦clicks表接收到新增行,用戶的lastAction就會更新,並且必須計算新的排名。但是,由於兩行不能具有相同的排名,因此所有排名較低的行也需要更新。

SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

錶轉化為流

可以像傳統資料庫表一樣使用INSERT, UPDATE, 和DELETE修改動態表。當將動態錶轉化為stream或者寫入外部系統的時候,需要對修改進行編碼。Flink的Table API和SQL支持三種方式來編碼動態表的變化。

Append-only stream:假如動態表的更改操作僅僅是insert ,那麼變為stream就僅僅需要將插入的行發送出去即可。

Retract stream: retract(回撤)流是包含兩種類型的消息的流,增加消息和回撤消息。通過將INSERT編碼為增加消息,DELETE編碼為回撤消息,將UPDATE編碼為對先前行的回撤消息和對新增行的增加消息,來完成將動態錶轉換為收迴流。下圖顯示了動態表到回收流的轉換。

Upsert流: upsert流是一種包含兩種消息,upsert消息和刪除消息的流。轉換為upsert流的動態表需要唯一鍵。具有唯一鍵的動態表通過將INSERT和UPDATE編碼為upsert消息,DELETE編碼為刪除消息來完成動態錶轉化為流。流算符需要知道唯一鍵屬性才能正確處理消息。與回撤流的主要區別在於,UPDATE使用單個消息對update進行編碼,因此更有效。下圖顯示了動態表到upsert流的轉換。


推薦閱讀:
相關文章