Presto之Task執行框架

概要

熟悉presto的同學都對上面這張圖並不陌生,本文想要著重介紹的是SQL經過Planner之後,如何經由Scheduler最終下發至Worker上進行執行,Task之間數據交互的方式,以及Task的生命周期。

名詞解釋

Coordinator: 負責接收SQL,進行解析,執行計劃生成,Task以及Presto Worker管理。

Worker: 負責Task的執行,驅動運算元進行數據載入,中間計算結果shuffle等。

Stage: 代表了整個SQL分散式執行計劃樹中的一棵子樹,stage往往是因為執行計劃樹的父子節點之間的data distribution不同而進行切分的。

Task: 作為Stage在某一個Presto Worker上的一個instance,負責從上游Task拉取數據,並發地驅動運算元執行,並生成數據供下游Task(或者coordinator)消費。

OutputBuffer: 緩存當前Task生成的數據,供下游Task消費。

Split: 若干個Split組成了整個stage的輸入數據集,對於包含TableScanNode的stage會從connector的table中載入數據,對於不包含TableScanNode的stage會從其他Stage中載入數據。

Stage生成

Presto coordinator在生成Plan tree後,會再對Plan tree進行切分,其切分的依據是遇到一個Remote ExchangeNode就會切分一個Fragment,而該Fragment實際上就對應一個Stage需要執行的Plan。當Plan tree中父子節點之間的data distribution不一致時,ExchangeNode就會被添加。(這裡就不展開介紹,有興趣的可以研究源碼中AddExchanges這個類)

Stage scheduler

Presto coordinator有AllAtOnceExecution、PhasedExecution兩種Stage調度策略。在AllAtOnceExecution策略下,所有stage按照Plan tree中自底向上的順序被逐個調度(其實自頂向下被調度,Presto也是支持的)。而PhasedExecution模式下,coordinator會根據Stage之間的依賴關係,分批對這些Stage進行調度。 上面我們提到了Task是Stage在某個Presto Worker上的一個instance,而Stage在被調度的過程中,如何決策往哪些Presto Worker上下發Task,發送多少個Task呢? 首先對於源頭Stage(包含TableScanNode),coordinator會嘗試從connector獲取對應TableScanNode的splits,split包含isRemotelyAccessible屬性。當remotelyAccessible=false時,該split只能被下發至addresses包含的Presto Worker上;而當remotelyAccessible=true時,該split可能被調度至集群內的任意一個Presto Worker上。當該stage的split為第一次被調度至Presto Worker上時,coordinator就會往該Presto Worker下發Task。 對於非源頭Stage,coordinator會從Presto Worker中選擇min(hashPartitionCount, aliveWokers)個worker,每個worker下發一個task。

Task非同步並行下發

這裡需要進一步明確上一小節中提到的Stage被調度的定義。當某個Stage的所有split均被分配了對應的Task,而該Task、split並不一定已經由coordinator下發至Presto Worker,整個Task、split的下發過程是一個非同步過程。 這裡就有一個比較有意思的問題,在AllAtOnceExecution的調度模式下,雖然Stage是自底向上被調度的,但是由於Task的非同步調度機制存在,某個Task被Presto Worker執行的時候,其上游的Task可能並沒有被對應的Presto Worker執行。這裡不得不重點提一下Presto LazyOutputBuffer機制。當某個Task還未被Presto Worker執行時就收到下游Task數據讀取的請求時,LazyOutputBuffer會記錄下該請求,當該Task被執行並生成數據後,再響應對應的請求。 另外在AllAtOnceExecution的模式下,Stage其實也是可以被自頂向下調度的。這也就是說,Task被執行時,其上游Task可能被調度至哪個Presto Worker執行都是未知的。隨著上游Task被調度,通過StageLinkage的依賴關係,下游Task會被動態分配split,該split中包含了上游Task的Location信息。 綜上兩點,我們可以看到Presto Stage Scheduler是完全解耦的,並且靈活的,任意一個Task被下發,並不依賴於其上游Stage、或者其下游Stage的調度。Task的非同步並行下發機制,不得不說是Presto能夠達到毫秒級響應的一個基石。

Task state

coordinator下發Task後,該Query下所有Task的狀態均有coordinator實時監控。 當且僅當所有Stage下屬的所有Task finish,該Stage進入Finish狀態。 當Root Stage為Finished狀態時,該Query進入Finished狀態。 Task state包含的狀態如下: PLANNED: coordinator分配該Task,但Presto Worker還未接受並實例化該Task RUNNING: Presto Worker接收並實例化Task FINISHED: Task的所有Operator執行結束,並且生成的數據被全部消費完 CANCELED: Query被用戶取消,進而Task進入CANCELED狀態 ABORTED: Query中其他Task執行失敗,進而當前Task被ABORT FAILED: Task執行失敗

Task pipeline

整個Task的狀態管理由coordinator負責,而Task與Task之間僅存在著數據流的pipeline,下游Task通過非同步拉取的方式從上游Task獲取數據。在拉取模式下,數據流pipeline較為容易進行流控(這也是presto做的較為出色的地方),另外非同步的方式,也保證了拉取模式下pipeline的實效性。

結語

Presto作為互動式海量數據SQL查詢引擎,在高效支持海量數據大查詢的同時。其在毫秒級Query的處理能力以及並發度上有著相當不錯的表現(不考慮coordinator單節點)。而本文中提到的高效的非同步並行Stage/Task調度機制便是其中的一個基石。此外,其Stage Scheduler的靈活性,Task的弱關聯性,也為其後面在計算模型上做一些改造提供了相當大的便捷。

參考資料:

docs.starburstdata.com/ github.com/prestodb/pre facebook.com/notes/face

推薦閱讀:

相关文章