前言:

  • 在hadoop 1.x中和hadoop 2.x中,mapreduce的執行流程也不一樣(完全不一樣),在hadoop1.x中 mapreduce的 資源管理與分配和任務監控都是集中在 jobtracker 上,這樣會造成jobtracker的負擔非常大,而且在hadoop 1.x中沒有jobtracker的HA機制,從而會讓集群的健壯性很差

1、hadoop1.x mapreduce執行過程

簡述:client端提交job給jobtracker,jobtracker會給這個job分配資源,在tasktracker上啟動task任務,而且還要監控task任務的狀況,如果task掛了,jobtracker還得重新分配新的資源給掛了的task任務,當task執行完成後,jobtracker會為reduce任務分配資源,然後監控reduce的執行流程,最後執行完成輸出

  • 1、客戶端(Client):編寫mapreduce程序,配置作業,提交作業,啟動Jobclient進程。
  • 2、Jobclient向JobTracker請求一個Job ID, 也就是作業ID。
  • 3、Jobclient拷貝計算需要的代碼等文件
    • Jobclient將運行作業所需要的資源文件複製到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入劃分信息。這些文件都存放在JobTracker專門為該作業創建的文件夾中。文件夾名為該作業的Job ID。JAR文件默認會有10個副本.

  • mapred.submit.replication屬性控制;輸入劃分信息告訴了JobTracker應該為這個作業啟動多少個map任務等信息
  • 4、JobTracker接收到作業後,放入調度隊列,等待調度
    • JobTracker接收到Jobclient提交的作業後,將其放在一個作業隊列里,等待作業調度器對其進行調度
  • 5、在map task開始執行時
    • 它的輸入數據來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關係可能是多對一,默認是一對一。
    • 讀取輸入文件內容,解析成key、value對。對輸入文件的每一行,解析成key、value對。每一個鍵值對調用一次map函數,轉換成新的key、value輸出。
    • map任務執行過程中溢寫執行過程:當map task的輸出結果很多時,就可能會撐爆內存,所以需要在一定條件下將緩衝區中的數據臨時寫入磁碟,然後重新利用這塊緩衝區。這個從內存往磁碟寫數據的過程被稱為Spill,中文可譯為溢寫,字面意思很直觀。這個溢寫是由單獨線程來完成,不影響往緩衝區寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,所以整個緩衝區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩衝區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB內存中寫,互不影響。 當溢寫線程啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型默認的行為,這裡的排序也是對序列化的位元組做的排序。此處參考另外一篇文章《MapReduce之mapOutputBuffer解析》
    • 當map task真正完成時,內存緩衝區中的數據也全部溢寫到磁碟中形成一個溢寫文件。
      • 最終磁碟中會至少有一個這樣的溢寫文件存在(如果map的輸出結果很少,當map執行完成時,只會產生一個溢寫文件),因為最終的文件只有一個,所以需要將這些溢寫文件歸併到一起,這個過程就叫做Merge。Merge是怎樣的?如前面的例子,「aaa」從某個map task讀取過來時值是5,從另外一個map 讀取時值是8,因為它們有相同的key,所以得merge成group。什麼是group。對於「aaa」就是像這樣的:{「aaa」, [5, 8, 2, …]},數組中的值就是從不同溢寫文件中讀取出來的,然後再把這些值加起來。請注意,因為merge是將多個溢寫文件合併到一個文件,所以可能也有相同的key存在,在這個過程中如果client設置過Combiner,也會使用Combiner來合併相同的key

    • 至此,map端的所有工作都已結束:tasktarcker通知JobTracker,map任務執行完畢,並告知數據輸出路徑,JobTracker啟動reduce任務
      • 最終生成的這個文件也存放在TaskTracker夠得著的某個本地目錄內。每個reduce task不斷地通過RPC從JobTracker那裡獲取map task是否完成的信息,如果reduce task得到通知,獲知某台TaskTracker上的map task執行完成,Shuffle的後半段過程開始啟動。 簡單地說,reduce task在執行之前的工作就是不斷地拉取當前job里每個map task的最終結果,然後對從不同地方拉取過來的數據不斷地做merge,也最終形成一個文件作為reduce task的輸入文件
  • 5、Partitioner: 該主要在Shuffle過程中按照Key值將中間結果分成R份,其中每份都有一個Reduce去負責,可以通過job.setPartitionerClass()方法進行設置,默認的使用hashPartitioner類。實現getPartition函數
  • 6、在reduce task開始執行

    • COPY過程,簡單地拉取數據
      • Reduce進程啟動一些數據copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因為map task早已結束,這些文件就歸TaskTracker管理在本地磁碟中。

    • Merge階段:這裡的merge如map端的merge動作,只是數組中存放的是不同map端copy來的數值.
      • Copy過來的數據會先放入內存緩衝區中,這裡的緩衝區大小要比map端的更為靈活,它基於JVM的heap size設置,因為Shuffle階段Reducer不運行,所以應該把絕大部分的內存都給Shuffle用.
      • merge三種形式 :
        • 1)內存到內存: 默認情況關閉)
        • 2)內存到磁碟: 當內存中的數據量到達一定閾值
        • 3)磁碟到磁
      • 與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的
      • 第二種merge方式一直在運行,直到沒有map端的數據時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的那個文件

    • Reducer的輸入文件:不斷地merge後,最後會生成一個「最終文件」
      • 為什麼加引號?因為這個文件可能存在於磁碟上,也可能存在於內存中。對我們來說,當然希望它存放於內存中,直接作為Reducer的輸入,但默認情況下,這個文件是存放於磁碟中的。當Reducer的輸入文件已定,整個Shuffle才最終結束。然後就是Reducer執行,把結果放到HDFS上。

2、Yarn簡介

  • Yarn採用Master/Slave結構,總體上採用了雙層調度架構。
    • 在第一層的調度是ResourceManager和NodeManager。ResourceManager是Master節點,相當於JobTracker,包含Scheduler和App Manager兩個組件。這兩個組件分管資源調度和應用管理;NodeManager是Slave節點,可以部署在獨立的機器上,用於管理機器上的資源。NodeManager會向ResourceManager報告它的資源數量、使用情況並接受ResourceManager的資源調度。
    • 第二層的調度指的是NodeManager和Container。NodeManager會將Cpu&內存等資源抽象成一個個的Container,並管理這些Container的生命周期。

  • yarn架構組件分析
    • ResourceManager(master服務):負責資源管理的主服務,整個系統只有一個,它包含兩個組件Scheduler和ApplicationManager
    • 1。Scheduler用於調度集群中的各種隊列,應用。在Hadoop的MapReduce框架中主要有Capacity Scheduler和Fair Scheduler
  1. ApplicationManager主要負責接收Job的提交請求,為應用分配第一個Container來運行ApplicationMaster,同時監控ApplicationMaster,遇到失敗時重啟ApplicationMaster
    • NodeManager(Slave服務):每個節點上的進程,管理這個節點上的資源分配和監控節點的健康狀態
    • 將cpu和內存資源抽象成一個個的Container,並管理它們的生命周期
    • 啟動時向RM註冊並告知本身有多少資源可用,運行期間上報/接收查詢資源的請求
    • 分配Container給應用的某個任務,不知道運行在它上面的應用的信息

  • 雙層資源調度的意思是Master調度Slave(NM),NM調度Container(cpu&內存)
  • 一個作業的執行過程:
    • 1、客戶端向ResourceManager的App Manager提交應用並請求一個App Master實例
    • 2、ResourceManager找到可以運行一個Container的NodeManager,並在這個Container中啟動App Master實例
    • 3、App Master向ResourceManager註冊,註冊之後,客戶端就可以查詢ResourceManager獲得自己App Master的詳情以及和App Master直接交互了
    • 4、接著App Master向Resource Manager請求資源,即Container
    • 5、獲得Container後,App Master啟動Container,並執行Task
    • 6、Container執行過程中,會把運行進度和狀態等信息發送給App Master
    • 7、客戶端主動和App Master交流應用的運行狀態、進度更新等信息
    • 8、所有工作完成,App Master向RM取消註冊然後關閉,同時所有的Container也歸還給系統
    • 通過這個Job的處理過程,我們可以看到,App Master的角色是作為Job的驅動,驅動了Job任務的調度執行。在這個運作流程中,App Manager只需要管理App Master的生命周期以及保存它的內部狀態。 而App Master這個角色的抽象,使得每種類型的應用,都可以定製自己的App Master。這樣其他的計算模型就可以相對容易地運行在Yarn集群上。

  • Yarn的三種資源分配方式。Yarn是通過將資源分配給queue來進行資源分配的。每個queue可以設置它的資源分配方式
    • FIFO Scheduler
      • 如果沒有配置策略的話,所有的任務都提交到一個default隊列。根據它們的提交順序執行。如果有富裕資源,就執行任務;如果資源不富裕,就等待前面的任務執行完畢後釋放資源。在這個時間點Job1提交,它佔用了所有的資源;在它之後不久,Job2提交了,但是此時系統中已經沒有資源可以分配給它了。加入Job1是一個大任務,那麼Job2就只能等待一段很長的時間才能獲得執行的資源。
      • 所以這個先入先出的分配方式存在一個問題就是大任務會佔用很多資源,造成後面的小任務等待時間太長而餓死。因此一般不使用這個默認配置。

* Capacity Scheduler * Capacity Scheduler是一種多租戶,彈性的分配方式。每個租戶一個隊列,每個隊列可以配置能使用的資源上限與下限(譬如50%,達到這個上限後即使其他的資源空置著,也不可使用),通過配置可以令隊列至少有資源下限配置的資源可使用。

      • 圖中的隊列A和隊列B分配了相互獨立的資源。Job1提交給隊列A執行,它只能使用隊列A的資源。不久後,Job2提交給了隊列B,隊列B,此時Job2就不必等待Job1釋放資源了。這樣就可以將大任務和小任務分配在兩個隊列中,這兩個隊列的資源相互獨立,就不會造成小任務餓死的情況了
        • Fair Scheduler

      • 是一種公平的分配方式。所謂的公平就是隊列間會平均地分配資源。它是搶佔式的一種分配方式。 圖中的Job1提交給隊列A,它佔用了集群的所有資源。接著Job2提交給了隊列B。這時Job1就需要釋放它的一半的資源給隊列A中的Job2使用。接著,Job3也提交給了隊列B。這個時候Job2如果還未執行完畢的話,也必須釋放一半的資源給Job3.這就是公平的分配方式,在隊列範圍內,所有任務享用到的資源都是均分的

* 參考地址:hadoop.apache.org/docs/

3、簡述mapreduce在yarn中調度

  • 1、client端會調用resourcemanager,申請執行一個job
  • 2、resourcemanager會給客戶端返回一個hdfs的目錄以及一個application_id號。
  • 3、client端會將切片信息、job的配置信息以及jar包上傳到上一步收到的hdfs目錄下(三個文件分別是:job.split、job.xml、jar包)
  • 4、client請求resourcemanager啟動mrappmaster
  • 5、resourcemanager將client請求初始化成一個task任務,放到執行隊列裡面(默認FIFO),當執行到這個task的時候會給該job分配資源。
  • 6、resourcemanager會找空閑的nodemanager創建一個container容器,並啟動mrappmaster
  • 7、當mrappmaster啟動之後會先將client提交hdfs的資源(job.split、job.xml、jar包)下載到本地
  • 8、mrappmaster根據資源信息的情況請求resourcemanager啟動maptask
  • 9、resourcemanager會為上面的請求找空閑的nodemanager並創建maptask的container
  • 10、mrappmaster將資源發送給各個nodemanager並且啟動上面相應的maptask程序,監控maptask的運行情況(如果maptask掛掉之後,由mrappmaster去處理)。
  • 11、當maptask執行完成後,mrappmaster又會向resourcemanager申請reducetask的資源
  • 12、resourcemanager又會為上面的請求找空閑的nodemanager並創建reducetask的container
  • 13、mrappmaster然後又啟動reducetask任務,並且監控reducetask任務的執行狀況。
  • 14、直到mapreduce的程序執行完成
  • 當mrappmaster掛掉之後,resourcemanager會重新找其他的nodemanager並重新啟動一個新的mrappmaster,所以mrappmaster不存在點單故障問題

4、伏羲調度系統

  • 分散式調度系統需要解決兩個問題:
    • 任務調度:如何將海量數據分片,並在幾千上萬台機器上並行處理,最終匯聚成用戶需要的結果?當並行任務中個別失敗了如何處理?不同任務之間的數據如何傳遞?
    • 資源調度:分散式計算天生就是面向多用戶、多任務的,如何讓多個用戶能夠共享集群資源?如何在多個任務之間調配資源以使得每個任務公平的得到資源?

  • 伏羲調度架構分析:
    • 整個集群包括一台Fuxi Master以及多台Tubo。其中Fuxi Master是集群的中控角色,它負責資源的管理和調度;Tubo是每台機器上都有的一個Agent,它負責管理本台機器上的用戶進程;同時集群中還有一個叫Package Manager的角色,因為用戶的可執行程序以及一些配置需要事先打成一個壓縮包並上傳到Package Manager上,Package Manager專門負責集群中包的分發。
  • 任務執行流程分析
    • 1、客戶端提交任務請求,給Fuxi Master
    • 2、Fuxi Master在一個空閑的節點,啟動一個App Master
    • 3、App Master啟動之後,會發起資源請求,給FuxiMaster, 資源請求協議豐富,避免交互較長
    • 4、FuxiMaster把資源調度情況結果返回給App Master
    • 5、APP Master就知道,在哪些節點啟動 App Worker ,於是App Master通知 Tobo進程, 拉起相應機器的App Worker進程
    • 6、App Worker啟動成功後,回到App Master進程註冊,告訴App Worker 它已經Ready了。
    • 7、App Master於是下發任務給對應的App Worker, 包括App Worker處理的數據分片、存儲位置、以及處理結果存放的地方,這個過程稱之為Instance下發。

  • 任務調度的技術要點
    • 1、數據本地性
    • Instance從本地讀取數據,考慮資源是否均衡
    • 2、數據的Shuffle 傳遞
    • 1、1:1 一對一模式
    • 2、1對N 模式 每個Map 發送給所有個Reduce
    • 3、M:N模式 ,Patition 模式
    • 3、Instance 重試和容錯性 Backup Instance
    • 1、Instance 由於機器掛架導致失敗,可把這個Instance 放到其他機器進行執行
    • 2、由於機器沒掛,但是由於機器硬體老化,導致運行很慢,這種情況叫做「長尾」。
    • 3、解決策略: 為每個App Worker的Instance啟動一個 BackUp Instace, 當App Master發現Insatance出現長尾情況後, 讓BackUp Instance運行, 讓倆個Instance同時運行, 誰先運行完 就算OK。
    • 4、觸發機制: 處理時間遠遠超過了其他Instance平均運行時間, 已經完成Instance比例

  • 資源調度:目標:最大化集群資源利用率,最小化每個任務等待時間,支持資源的配額,支持任務搶佔
  • 1、優先順序
    • 每個作業都有優先順序標籤,priority。
    • 優先順序越高越先調度,相同優先順序按提交時間排序調度
    • 優先分配搞優先順序的JOB, 剩餘分配次優先順序job
  • 2、正在運行的系統,插入高優先順序任務,如何進行搶佔?
    • 優先搶佔優先順序低的任務,去暫停優先順序低的任務,回收資源,分配給緊急任務;如果還滿足不了,那就搶佔優先順序倒數第二的任務的資源,值到高優先順序任務唄滿足執行。是一個遞歸過程

  • 3、優先順序分組
    • 同一個優先順序組內平均分配,按提交時間 先到先得。
  • 4、策略配額 Quota
    • 多個任務組成Group, 按業務區分。每個Group的Job所分配的資源「付費」。
    • 資源共享和資源配額限制。實現動態調節Quota配額。
    • 某個Group沒用為,按比例分給其他Group.

  • 容錯技術:故障恢復要考慮下面四個方面
    • 1、正在運行任務 不中斷
    • 2、對用戶透明
    • 3、自動恢復故障
    • 4、系統恢復時保持可用性
    • 5、安全與性能隔離

推薦閱讀:

相关文章