本篇核心目標是讓大家概要了解一個完整的 Apache Flink SQL Job 的組成部分,以及 Apache Flink SQL 所提供的核心運算元的語義,最後會應用 TumbleWindow 編寫一個 End-to-End 的頁面訪問的統計示例。

1.Apache Flink SQL Job 的組成

我們做任何數據計算都離不開讀取原始數據,計算邏輯和寫入計算結果數據三部分,當然基於 ApacheFlink SQL 編寫的計算 Job 也離不開這個三部分,如下所所示:

如上所示,一個完整的 Apache Flink SQL Job 由如下三部分:

  • Source Operator – Soruce operator 是對外部數據源的抽象, 目前 Apache Flink 內置了很多常用的數據源實現,比如上圖提到的 Kafka。
  • Query Operators – 查詢運算元主要完成如圖的 Query Logic,目前支持了Union,Join,Projection,Difference, Intersection 以及 window 等大多數傳統資料庫支持的操作。
  • Sink Operator – Sink operator 是對外結果表的抽象,目前 Apache Flink 也內置了很多常用的結果表的抽象,比如上圖提到的 Kafka。

大數據Flink討論羣

2.Apache Flink SQL 核心運算元

SQL 是 StructuredQuevy Language 的縮寫,最初是由美國計算機科學家 Donald D. Chamberlin 和 Raymond F. Boyce 在 20 世紀 70 年代早期從 Early History of SQL 中瞭解關係模型後在 IBM 開發的。該版本最初稱為[SEQUEL: A Structured EnglishQuery Language](結構化英語查詢語言),旨在操縱和檢索存儲在 IBM 原始準關係資料庫管理系統 System R 中的數據。直到 1986 年, ANSI 和 ISO 標準組正式採用了標準的」資料庫語言 SQL」語言定義。Apache Flink SQL 核心運算元的語義設計也參考了 1992 、2011 等 ANSI-SQL 標準。接下來我們將簡單為大家介紹 Apache Flink SQL 每一個運算元的語義。

2.1 SELECT

SELECT 用於從數據集/流中選擇數據,語法遵循 ANSI-SQL 標準,語義是關係代數中的投影(Projection),對關係進行垂直分割,消去某些列。

一個使用 Select 的語句如下:

SELECT ColA, ColC FROME tab ;

2.2 WHERE

WHERE 用於從數據集/流中過濾數據,與 SELECT 一起使用,語法遵循 ANSI-SQL 標準,語義是關係代數的 Selection,根據某些條件對關係做水平分割,即選擇符合條件的記錄,如下所示:

對應的 SQL 語句如下:

SELECT * FROM tab WHERE ColA <> a2 ;

2.3 GROUP BY

GROUP BY 是對數據進行分組的操作,比如我需要分別計算一下一個學生表裡面女生和男生的人數分別是多少,如下:

對應的 SQL 語句如下:

SELECT sex, COUNT(name) AS count FROM tab GROUP BY sex ;

2.4 UNION ALL

UNION ALL 將兩個表合併起來,要求兩個表的欄位完全一致,包括欄位類型、欄位順序,語義對應關係代數的 Union,只是關係代數是 Set 集合操作,會有去重複操作,UNION ALL 不進行去重,如下所示:

對應的 SQL 語句如下:

SELECT * FROM T1 UNION ALL SELECT * FROM T2

2.5 UNION

UNION 將兩個流給合併起來,要求兩個流的欄位完全一致,包括欄位類型、欄位順序,並其 UNION 不同於 UNION ALL,UNION 會對結果數據去重,與關係代數的 Union 語義一致,如下:

對應的 SQL 語句如下:

SELECT * FROM T1 UNION SELECT * FROM T2

2.6 JOIN

JOIN 用於把來自兩個表的行聯合起來形成一個寬表,Apache Flink 支持的 JOIN 類型:

  • JOIN – INNER JOIN
  • LEFT JOIN – LEFT OUTER JOIN
  • RIGHT JOIN – RIGHT OUTER JOIN
  • FULL JOIN – FULL OUTER JOIN

JOIN 與關係代數的 Join 語義相同,具體如下:

對應的 SQL 語句如下(INNERJOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI JOIN T2 ON T1.ColC = T2.ColC ;

LEFT JOIN 與 INNERJOIN 的區別是當右表沒有與左邊相 JOIN 的數據時候,右邊對應的欄位補 NULL 輸出,如下:

對應的 SQL 語句如下(LEFTJOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;

說明:

  • 細心的讀者可能發現上面 T2.ColC 是添加了前綴 T2 了,這裡需要說明一下,當兩張表有欄位名字一樣的時候,我需要指定是從那個表裡面投影的。
  • RIGHT JOIN 相當於 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN 相當於 RIGHT JOIN 和 LEFT JOIN 之後進行 UNION ALL 操作。

2.7 Window

在 Apache Flink 中有 2 種類型的 Window,一種是 OverWindow,即傳統資料庫的標準開窗,每一個元素都對應一個窗口。一種是 GroupWindow,目前在SQL中 GroupWindow 都是基於時間進行窗口劃分的。

2.7.1 OverWindow

OVER Window 目前支持由如下三個元素組合的 8 種類型:

  • 時間 – ProcessingTime 和 EventTime
  • 數據集 – Bounded 和 UnBounded
  • 劃分方式 – ROWS 和 RANGE 我們以的Bounded ROWS 和 Bounded RANGE 兩種常用類型,想大家介紹 Over Window 的語義

  • Bounded ROWS Over Window

Bounded ROWS OVER Window 每一行元素都視為新的計算行,即,每一行都是一個新的窗口。

語法

SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
ROWS
BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1

  • value_expression – 進行分區的字表達式;
  • timeCol – 用於元素排序的時間欄位;
  • rowCount – 是定義根據當前行開始向前追溯幾行元素;

語義

我們以 3 個元素(2PRECEDING)的窗口為例,如下圖:

上圖所示窗口 user 1 的 w5 和 w6, user 2 的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,但是他們仍然是在不同的窗口,這一點有別於 RANGEOVER Window.

  • Bounded RANGE Over Window

Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行,即,具有相同時間值的所有行都是同一個窗口;

語法

Bounded RANGE OVER Window 的語法如下:

SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
RANGE
BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1

  • value_expression – 進行分區的字表達式;
  • timeCol – 用於元素排序的時間欄位;
  • timeInterval – 是定義根據當前行開始向前追溯指定時間的元素行;

語義

我們以 3 秒中數據(INTERVAL『2』 SECOND)的窗口為例,如下圖:

注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別於 ROWS OVER Window.

2.7.2 GroupWindow

根據窗口數據劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Winodw:

  • Tumble – 滾動窗口,窗口數據有固定的大小,窗口數據無疊加;
  • Hop – 滑動窗口,窗口數據有固定大小,並且有固定的窗口重建頻率,窗口數據有疊加;
  • Session – 會話窗口,窗口數據沒有固定的大小,根據窗口數據活躍程度劃分窗口,窗口數據無疊加;

說明:Aapche Flink 還支持 UnBounded的 Group Window,也就是全局 Window,流上所有數據都在一個窗口裡面,語義非常簡單,這裡不做詳細介紹了。

GroupWindow 的語法如下:

SELECT
[gk],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [WINDOW(definition)], [gk]

[WINDOW(definition)] – 在具體窗口語義介紹中介紹。

  • Tumble Window

Tumble 滾動窗口有固定 size,窗口數據不重疊,具體語義如下:

假設我們要寫一個 2 分鐘大小的 Tumble,示例SQL如下:

SELECT gk, COUNT(*) AS pv
FROM tab
GROUP BY TUMBLE(rowtime, INTERVAL 2 MINUTE), gk

  • Hop Window

Hop 滑動窗口和滾動窗口類似,窗口有固定的 size,與滾動窗口不同的是滑動窗口可以通過 slide 參數控制滑動窗口的新建頻率。因此當 slide 值小於窗口 size 的值的時候多個滑動窗口會重疊,具體語義如下:

假設我們要寫一個統計連續的兩個訪問用戶之間的訪問時間間隔不超過 3 分鐘的的頁面訪問量(PV).

SELECT gk, COUNT(*) AS pv
FROM tab
GROUP BY HOP(rowtime, INTERVAL 5 MINUTE, INTERVAL 10 MINUTE), gk

  • Session Window

Session 會話窗口 是沒有固定大小的窗口,通過 session 的活躍度分組元素。不同於滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話窗口 分配器通過配置 session gap 來指定非活躍週期的時長,具體語義如下:

假設我們要寫一個統計連續的兩個訪問用戶之間的訪問時間間隔不超過 3 分鐘的的頁面訪問量(PV).

SELECT gk, COUNT(*) AS pv
FROM pageAccessSession_tab
GROUP BY SESSION(rowtime, INTERVAL 3 MINUTE), gk

說明:很多場景用戶需要獲得 Window 的開始和結束時間,上面的 GroupWindow的SQL 示例中沒有體現,那麼窗口的開始和結束時間應該怎樣獲取呢? Apache Flink 我們提供瞭如下輔助函數:

  • TUMBLE_START/TUMBLE_END
  • HOP_START/HOP_END
  • SESSION_START/SESSION_END

這些輔助函數如何使用,請參考如下完整示例的使用方式。

3.完整的 SQL Job 案例

上面我們介紹了 Apache Flink SQL 核心運算元的語法及語義,這部分將選取Bounded EventTime Tumble Window 為例為大家編寫一個完整的包括 Source 和 Sink 定義的 ApacheFlink SQL Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,用戶 ID 和訪問時間。我們需要按不同地域統計每 2 分鐘的淘寶首頁的訪問量(PV). 具體數據如下:

3.1 Source 定義

自定義 Apache Flink Stream Source 需要實現 StreamTableSource, StreamTableSource 中通過 StreamExecutionEnvironment 的 addSource 方法獲取 DataStream, 所以我們需要自定義一個 SourceFunction, 並且要支持產生 WaterMark,也就是要實現 DefinedRowtimeAttributes 介面。出於代碼篇幅問題,我們如下只介紹核心部分,完整代碼 請查看: EventTimeTumbleWindowDemo.scala

3.1.1 Source Function 定義

支持接收攜帶 EventTime 的數據集合,Either 的數據結構 Right 是 WaterMark,Left 是元數據:

class MySourceFunction[T](dataList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w)) // emit watermark
}
}
}

3.1.2 定義 StreamTableSource

我們自定義的 Source 要攜帶我們測試的數據,以及對應的 WaterMark 數據,具體如下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

// 頁面訪問表數據 rows with timestamps and watermarks
val data = Seq(
// Data
Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
// Watermark
Right(1510365660000L),
..
)

val fieldNames = Array("accessTime", "region", "userId")
val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
val rowType = new RowTypeInfo(
Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
fieldNames)

override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// 添加數據源實現
execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
}
...
}

3.4 Sink 定義

我們簡單的將計算結果寫入到 Apache Flink 內置支持的 CSVSink 中,定義 Sink 如下:

def getCsvTableSink: TableSink[Row] = {
val tempFile = ...
new CsvTableSink(tempFile.getAbsolutePath).configure(
Array[String]("region", "winStart", "winEnd", "pv"),
Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}

3.5 構建主程序

主程序包括執行環境的定義,Source / Sink 的註冊以及統計查 SQL 的執行,具體如下:

def main(args: Array[String]): Unit = {
// Streaming 環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// 設置EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//方便我們查出輸出數據
env.setParallelism(1)

val sourceTableName = "mySource"
// 創建自定義source數據結構
val tableSource = new MyTableSource

val sinkTableName = "csvSink"
// 創建CSV sink 數據結構
val tableSink = getCsvTableSink

// 註冊source
tEnv.registerTableSource(sourceTableName, tableSource)
// 註冊sink
tEnv.registerTableSink(sinkTableName, tableSink)

val sql =
"SELECT " +
" region, " +
" TUMBLE_START(accessTime, INTERVAL 2 MINUTE) AS winStart," +
" TUMBLE_END(accessTime, INTERVAL 2 MINUTE) AS winEnd, COUNT(region) AS pv " +
" FROM mySource " +
" GROUP BY TUMBLE(accessTime, INTERVAL 2 MINUTE), region"

tEnv.sqlQuery(sql).insertInto(sinkTableName);
env.execute()
}

3.6 執行並查看運行結果

執行主程序後我們會在控制檯得到 Sink 的文件路徑,如下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

Cat 方式查看計算結果,如下:

jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1

4.小結

本篇概要的介紹了 Apache Flink SQL 的所有核心運算元,並以一個 End-to-End 的示例展示瞭如何編寫 Apache Flink SQL 的 Job . 希望對大家有所幫助。

孫金城,本期作者;淘寶花名"金竹",Apache Flink Committer,阿里巴巴高級技術專家。目前就職於阿里巴巴計算平臺事業部,自2015年以來一直投入於基於Apache Flink的新一代大數據計算平臺實時計算的設計研發工作。


推薦閱讀:
相關文章