1. Spark SQL 運行架構

  1. SqlParser 將 SQL 語句轉換為未解析的邏輯查詢計劃;
  2. Analyzer 對邏輯查詢計劃進行屬性和關係關聯檢驗;
  3. Optimizer 通過邏輯查詢優化將邏輯查詢計劃轉換為優化的邏輯查詢計劃;
  4. QueryPlanner 將優化的邏輯查詢計劃轉換為物理查詢計劃,再根據過去的性能統計數據,選擇最佳的物理執行計劃 CostModel,得到 SparkPlan;
  5. 生成 RDD 相關代碼,並執行。

2. Spark SQL 中Optimizer 的常見優化策略有哪些?

基於規則的優化(RBO)

  • 列裁剪:利用列式存儲,減少網路、內存消耗
  • 謂詞下推:提前過濾,減少計算數據量
  • 常量摺疊:減少常量操作,如從 1+1 優化為 2 避免每一條 record 都需要執行一次 1+1 的操作
  • 操作合併:合併多個連續的 Filter、limit 等運算元,加快計算

基於 Cost 的優化(CBO)

Join 方案的選擇

3. Join 的幾種實現方式?

Broadcast Hash Join 和 Shuffle Hash Join 都是基於 Hash Join 的變體,所以先介紹下 Hash Join:

取一張表作為 Build Table,對其按照 Join Key 構建 Hash Table,另外一張表作為 Probe Table,掃描 Probe Table,對其參與的 Join Key Hash 映射 Hash Table 中的記錄,檢查 join key 值是否匹配,匹配即 Join 上了。這便是單機版 Hash Join 的基本思想。

Broadcast Hash Join

  1. Broadcast 階段:將小表廣播分發到大表所在的所有主機。廣播演算法可以有很多,最簡單的是先發給 Driver,Driver 再統一分發給所有 Executor;要不就是基於 bittorrete 的 p2p 思路;
  2. Hash Join 階段:在每個 Executor上執行單機版 Hash Join,小表映射,大表試探。

Spark SQL 規定 Broadcast Hash Join 執行的基本條件為被廣播小表必須小於參數 spark.sql.autoBroadcastJoinThreshold,默認為 10M 。

Shuffle Hash Join

顯然 Broadcast Hash Join 只適合對小表進行廣播,如果是大表呢?此時可以按照 Join Key 進行分區,根據 Key 相同必然分區相同的原理,就可以將大表 Join 分而治之,劃分為很多小表的 Join,充分利用集羣資源並行化。

  1. Shuffle 階段:分別將兩個表按照 Join Key 進行分區,將相同 Join Key 的記錄重分佈到同一分區,這樣兩張表的數據會被重分佈到集羣中所有節點,這個過程稱為 shuffle;
  2. Hash Join 階段:每個分區節點上的數據單獨執行單機 Hash Join 演算法。

Sort Merge Join

如果兩張表都是大表的話,Shuffle Hash Join 在 Hash Join 階段仍然可能比較喫力,Sort Merge Join 就是針對這一階段的優化:

  1. Shuffle 階段:同 Shuffle Hash Join 一樣分別將兩個表根據 Join Key 進行分區;
  2. Sort 階段:對單個分區節點的兩個表的數據,分別進行排序;
  3. Merge 階段:對排好序的兩張分區表數據執行 Join 操作。Join操作很簡單,分別遍歷兩個有序序列,碰到相同 Join key 就 Merge 輸出。

簡單來說 Sort Merge Join 就是通過排序加快了 Hash Join。

參考文章

  1. BigData-『基於代價優化』究竟是怎麼一回事?

本文首發於公眾號「數據Man」,歡迎關注!

數據Man

推薦閱讀:

相關文章