作者 | 張俊

本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會議,分享嘉賓張俊,目前擔任 OPPO 大數據平臺研發負責人,也是 Apache Flink contributor。本文主要內容如下:

  • OPPO 實時數倉的演進思路;
  • 基於 Flink SQL 的擴展工作;
  • 構建實時數倉的應用案例;
  • 未來工作的思考和展望。

一.OPPO 實時數倉的演進思路

1.1.OPPO 業務與數據規模

大家都知道 OPPO 是做智能手機的,但並不知道 OPPO 與互聯網以及大數據有什麼關係,下圖概要介紹了 OPPO 的業務與數據情況:

OPPO 作為手機廠商,基於 Android 定製了自己的 ColorOS 系統,當前日活躍用戶超過 2 億。圍繞 ColorOS,OPPO 構建了很多互聯網應用,比如應用商店、瀏覽器、信息流等。在運營這些互聯網應用的過程中,OPPO 積累了大量的數據,上圖右邊是整體數據規模的演進:從 2012 年開始每年都是 2~3 倍的增長速度,截至目前總數據量已經超過 100PB,日增數據量超過 200TB。要支撐這麼大的一個數據量,OPPO 研發出一整套的數據系統與服務,並逐漸形成了自己的數據中臺體系。

1.2.OPPO 數據中臺

今年大家都在談數據中臺,OPPO 是如何理解數據中臺的呢?我們把它分成了 4 個層次:
  • 最下層是統一工具體系,涵蓋了"接入 - 治理 - 開發 - 消費"全數據鏈路;
  • 基於工具體系之上構建了數據倉庫,劃分成"原始層 - 明細層 - 匯總層 - 應用層",這也是經典的數倉架構;
  • 再往上是全域的數據體系,什麼是全域呢?就是把公司所有的業務數據都打通,形成統一的數據資產,比如 ID-Mapping、用戶標籤等;
  • 最終,數據要能被業務用起來,需要場景驅動的數據產品與服務。

以上就是 OPPO 數據中臺的整個體系,而數據倉庫在其中處於非常基礎與核心的位置。

1.3. 構建 OPPO 離線數倉

過往 2、3 年,我們的重點聚焦在離線數倉的構建。上圖大致描述了整個構建過程:首先,數據來源基本是手機、日誌文件以及 DB 資料庫,我們基於 Apache NiFi 打造了高可用、高吞吐的接入系統,將數據統一落入 HDFS,形成原始層;緊接著,基於 Hive 的小時級 ETL 與天級匯總 Hive 任務,分別負責計算生成明細層與匯總層;最後,應用層是基於 OPPO 內部研發的數據產品,主要是報表分析、用戶畫像以及介面服務。此外,中間的明細層還支持基於 Presto 的即席查詢與自助提數。伴隨著離線數倉的逐步完善,業務對實時數倉的訴求也愈發強烈。

1.4. 數倉實時化的訴求

對於數倉實時化的訴求,大家通常都是從業務視角來看,但其實站在平臺的角度,實時化也能帶來切實的好處。首先,從業務側來看,報表、標籤、介面等都會有實時的應用場景,分別參見上圖左邊的幾個案例;其次,對平臺側來說,我們可以從三個案例來看:第一,OPPO 大量的批量任務都是從 0 點開始啟動,都是通過 T+1 的方式去做數據處理,這會導致計算負載集中爆發,對集羣的壓力很大;第二,標籤導入也屬於一種 T+1 批量任務,每次全量導入都會耗費很長的時間;第三,數據質量的監控也必須是 T+1 的,導致沒辦法及時發現數據的一些問題。

既然業務側和平臺側都有實時化的這個訴求,那 OPPO 是如何來構建自己的實時數倉呢?

1.5. 離線到實時的平滑遷移

無論是一個平臺還是一個系統,都離不開上下兩個層次的構成:上層是 API,是面向用戶的編程抽象與介面;下層是 Runtime,是面向內核的執行引擎。我們希望從離線到實時的遷移是平滑的,是什麼意思呢?從 API 這層來看,數倉的抽象是 Table、編程介面是 SQL+UDF,離線數倉時代用戶已經習慣了這樣的 API,遷移到實時數倉後最好也能保持一致。而從 Runtime 這層來看,計算引擎從 Hive 演進到了 Flink,存儲引擎從 HDFS 演進到了 Kafka。

基於以上的思路,只需要把之前提到的離線數倉 pipeline 改造下,就得到了實時數倉 pipeline。

1.6. 構建 OPPO 實時數倉

從上圖可以看到,整個 pipeline 與離線數倉基本相似,只是把 Hive 替換為 Flink,把 HDFS 替換為 Kafka。從總體流程來看,基本模型是不變的,還是由原始層、明細層、匯總層、應用層的級聯計算來構成。

因此,這裡的核心問題是如何基於 Flink 構建出這個 pipeline,下面就介紹下我們基於 Flink SQL 所做的一些工作。

二. 基於 Flink SQL 的擴展工作

2.1.Why Flink SQL

首先,為什麼要用 Flink SQL? 下圖展示了 Flink 框架的基本結構,最下面是 Runtime,這個執行引擎我們認為最核心的優勢是四個:第一,低延遲,高吞吐;第二,端到端的 Exactly-once;第三,可容錯的狀態管理;第四,Window & Event time 的支持。基於 Runtime 抽象出 3 個層次的 API,SQL 處於最上層。

Flink SQL API 有哪些優勢呢?我們也從四個方面去看:第一,支持 ANSI SQL 的標準;第二,支持豐富的數據類型與內置函數,包括常見的算術運算與統計聚合;第三,可自定義 Source/Sink,基於此可以靈活地擴展上下游;第四,批流統一,同樣的 SQL,既可以跑離線也可以跑實時。

那麼,基於 Flink SQL API 如何編程呢?下面是一個簡單的演示:

首先是定義與註冊輸入 / 輸出表,這裡創建了 2 張 Kakfa 的表,指定 kafka 版本是什麼、對應哪個 topic;接下來是註冊 UDF,篇幅原因這裡沒有列出 UDF 的定義;最後是纔是執行真正的 SQL。可以看到,為了執行 SQL,需要做這麼多的編碼工作,這並不是我們希望暴露給用戶的介面。

2.2. 基於 WEB 的開發 IDE

前面提到過,數倉的抽象是 Table,編程介面是 SQL+UDF。對於用戶來說,平臺提供的編程界面應該是類似上圖的那種,有用過 HUE 做交互查詢的應該很熟悉。左邊的菜單是 Table 列表,右邊是 SQL 編輯器,可以在上面直接寫 SQL,然後提交執行。要實現這樣一種交互方式,Flink SQL 默認是無法實現的,中間存在 gap,總結下來就 2 點:第一,元數據的管理,怎麼去創建庫表,怎麼去上傳 UDF,使得之後在 SQL 中可直接引用;第二,SQL 作業的管理,怎麼去編譯 SQL,怎麼去提交作業。在技術調研過程中,我們發現了 Uber 在 2017 年開源的 AthenaX 框架。

2.3.AthenaX:基於 REST 的 SQL 管理器

AthenaX 可以看作是一個基於 REST 的 SQL 管理器,它是怎麼實現 SQL 作業與元數據管理的呢?

對於 SQL 作業提交,AthenaX 中有一個 Job 的抽象,封裝了要執行的 SQL 以及作業資源等信息。所有的 Job 由一個 JobStore 來託管,它定期跟 YARN 當中處於 Running 狀態的 App 做一個匹配。如果不一致,就會向 YARN 提交對應的 Job。

對於元數據管理,核心的問題是如何將外部創建的庫表注入 Flink,使得 SQL 中可以識別到。實際上,Flink 本身就預留了與外部元數據對接的能力,分別提供了 ExternalCatalog 和 ExternalCatalogTable 這兩個抽象。AthenaX 在此基礎上再封裝出一個 TableCatalog,在介面層面做了一定的擴展。在提交 SQL 作業的階段,AthenaX 會自動將 TableCatalog 註冊到 Flink,再調用 Flink SQL 的介面將 SQL 編譯為 Flink 的可執行單元 JobGraph,並最終提交到 YARN 生成新的 App。

AthenaX 雖然定義好了 TableCatalog 介面,但並沒有提供可直接使用的實現。那麼,我們怎麼來實現,以便對接到我們已有的元數據系統呢?

2.4.Flink SQL 註冊庫表的過程

首先,我們得搞清楚 Flink SQL 內部是如何註冊庫表的。整個過程涉及到三個基本的抽象:TableDescriptor、TableFactory 以及 TableEnvironment。

TableDescriptor 顧名思義,是對錶的描述,它由三個子描述符構成:第一是 Connector,描述數據的來源,比如 Kafka、ES 等;第二是 Format,描述數據的格式,比如 csv、json、avro 等;第三是 Schema,描述每個欄位的名稱與類型。TableDescriptor 有兩個基本的實現——ConnectTableDescriptor 用於描述內部表,也就是編程方式創建的表;ExternalCatalogTable 用於描述外部表。

有了 TableDescriptor,接下來需要 TableFactory 根據描述信息來實例化 Table。不同的描述信息需要不同的 TableFactory 來處理,Flink 如何找到匹配的 TableFactory 實現呢?實際上,為了保證框架的可擴展性,Flink 採用了 Java SPI 機制來載入所有聲明過的 TableFactory,通過遍歷的方式去尋找哪個 TableFactory 是匹配該 TableDescriptor 的。TableDescriptor 在傳遞給 TableFactory 前,被轉換成一個 map,所有的描述信息都用 key-value 形式來表達。TableFactory 定義了兩個用於過濾匹配的方法——一個是 requiredContext(),用於檢測某些特定 key 的 value 是否匹配,比如 connector.type 是否為 kakfa;另一個是 supportedProperties(),用於檢測 key 是否能識別,如果出現不識別的 key,說明無法匹配。

匹配到了正確的 TableFactory,接下來就是創建真正的 Table,然後將其通過 TableEnvironment 註冊。最終註冊成功的 Table,才能在 SQL 中引用。

2.5.Flink SQL 對接外部數據源

搞清楚了 Flink SQL 註冊庫表的過程,給我們帶來這樣一個思路:如果外部元數據創建的表也能被轉換成 TableFactory 可識別的 map,那麼就能被無縫地註冊到 TableEnvironment。基於這個思路,我們實現了 Flink SQL 與已有元數據中心的對接,大致過程參見下圖:

通過元數據中心創建的表,都會將元數據信息存儲到 MySQL,我們用一張表來記錄 Table 的基本信息,然後另外三張表分別記錄 Connector、Format、Schema 轉換成 key-value 後的描述信息。之所以拆開成三張表,是為了能夠能獨立的更新這三種描述信息。接下來是定製實現的 ExternalCatalog,能夠讀取 MySQL 這四張表,並轉換成 map 結構。

2.6. 實時表 - 維表關聯

到目前為止,我們的平臺已經具備了元數據管理與 SQL 作業管理的能力,但是要真正開放給用戶使用,還有一點基本特性存在缺失。通過我們去構建數倉,星型模型是無法避免的。這裡有一個比較簡單的案例:中間的事實表記錄了廣告點擊流,周邊是關於用戶、廣告、產品、渠道的維度表。

假定我們有一個 SQL 分析,需要將點擊流表與用戶維表進行關聯,這個目前在 Flink SQL 中應該怎麼來實現?我們有兩種實現方式,一個基於 UDF,一個基於 SQL 轉換,下面分別展開來講一下。

2.7. 基於 UDF 的維表關聯

首先是基於 UDF 的實現,需要用戶將原始 SQL 改寫為帶 UDF 調用的 SQL,這裡是 userDimFunc,上圖右邊是它的代碼實現。UserDimFunc 繼承了 Flink SQL 抽象的 TableFunction,它是其中一種 UDF 類型,可以將任意一行數據轉換成一行或多行數據。為了實現維表關聯,在 UDF 初始化時需要從 MySQL 全量載入維表的數據,緩存在內存 cache 中。後續對每行數據的處理,TableFunction 會調用 eval() 方法,在 eval() 中根據 user_id 去查找 cache,從而實現關聯。當然,這裡是假定維表數據比較小,如果數據量很大,不適合全量的載入與緩存,這裡不做展開了。

基於 UDF 的實現,對用戶和平臺來說都不太友好:用戶需要寫奇怪的 SQL 語句,比如圖中的 LATERAL TABLE;平臺需要為每個關聯場景定製特定的 UDF,維護成本太高。有沒有更好的方式呢?下面我們來看看基於 SQL 轉換的實現。

2.8. 基於 SQL 轉換的維表關聯

我們希望解決基於 UDF 實現所帶來的問題,用戶不需要改寫原始 SQL,平臺不需要開發很多 UDF。有一種思路是,是否可以在 SQL 交給 Flink 編譯之前,加一層 SQL 的解析與改寫,自動實現維表的關聯?經過一定的技術調研與 POC,我們發現是行得通的,所以稱之為基於 SQL 轉換的實現。下面將該思路展開解釋下。

首先,增加的 SQL 解析是為了識別 SQL 中是否存在預先定義的維度表,比如上圖中的 user_dim。一旦識別到維表,將觸發 SQL 改寫的流程,將紅框標註的 join 語句改寫成新的 Table,這個 Table 怎麼得到呢?我們知道,流計算領域近年來發展出「流表二象性」的理念,Flink 也是該理念的踐行者。這意味著,在 Flink 中 Stream 與 Table 之間是可以相互轉換的。我們把 ad_clicks 對應的 Table 轉換成 Stream,再調用 flatmap 形成另一個 Stream,最後再轉換回 Table,就得到了 ad_clicks_user。最後的問題是,flatmap 是如何實現維表關聯的?

Flink 中對於 Stream 的 flatmap 操作,實際上是執行一個 RichFlatmapFunciton,每來一行數據就調用其 flatmap() 方法做轉換。那麼,我們可以定製一個 RichFlatmapFunction,來實現維表數據的載入、緩存、查找以及關聯,功能與基於 UDF 的 TableFunction 實現類似。

既然 RichFlatmapFunciton 的實現邏輯與 TableFunction 相似,那為什麼相比基於 UDF 的方式,這種實現能更加通用呢?核心的點在於多了一層 SQL 解析,可以將維表的信息獲取出來(比如維表名、關聯欄位、select 欄位等),再封裝成 JoinContext 傳遞給 RichFlatmapFunciton,使得的表達能力就具備通用性了。

三.構建實時數倉的應用案例

下面分享幾個典型的應用案例,都是在我們的平臺上用 Flink SQL 來實現的。

3.1. 實時 ETL 拆分

這裡是一個典型的實時 ETL 鏈路,從大表中拆分出各業務對應的小表:

OPPO 的最大數據來源是手機端埋點,從手機 APP 過來的數據有一個特點,所有的數據是通過統一的幾個通道上報過來。因為不可能每一次業務有新的埋點,都要去升級客戶端,去增加新的通道。比如我們有個 sdk_log 通道,所有 APP 應用的埋點都往這個通道上報數據,導致這個通道對應的原始層表巨大,一天幾十個 TB。但實際上,每個業務只關心它自身的那部分數據,這就要求我們在原始層進行 ETL 拆分。

這個 SQL 邏輯比較簡單,無非是根據某些業務欄位做篩選,插入到不同的業務表中去。它的特點是,多行 SQL 最終合併成一個 SQL 提交給 Flink 執行。大家擔心的是,包含了 4 個 SQL,會不會對同一份數據重複讀取 4 次?其實,在 Flink 編譯 SQL 的階段是會做一些優化的,因為最終指向的是同一個 kafka topic,所以只會讀取 1 次數據。

另外,同樣的 Flink SQL,我們同時用於離線與實時數倉的 ETL 拆分,分別落入 HDFS 與 Kafka。Flink 中本身支持寫入 HDFS 的 Sink,比如 RollingFileSink。

3.2. 實時指標統計

這裡是一個典型的計算信息流 CTR 的這個案例,分別計算一定時間段內的曝光與點擊次數,相除得到點擊率導入 Mysql,然後通過我們內部的報表系統來可視化。這個 SQL 的特點是它用到了窗口 (Tumbling Window) 以及子查詢。

3.3. 實時標籤導入

這裡是一個實時標籤導入的案例,手機端實時感知到當前用戶的經緯度,轉換成具體 POI 後導入 ES,最終在標籤系統上做用戶定向。

這個 SQL 的特點是用了 AggregateFunction,在 5 分鐘的窗口內,我們只關心用戶最新一次上報的經緯度。AggregateFunction 是一種 UDF 類型,通常是用於聚合指標的統計,比如計算 sum 或者 average。在這個示例中,由於我們只關心最新的經緯度,所以每次都替換老的數據即可。

四. 未來工作的思考和展望

最後,給大家分享一下關於未來工作,我們的一些思考與規劃,還不是太成熟,拋出來和大家探討一下。

4.1. 端到端的實時流處理

什麼是端到端?一端是採集到的原始數據,另一端是報表 / 標籤 / 介面這些對數據的呈現與應用,連接兩端的是中間實時流。當前我們基於 SQL 的實時流處理,源表是 Kafka,目標表也是 Kafka,統一經過 Kafka 後再導入到 Druid/ES/HBase。這樣設計的目的是提高整體流程的穩定性與可用性:首先,kafka 作為下游系統的緩衝,可以避免下游系統的異常影響實時流的計算(一個系統保持穩定,比起多個系統同時穩定,概率上更高點);其次,kafka 到 kafka 的實時流,exactly-once 語義是比較成熟的,一致性上有保證。

然後,上述的端到端其實是由割裂的三個步驟來完成的,每一步可能需要由不同角色人去負責處理:數據處理需要數據開發人員,數據導入需要引擎開發人員,數據資產化需要產品開發人員。

我們的平臺能否把端到端給自動化起來,只需要一次 SQL 提交就能打通處理、導入、資產化這三步?在這個思路下,數據開發中看到的不再是 Kafka Table,而應該是面向場景的展示表 / 標籤表 / 介面表。比如對於展示表,創建表的時候只要指定維度、指標等欄位,平臺會將實時流結果數據從 Kafka 自動導入 Druid,再在報表系統自動導入 Druid 數據源,甚至自動生成報表模板。

4.2. 實時流的血緣分析

關於血緣分析,做過離線數倉的朋友都很清楚它的重要性,它在數據治理中都起著不可或缺的關鍵作用。對於實時數倉來說也莫不如此。我們希望構建端到端的血緣關係,從採集系統的接入通道開始,到中間流經的實時表與實時作業,再到消費數據的產品,都能很清晰地展現出來。基於血緣關係的分析,我們才能評估數據的應用價值,覈算數據的計算成本。

4.3. 離線 - 實時數倉一體化

最後提一個方向是離線實時數倉的一體化。我們認為短期內,實時數倉無法替代離線數倉,兩者並存是新常態。在離線數倉時代,我們積累的工具體系,如何去適配實時數倉,如何實現離線與實時數倉的一體化管理?理論上來講,它們的數據來源是一致的,上層抽象也都是 Table 與 SQL,但本質上也有不同的點,比如時間粒度以及計算模式。對於數據工具與產品來說,需要做哪些改造來實現完全的一體化,這也是我們在探索和思考的。

本文作者:apache_flink

原文鏈接

更多技術乾貨敬請關注云棲社區知乎機構號:阿里云云棲社區 - 知乎

本文為雲棲社區原創內容,未經允許不得轉載。


推薦閱讀:
相關文章