mapreduce分散式計算框架可以讓應用在集羣中可靠的容錯的,並行處理 ,TB級別的數據。因此我會從架構、流程、數據結構三個維度去羅列介紹。

架構(主從結構)

1、一個ResourceManager主節點

2、每個DataNode上有一個NodeManager從節點

3、每個運行於MapReduce的程序有一個MRAppMaster

流程

1、MapReduce將輸入的數據塊邏輯切片(block)

2、map任務以並行方式處理切片數據

3、框架對map輸出排序,然後將數據發送給reduce

4、MapReduce的輸入輸出數據存在於同一個文件系統(HDFS)

5、框架負責任務調度、任務監控和失敗任務的重新執行

(MRAppMaster)

數據結構

Map : (K1,V1) -> list(K2,V2)

Reduce : (K2,list(V2)) -> list(K3,V3)

1、MapReduce處理鍵值對形式的很多鍵值對輸入,生成鍵值對形式的很多鍵值對輸出

2、框架會對鍵和值序列化,因此鍵類型和值類型需要實現Writable介面。框架會對鍵進行排序,因此必須實現WritableComparable介面。

3、map輸出鍵值對類型和reduce鍵值對輸入類型一致

4、map的輸入鍵值對類型和輸出鍵值對類型一般不一致

5、reduce的輸入鍵值對類型和輸出鍵值對類型一般不一致

MapReduce原語: 若干條指令組成的,用於完成一定功能的一個過程

相同key的鍵值對為一組調用一次reduce方法,方法內迭代這一組數據進行計算

Mapper根據業務需求處理數據並映射為KV模型,並行分散式計算。

Reducer對數據進行全量/分量加工,可以包含不同的key,相同分區的key匯聚到一個Reducer中,調用一次reduce方法,排序和比較實現key的匯聚

MapReduce工作歷程-處理流程分為如下四個階段

split -> map -> shuffle -> reduce ->輸出

map任務處理

1.1 讀取HDFS中的文件。每一行解析成一個<k,v>。每一個鍵值對調用一次map函數。 <0,hello you> <10,hello me>

1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換為新的<k,v>輸出。      <hello,1> <you,1> <hello,1> <me,1>

1.3 對1.2輸出的<k,v>進行分區。默認分為一個區。

1.4 對不同分區中的數據進行排序(按照k)、分組。分組指的是相同key的value放到一個集合中。

排序後:<hello,1> <hello,1> <me,1> <you,1>

分組後:<hello,{1,1}><me,{1}><you,{1}>

    1. (可選)對分組後的數據進行歸約(combiner)。

每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合併,以減少傳輸到reducer的數據量。

combiner最基本是實現本地key的歸併,combiner具有類似本地的reduce功能。 如果不用combiner,那麼,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。

注意:Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。通常,Combiner只用於那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。

  1. reduce任務處理

2.1 多個map任務的輸出,按照不同的分區,通過網路copy到不同的reduce節點上。(shuffle)

2.2 對多個map的輸出進行合併、排序。覆蓋reduce函數,接收的是分組後的數據,實現自己的業務邏輯。

<hello,2> <me,1> <you,1>

處理後,產生新的<k,v>輸出。

2.3 對reduce輸出的<k,v>寫到HDFS中。

shuffle過程分析

Map端:

1、在map端首先接觸的是InputSplit,在InputSplit中含有DataNode中的數據,每一個InputSplit都會分配一個Mapper任務,Mapper任務結束後產生<K2,V2>的輸出,這些輸出先存放在緩存中,每個map有一個環形內存緩衝區,用於存儲任務的輸出。默認大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個後臺線程就把內容寫到(spill)Linux本地磁碟中的指定目錄(mapred.local.dir)下的新建的一個溢出寫文件。(注意:map過程的輸出是寫入本地磁碟而不是HDFS,但是一開始數據並不是直接寫入磁碟而是緩衝在內存中,緩存的好處就是減少磁碟I/O的開銷,提高合併和排序的速度。又因為默認的內存緩衝大小是100M(當然這個是可以配置的),所以在編寫map函數的時候要盡量減少內存的使用,為shuffle過程預留更多的內存,因為該過程是最耗時的過程。)

2、寫磁碟前,要進行partition、sort和combine等操作。通過分區,將不同類型的數據分開處理,之後對不同分區的數據進行排序,如果有Combiner,還要對排序後的數據進行combine。等最後記錄寫完,將全部溢出文件合併為一個分區且排序的文件。(注意:在寫磁碟的時候採用壓縮的方式將map的輸出結果進行壓縮是一個減少網路開銷很有效的方法!)

3、最後將磁碟中的數據送到Reduce中,從圖中可以看出Map輸出有三個分區,有一個分區數據被送到圖示的Reduce任務中,剩下的兩個分區被送到其他Reducer任務中。而圖示的Reducer任務的其他的三個輸入則來自其他節點的Map輸出。

Reduce端:

1、Copy階段:Reducer通過Http方式得到輸出文件的分區。

reduce端可能從n個map的結果中獲取數據,而這些map的執行速度不盡相同,當其中一個map運行結束時,reduce就會從JobTracker中獲取該信息。map運行結束後TaskTracker會得到消息,進而將消息彙報給  JobTracker,reduce定時從JobTracker獲取該信息,reduce端默認有5個數據複製線程從map端複製數據。

2、Merge階段:如果形成多個磁碟文件會進行合併

從map端複製來的數據首先寫到reduce端的緩存中,同樣緩存佔用到達一定閾值後會將數據寫到磁碟中,同樣會進行partition、combine、排序等過程。如果形成了多個磁碟文件還會進行合併,最後一次合併的結果作為reduce的輸入而不是寫入到磁碟中。

3、Reducer的參數:最後將合併後的結果作為輸入傳入Reduce任務中。(注意:當Reducer的輸入文件確定後,整個Shuffle操作才最終結束。之後就是Reducer的執行了,最後Reducer會把結果存到HDFS上。)

1.block塊切片,默認為1,可以改。交給map

<K是偏移量, V一行記錄> k 默認 longwritable v Text

2.環形緩衝區buffer in memory,默認100M,到了80%向磁碟落,三個小文件歸併為一個大文件,默認快排,對分區和分區內的key進行排序

3.mrappmaster拉數據,相同K為一組,

reducer通過HTTP按照分區號獲取map輸出文件的數據。map端有一個HTTP服務處理該reducer的HTTP請求。該HTTP服務最大線程數由mapreduce.shuffle.Max.threads屬性指定。這個屬性指定nodemanager的線程數,而不是對map任務指定線程數,因為nodemanager上有可能運行了好幾個map任務。默認值是0,表示最大線程數是伺服器處理器核心數的兩倍。

map輸出文件位於運行map任務的本地磁碟。一個reduce任務需要從集羣中多個map任務獲取指定分區的數據。多個map任務有可能是在不同時間完成的,每當一個map任務運行完,reduce就從該map任務獲取指定分區數據。reduce任務會以多線程的方式從多個map任務並行獲取指定分區數據。默認線程數是5,可以通過mapreduce.reduce.shuffle.parallelcopies屬性指定。

reducer拷貝map的輸出如果很小,則放在內存中(mapreduce.reduce.shuffle.input.buffer.percent指定堆空間百分比)否則拷貝到磁碟。當內存緩衝區數據大小達到閾值(mapreduce.reduce.shuffle.merge.percent)或map輸出文件個數達到閾值(mapreduce.reduce.merge.inmem.threshold ),就發生文件合併溢寫到磁碟上。如果指定combiner,此處也會進行combine。

當reducer從所有的map拷貝了分區數據之後,reduce進入到合併階段,合併所有從map拷貝過來的數據。該合併會有多個回合。如:50個文件,合併因子是10(mapreduce.task.io.sort.factor,默認10),則需要5輪,得到5個中間文件,就不再合併。然後直接輸入給reduce階段。給reduce的數據一般是從內存和磁碟數據的混合形式。

二次排序

在map階段按照key對鍵值對進行排序,對值不排序。如果相對value進行排序,就需要二次排序。

1、新的key應該是輸入的key和value的組合

2、按照複合key進行比較排序

3、分區比較器和分組比較器只對複合key中的原生key進行分區和分組

Map:

1、根據業務需求處理數據並映射為KV模型

2、並行分散式

3、計算向數據移動

Reduce:

1、數據全量/分量加工

2、Reducer中可以包含不同的key

3、相同分區的Key匯聚到一個Reducer中

4、 」相同」的Key調用一次reduce方法

5、排序和比較實現key的匯聚

K,V使用自定義數據類型

1、節省開發成本,提高程序自由度

2、框架會對鍵和值序列化,因此鍵類型和值類型需要實現Writable介面。

3、框架會對鍵進行排序,因此必須實現WritableComparable介面。

YARN:解耦資源與計算

RM-HA搭建

有搭建文檔。將客戶端集合在自己內部了,和zk有點區別

ResourceManager

主,核心

集羣節點資源管理 ,。

NodeManager

與RM彙報資源

管理Container生命週期

計算框架中的角色都以Container表示

Container:【節點NM,CPU,MEM,I/O大小,啟動命令】

默認NodeManager啟動線程監控Container大小,超出申請資源額度,kill

支持Linux內核的Cgroup

MR :

MR-ApplicationMaster-Container

作業為單位,避免單點故障,負載到不同的節點

創建Task需要和RM申請資源(Container) Task-Container

Client:

RM-Client:請求資源創建AM

AM-Client:與AM交互

Hadoop 2.0新引入的資源管理系統,直接從MRv1演化而來的;核心思想:將MRv1中JobTracker的資源管理和任務調度兩個功能分開,分別由ResourceManager和ApplicationMaster進程實現 ResourceManager:負責整個集羣的資源管理和調度 ApplicationMaster:負責應用程序相關的事務,比如任務調度、任務監控和容錯等 YARN的引入,使得多個計算框架可運行在一個集羣中 每個應用程序對應一個ApplicationMaster 目前多個計算框架可以運行在YARN上,比如MapReduce、Spark、Storm等

MapReduce On YARN:MRv2

將MapReduce作業直接運行在YARN上,而不是由JobTracker和TaskTracker構建的MRv1系統中

基本功能模塊

YARN:負責資源管理和調度

MRAppMaster:負責任務切分、任務調度、任務監控和容錯等

MapTask/ReduceTask:任務驅動引擎,與MRv1一致

每個MapRduce作業對應一個MRAppMaster

MRAppMaster任務調度

YARN將資源分配給MRAppMaster

MRAppMaster進一步將資源分配給內部的任務 MRAppMaster容錯

失敗後,由YARN重新啟動

任務失敗後,MRAppMaster重新申請資


推薦閱讀:
相關文章