本文將介紹Google 三架馬車中的第二架——MapReduce 計算框架

MapReduce 是什麼

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算。它極大地方便了編程人員將自己的程序運行在分散式系統上。這種編程模型在很長時間以前就出現了,但在以前這個編程模型不是非常火熱。當 Google 關於 MapReduce 計算框架的論文發布,緊接著 Hadoop 的開源使這個編程模型變得異常火爆,從而使得大數據計算通用編程成為可能。

MapReduce 即是編程模型,又是計算框架。開發人員需要基於 MapReduce 編程模型去開發分散式程序,然後將程序通過 MapReduce 計算框架分發到 Hadoop 集群中來運行。


網上有一個形象的例子來解釋這個編程模型。

我問妻子:「你真的想要弄懂什麼是 MapReduce?」 她很堅定地回答說「是的」。 因此我問道:

我: 你是如何準備洋蔥辣椒醬的?(以下並非準確食譜,請勿在家嘗試)

妻子: 我會取一個洋蔥,把它切碎,然後拌入鹽和水,最後放進混合研磨機里研磨。這樣就能得到洋蔥辣椒醬了。

妻子: 但這和 MapReduce 有什麼關係?

我: 你等一下。讓我來編一個完整的情節,這樣你肯定可以在 15 分鐘內弄懂 MapReduce。

妻子: 好吧。

我:現在,假設你想用洋蔥、番茄、辣椒、大蒜弄一瓶混合辣椒醬。你會怎麼做呢?

妻子: 我會取洋蔥一個,番茄一個,辣椒一根,大蒜一根,切碎後加入適量的鹽和水,再放入混合研磨機里研磨,這樣你就可以得到一瓶混合辣椒醬了。

我: 沒錯,讓我們把 MapReduce 的概念應用到食譜上。map 和 reduce 其實是兩種操作,我來給你詳細講解下。 Map(映射):把洋蔥、番茄、辣椒和大蒜切碎,是作用在這些物體上的一個 map 操作。所以你給 map 一個洋蔥,map 就會把洋蔥切碎。 同樣的,你把辣椒,大蒜和番茄一一地拿給 map,你也會得到各種碎塊。 所以,當你在切像洋蔥這樣的蔬菜時,你執行就是一個map操作。 map 操作適用於每一種蔬菜,它會相應地生產出一種或多種碎塊,在我們的例子中生產的是蔬菜塊。在 map 操作中可能會出現有個洋蔥壞掉了的情況,你只要把壞洋蔥丟了就行了。所以,如果出現壞洋蔥了,map 操作就會過濾掉壞洋蔥而不會生產出任何的壞洋蔥塊。

Reduce(規約):在這一階段,你將各種蔬菜碎都放入研磨機里進行研磨,你就可以得到一瓶辣椒醬了。這意味要製成一瓶辣椒醬,你得研磨所有的原料。因此,研磨機通常將 map 操作的蔬菜碎聚集在了一起。

妻子: 所以,這就是 MapReduce?

我: 你可以說是,也可以說不是。 其實這只是 MapReduce 的一部分,MapReduce 的強大在於分散式計算。

妻子: 分散式計算? 那是什麼?請給我解釋下吧。

我: 沒問題。

我: 假設你參加了一個辣椒醬比賽並且你的食譜贏得了最佳辣椒醬獎。得獎之後,辣椒醬食譜大受歡迎,於是你想要開始出售自製品牌的辣椒醬。假設你每天需要生產 10000 瓶辣椒醬,你會怎麼辦呢?

妻子: 我會找一個能為我大量提供原料的供應商。

我:是的,就是那樣。那你能否獨自完成製作呢?也就是說,獨自將原料都切碎? 僅僅一部研磨機又是否能滿足需要?而且現在,我們還需要供應不同種類的辣椒醬,像洋蔥辣椒醬、青椒辣椒醬、番茄辣椒醬等等。

妻子: 當然不能了,我會僱傭更多的工人來切蔬菜。我還需要更多的研磨機,這樣我就可以更快地生產辣椒醬了。

我:沒錯,所以現在你就不得不分配工作了,你將需要幾個人一起切蔬菜。每個人都要處理滿滿一袋的蔬菜,而每一個人都相當於在執行一個簡單的 map 操作。每一個人都將不斷的從袋子里拿出蔬菜來,並且每次只對一種蔬菜進行處理,也就是將它們切碎,直到袋子空了為止。 這樣,當所有的工人都切完以後,工作台(每個人工作的地方)上就有了洋蔥塊、番茄塊、和蒜蓉等等。

妻子:但是我怎麼會製造出不同種類的番茄醬呢?

我:現在你會看到 MapReduce 遺漏的階段—攪拌階段。MapReduce 將所有輸出的蔬菜碎都攪拌在了一起,這些蔬菜碎都是在以 key 為基礎的 map 操作下產生的。攪拌將自動完成,你可以假設key是一種原料的名字,就像洋蔥一樣。 所以全部的洋蔥 keys 都會攪拌在一起,並轉移到研磨洋蔥的研磨器里。這樣,你就能得到洋蔥辣椒醬了。同樣地,所有的番茄也會被轉移到標記著番茄的研磨器里,並製造出番茄辣椒醬。

以詞頻計數(WordCount)為例

要計算一個文檔中不同詞的數量,最簡單的方法就是建立一個 Hash 表,然後將每個詞放到表中作為 key,遇到一個詞就對相應的 value + 1。小數據量統計詞頻很簡單,但是一旦文檔非常大,單機的程序就不行了,這時候就可以用 MapReduce 來解決。

Hadoop 中的 MapReduce 示常式序如下。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

通過上面的代碼我們可以看到 MapReduce 程序的核心是 map 函數與 reduce 函數。map 函數讀入 對,在這裡相當於對每一行用 StringTokenizer 進行切分並輸出成 的形式。Mapreduce 的 shuffle 過程會將所以相同的 收集起來,生成 這樣的形式,然後輸出給 reduce。reduce 函數讀取同一個 key 的數據,並輸出 ,這裡的 sum 就是所有 1 的和。

map 和 reduce 函數都可以分布到多個節點中運算,map 函數從 HDFS 中讀取數據塊,reduce 從不同的 map 節點輸出中獲取數據,從而大大提高計算效率。

下圖為 MapReduce 進行詞頻統計的過程示意圖。

MapReduce 的步驟可以總結如下:

Map 任務處理

  1. 讀取輸入文件內容,將輸入文件的每一行解析成 對;
  2. 寫自己的邏輯,對輸入的 處理,轉換成新的 輸出;
  3. 對輸出的 進行分區(對應不同的 reduce 任務節點);
  4. 對不同分區的數據,按照 key 進行排序、分組,相同 key 的 value 放到一個集合中;
  5. (可選)分組後的數據進行歸約。

Reduce 任務處理

  1. 對多個 map 任務的輸出,按照不同的分區,通過網路複製到不同的 reduce 節點;
  2. 對多個 map 任務的輸出進行合併、排序,根據自己定義的 reduce 函數邏輯,對輸入的 處理,轉換成新的 輸出;
  3. 把 reduce 的輸出保存到文件中。

模型是人們對於一類事物的概述和抽象,可以幫助我們更好地理解事物的本質。如 MapReduce 這樣的編程模型,可以應對大數據場景的幾乎所有計算需求,可見其強大。我們需要不斷思考總結,去培養對事物的抽象能力,去探索問題的背後規律,使自己進步。

—— 李智慧

參考文獻

  • MapReduce Hadoop 官方文檔:hadoop.apache.org/docs/
  • 極客時間大數據專欄
  • MapReduce 論文:static.googleusercontent.com

文|Skye

  • 簡書:jianshu.com/u/70b4fd000
  • 知乎專欄:zhuanlan.zhihu.com/skye
  • 個人博客:skye.fun/
  • GitHub:github.com/Skyexu

歡迎關注我的微信公眾號


推薦閱讀:
相关文章