前幾天,浪尖發了一篇文章,講了Spark 2.4發布更新情況:

Spark2.4.0發布了!

其中,就有一項說到Spark 為了支持深度學習而引入的屏障調度器。本文就詳細講講。

基於消息傳遞結構的計算模型和Spark計算模型是有很大區別。在Spark 內部,每個Stage的某一個一個task不會依賴於相同Stage任何其他的task,因此,Spark的task 可以被獨立進行調度執行。為了在Spark中嵌入MPI功能,需要引入一個新的調度模型,暫時命名為「屏障調度」(浪尖直譯自barrier scheduling),該調度模型會同時啟動任務,並為用戶提供足夠的信息和工具,將分散式DL訓練嵌入到Spark Pipeline中。 Spark還為MPI任務引入了一種新的容錯機制。當任何MPI任務在中間失敗時,Spark將中止所有任務並重新啟動該stage。

1. 要求

概述

  • 每個job中單個barrier stage。
  • 每個job中多個barrier stage。
  • 多job且每個job都帶有barrier stage。
  • Barrier stage 請求的slot比可用的slot多(無動態資源申請)。
  • Barrier stage請求的slot比可用的slot多(有動態資源申請)。(Spark 2.4就不要想了)

目標

  • 支持barrier調度:對於同一個barrierstage同時啟動所有task,並且提供給用戶足夠的信息和工具,以便用戶可以嵌入分散式DL訓練模型。
  • 正確的處理失敗的場景。
  • Barrier執行模式支持運行與Standalone模式
  • 使用yarn/mesos/k8s的用戶可以再有BarrierStage的時候設置MPI。

安全

用戶使用外部線程啟動MPI任務的時候,存在外部進行不被殺死而導致內存泄漏的風險。Barrier tasks會使用遠程客戶端相互交流,但是不會影響Spark當前的安全模型。

API變化

class RDD[T] {
/** Indicates that Spark must launch the tasks together for the current stage. */
def barrier(): RDDBarrier[T] = ???
}
/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
class BarrierTaskContext extends TaskContext {
/** Sets a global barrier and waits until all tasks in this stage hit this
barrier. */
def barrier(): Unit = ???
/** Returns the all task infos in this stage. */
def getTaskInfos(): Array[BarrierTaskInfo]
}
/** Represents an RDD barrier, which forces Spark to launch tasks of this stage
together. */
class RDDBarrier[T] {
/** Maps partitions together with a provided [[BarrierTaskContext]]. */
def mapPartitions[S](f: Iterator[T] => Iterator[S]): RDD[S] = ???
/** TODO extra conf(e.g. timeout) */
}

使用案例

rdd.barrier().mapPartitions { iter =>
// Write iter to disk.
???
// Fetch TaskContext
val context = BarrierTaskContext.get()
// Wait until all tasks finished writing.
context.barrier()
// The 0-th task launches an MPI job.
if (context.partitionId() == 0) {
val hosts = context.getTaskInfos().map(_.address)
// Set up MPI machine file using host infos.
???
// Launch the MPI job by calling mpirun.
??? }
// Wait until the MPI job finished.
context.barrier()
// Collect output and return.
??? }

3. 架構

設計提議

為了使spark支持屏障調度(barrier scheduling),在Spark內部增加了RDDBarrier和BarrierTaskContext。

BarrierStage

如果沒有充足的slot資源,barrier stage不會被拉起(也即是空閑的core 必須能夠拉起該barrier所有tasks),這樣設計使為了滿足一次拉起所有task的目標。

同時當任意的task執行失敗的時候,總是重啟整個barrier stage。

判斷一個stage是否是Barrier stage的一種方式是跟蹤Stage所包含的RDD,如果該stage包含RDDBarrier 或者至少一個父RDD是RDDBarrier,那麼該stage是一個barrier stage,當然要以shuffleDependency為界限。

調度Barrier Tasks

目前,TaskScheduler會儘可能的在可用的slot上調度task,所以通常不會同時啟動同一個stage的所有task。因此需要在barrier stage 的task在調度之前加上資源可用性判斷。由於任務的局部性問題,仍然可能僅啟動整個barrier stage的部分tasks,因此必須在啟動任務之前在此檢查確認同一個barrier stage的所有task同時被啟動。

Barrier tasks預計比常規tasks具有更長的生命周期,因此barriertasks可能會在相對長的時間範圍內佔用集群資源,後續提交的任務估計會延遲運行或者僅使用更少的slot運行。建議使用Fair調度策略進行調度,而不是默認的FIFO調度策略,並將barrier任務獨立運行,這樣至少可以保證普通任務可以在配置給定最少的集群資源上運行。

另一個問題是barrier stage可以提交,但是集群當前沒有足夠的slot來同時啟動所有barrier tasks。如果啟用了動態資源分配,則在等待一段時間後,可能會或可能不會滿足要求(取決於允許的最大節點)。對於Spark 2.4,提出了一個簡單的解決方案,它只檢查當前運行的slot的總數,如果數量不足以同時啟動同一個stage的所有屏障任務,則取消該job。目標是在3.0的時候可以更好地與動態資源分配集成。對於Spark 2.4,在啟用動態資源分配時,job會立即失敗,或者job無法連續提交,因為它試圖提交一個barrier stage,該stage需要比集群中當前可用的slot更多的slot。

Task Barrier

Barrier tasks應允許用戶在task執行過程中插入同步操作,這可以通過在BarrierTaskContext中引入全局barrier操作來實現,這使得當前任務等待直到同一stage中的所有task都達到此barrier。將為BarrierTaskContext.barrier()提交單獨的設計文檔。

關注公眾號,bigdatatip,回復barrier 即可獲得該文檔。

失敗容錯

為確保正確性,當任何task失敗時,barrier stage始終會重試整個stage。因此,將要求殺死失敗stage的所有正在運行的任務,並且還保證每個單個stage最多只能運行一個taskset (沒有zombie task),這是非常簡單的。理想情況下,除了在zombie taskset中殺死正在運行的任務需要一段時間,每個單一stage只應運行一個taskset,必須將失敗的taskset標記為zombie 並正確處理TaskKilled消息。

推測任務(Speculativetask)

在barrier 執行模式中,要求每個barrier task必須僅有一個唯一的task ID,目的是其他的tasks 可以直接使用該ID和它交互。這也就意味著每個task只能嘗試啟動一次,因此必須禁止推測執行。

此外,3.0的時候可能會將Spark任務推測執行設置為單個stage的配置而不是全局配置。

SparkContext.runJob()/PartitionPruningRDD

SparkContext.runJob()執行的時候可以僅是所有分區的子集,其中一個用例是RDD.first(),不會執行所有分區。這種是與barrer執行模式衝突的,可能無法啟動某些barrier tasks。在barrier stage檢測到這種用法,會由於不支持該操作而拋出異常。

ParititionPruningRDD的情況類似,它只在滿足`partitionFilterFunc`的分區上啟動任務。我們將在barrierstage 檢測PartitionPruningRDD並拋出顯式異常。

以上問題都與父RDD與生成的RDD具有不同分區數的問題有關(例如union()/ coalesce()/ first()/ take()/ PartitionPruningRDD),因此可以檢測RDD的血統鏈條,然後在job 提交的時候立即停止。

如果RDD依賴於多個barrier RDD(例如,barrierRdd1.zip(barrierRdd2)),也將立即停止,如果發生這種情況,則無法確保`barrier()`調用的正確行為。

針對Spark 3.0,可以進一步調查上述用例並提出支持它們的方法。

本文牽涉到的英文原文,關注公眾號 bigdatatip,輸入 barrier 獲取。

推薦閱讀:

大數據啊大數據!?

mp.weixin.qq.com
圖標
Spark SQL的幾個里程碑!?

mp.weixin.qq.com

最常見的Kafka面試題及答案?

mp.weixin.qq.com
圖標
乾貨|kafka流量監控的原理及實現?

mp.weixin.qq.com
圖標

推薦閱讀:
相关文章