MapReduce應用案例

這是一個在我的Github上開源的基於MapReduce,以數據清洗為背景的案常式序分析項目。

目的是為了能熟練運用MapReduce程序來進行數據清洗工作。歡迎大家star或fork

環境說明

Hadoop搭建環境:

虛擬機操作系統: CentOS6.3 64位,單核,1G內存

JDK:1.7.0_60 64位

Hadoop:2.4.1

MR程序編譯環境:

Eclipse IED

mapred.LocalJobRunner本地運行模式

準備測試數據

測試數據包括兩個文件dept(部門)和emp(員工),其中各欄位用逗號分隔:

dept文件內容:

10,ACCOUNTING,NEW YORK

20,RESEARCH,DALLAS 30,SALES,CHICAGO 40,OPERATIONS,BOSTON

emp文件內容:

7369,SMITH,CLERK,7902,17-12月-80,800,,20

7499,ALLEN,SALESMAN,7698,20-2月 -81,1600,300,30 7521,WARD,SALESMAN,7698,22-2月 -81,1250,500,30 7566,JONES,MANAGER,7839,02-4月 -81,2975,,20 7654,MARTIN,SALESMAN,7698,28-9月 -81,1250,1400,30 7698,BLAKE,MANAGER,7839,01-5月 -81,2850,,30 7782,CLARK,MANAGER,7839,09-6月 -81,2450,,10 7839,KING,PRESIDENT,,17-11月-81,5000,,10 7844,TURNER,SALESMAN,7698,08-9月 -81,1500,0,30 7900,JAMES,CLERK,7698,03-12月-81,950,,30

7902,FORD,ANALYST,7566,03-12月-81,3000,,20

7934,MILLER,CLERK,7782,23-1月 -82,1300,,10

應用案例

例子1:求各個部門的總工資

問題分析

MapReduce中的join分為好幾種,比如有最常見的 reduce side join、map side join和semi join 等。reduce join 在shuffle階段要進行大量的數據傳輸,會造成大量的網路IO效率低下,而map side join 在處理多個小表關聯大表時非常有用 。

Map side join是針對以下場景進行的優化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到內存中。這樣我們可以將小表複製多份,讓每個map task內存中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,如果有,則連接後輸出即可。為了支持文件的複製,Hadoop提供了一個類DistributedCache,使用該類的方法如下:

(1)用戶使用靜態方法`DistributedCache.addCacheFile()`指定要複製的文件,它的參數是文件的URI(如果是HDFS上的文件,可以這樣:`hdfs://jobtracker:50030/home/XXX/file`)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁碟上。

(2)用戶使用:在分散式環境`DistributedCache.getLocalCacheFiles()`/在偽分散式環境`DistributedCache.getCacheFiles()`方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。

在下面代碼中,將會把數據量小的表(部門dept)緩存在內存中,在Mapper階段對員工部門編號映射成部門名稱,該名稱作為key輸出到Reduce中,在Reduce中計算按照部門計算各個部門的總工資。

處理流程圖

求各個部門的總工資處理流程圖

源代碼

SumDeptSalary.java


例子2:求各個部門的人數和平均工資

問題分析

求各個部門的人數和平均工資,需要得到各部門工資總數和部門人數,通過兩者相除獲取各部門平均工資。首先和問題1類似在Mapper的Setup階段緩存部門數據,然後在Mapper階段抽取出部門編號和員工工資,利用緩存部門數據把部門編號對應為部門名稱,接著在Shuffle階段把傳過來的數據處理為部門名稱對應該部門所有員工工資的列表,最後在Reduce中按照部門歸組,遍歷部門所有員工,求出總數和員工數,輸出部門名稱和平均工資。

處理流程圖

求各個部門的人數和平均工資處理流程圖

源代碼

DeptNumberAveSalary.java


例子3:求每個部門最早進入公司的員工姓名

問題分析

求每個部門最早進入公司員工姓名,需要得到各部門所有員工的進入公司日期,通過比較獲取最早進入公司員工姓名。首先和問題1類似在Mapper的Setup階段緩存部門數據,然後Mapper階段抽取出key為部門名稱(利用緩存部門數據把部門編號對應為部門名稱),value為員工姓名和進入公司日期,接著在Shuffle階段把傳過來的數據處理為部門名稱對應該部門所有員工+進入公司日期的列表,最後在Reduce中按照部門歸組,遍歷部門所有員工,找出最早進入公司的員工並輸出。

處理流程圖

求每個部門最早進入公司的員工姓名處理流程圖

源代碼

DeptEarliestEmp.java


例子4:求各個城市的員工的總工資

問題分析

求各個城市員工的總工資,需要得到各個城市所有員工的工資,通過對各個城市所有員工工資求和得到總工資。首先和測試例子1類似在Mapper的Setup階段緩存部門對應所在城市數據,然後在Mapper階段抽取出key為城市名稱(利用緩存數據把部門編號對應為所在城市名稱),value為員工工資,接著在Shuffle階段把傳過來的數據處理為城市名稱對應該城市所有員工工資,最後在Reduce中按照城市歸組,遍歷城市所有員工,求出工資總數並輸出。

處理流程圖

求各個城市的員工的總工資處理流程圖

源代碼

SumCitySalary.java


例子5:列出工資比上司高的員工姓名及其工資

問題分析

求工資比上司高的員工姓名及工資,需要得到上司工資及上司所有下屬員工,通過比較他們工資高低得到比上司工資高的員工。在Mapper階段輸出經理數據和員工對應經理表數據,其中經理數據key為員工編號、value為"M,該員工工資",員工對應經理表數據key為經理編號、value為"E,該員工姓名,該員工工資";然後在Shuffle階段把傳過來的經理數據和員工對應經理表數據進行歸組,如編號為7698員工,value中標誌M為自己工資,value中標誌E為其下屬姓名及工資;最後在Reduce中遍歷比較員工與經理工資高低,輸出工資高於經理的員工。

處理流程圖

列出工資比上司高的員工姓名及其工資處理流程圖

源代碼

EarnMoreThanManager.java


例子6:列出工資比公司平均工資要高的員工姓名及其工資

問題分析

求工資比公司平均工資要高的員工姓名及工資,需要得到公司的平均工資和所有員工工資,通過比較得出工資比平均工資高的員工姓名及工資。這個問題可以分兩個作業進行解決,先求出公司的平均工資,然後與所有員工進行比較得到結果;也可以在一個作業進行解決,這裡就得使用作業setNumReduceTasks方法,設置Reduce任務數為1,保證每次運行一個reduce任務,在該例子中,只需要一個reduce任務就可以處理完數據,從而能先求出平均工資,然後進行比較得出結果。

在Mapper階段輸出兩份所有員工數據,其中一份key為0、value為該員工工資,另外一份key為1、value為"該員工姓名 ,員工工資";然後在Shuffle階段把傳過來數據按照key進行歸組,在該任務中有key值為0和1兩組數據;最後在Reduce中對key值0的所有員工求工資總數和員工數,獲得平均工資;對key值1,比較員工與平均工資的大小,輸出比平均工資高的員工和對應的工資。

處理流程圖

列出工資比公司平均工資要高的員工姓名及其工資處理流程圖

源代碼

HigherThanAveSalary.java


例子7:列出名字以J開頭的員工姓名及其所屬部門名稱

問題分析

求名字以J開頭的員工姓名機器所屬部門名稱,只需判斷員工姓名是否以J開頭。首先和問題1類似在Mapper的Setup階段緩存部門數據,然後在Mapper階段判斷員工姓名是否以J開頭,如果是抽取出員工姓名和員工所在部門編號,利用緩存部門數據把部門編號對應為部門名稱,轉換後輸出結果。

處理流程圖

列出名字以J開頭的員工姓名及其所屬部門名稱處理流程圖

源代碼

NameDeptOfStartJ.java


例子8:列出工資最高的頭三名員工姓名及其工資

問題分析

求工資最高的頭三名員工姓名及工資,可以通過冒泡法得到。在Mapper階段輸出經理數據和員工對應經理表數據,其中經理數據key為0值、value為"員工姓名,員工工資";最後在Reduce中通過冒泡法遍歷所有員工,比較員工工資多少,求出前三名。

處理流程圖

列出工資最高的頭三名員工姓名及其工資處理流程圖

源代碼

SalaryTop3Salary.java


例子9:將全體員工按照總收入(工資+提成)從高到低排列

問題分析

求全體員工總收入降序排列,獲得所有員工總收入並降序排列即可。在Mapper階段輸出所有員工總工資數據,其中key為員工總工資、value為員工姓名,在Mapper階段的最後會先調用job.setPartitionerClass對數據進行分區,每個分區映射到一個reducer,每個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。由於在本作業中Map的key只有0值,故能實現對所有數據進行排序。

處理流程圖

將全體員工按照總收入(工資+提成)從高到低排列處理流程圖

源代碼

EmpSalarySort.java


例子10:求任何兩名員工信息傳遞所需要經過的中間節點數

問題分析

該公司所有員工可以形成入下圖的樹形結構,求兩個員工的溝通的中間節點數,可轉換在員工樹中求兩個節點連通所經過的節點數,即從其中一節點到匯合節點經過節點數加上另一節點到匯合節點經過節點數。例如求M到Q所需節點數,可以先找出M到A經過的節點數,然後找出Q到A經過的節點數,兩者相加得到M到Q所需節點數。

求M到Q所需節點數

在作業中首先在Mapper階段所有員工數據,其中經理數據key為0值、value為"員工編號,員工經理編號",然後在Reduce階段把所有員工放到員工列表和員工對應經理鏈表Map中,最後在Reduce的Cleanup中按照上面說所演算法對任意兩個員工計算出溝通的路徑長度並輸出。

處理流程圖

求任何兩名員工信息傳遞所需要經過的中間節點數處理流程圖

源代碼

MiddlePersonsCountForComm.java


推薦閱讀:
相關文章