本文将介绍Google 三架马车中的第二架——MapReduce 计算框架

MapReduce 是什么

MapReduce 是一种编程模型,用于大规模数据集(大于1TB)的并行运算。它极大地方便了编程人员将自己的程序运行在分散式系统上。这种编程模型在很长时间以前就出现了,但在以前这个编程模型不是非常火热。当 Google 关于 MapReduce 计算框架的论文发布,紧接著 Hadoop 的开源使这个编程模型变得异常火爆,从而使得大数据计算通用编程成为可能。

MapReduce 即是编程模型,又是计算框架。开发人员需要基于 MapReduce 编程模型去开发分散式程序,然后将程序通过 MapReduce 计算框架分发到 Hadoop 集群中来运行。


网上有一个形象的例子来解释这个编程模型。

我问妻子:「你真的想要弄懂什么是 MapReduce?」 她很坚定地回答说「是的」。 因此我问道:

我: 你是如何准备洋葱辣椒酱的?(以下并非准确食谱,请勿在家尝试)

妻子: 我会取一个洋葱,把它切碎,然后拌入盐和水,最后放进混合研磨机里研磨。这样就能得到洋葱辣椒酱了。

妻子: 但这和 MapReduce 有什么关系?

我: 你等一下。让我来编一个完整的情节,这样你肯定可以在 15 分钟内弄懂 MapReduce。

妻子: 好吧。

我:现在,假设你想用洋葱、番茄、辣椒、大蒜弄一瓶混合辣椒酱。你会怎么做呢?

妻子: 我会取洋葱一个,番茄一个,辣椒一根,大蒜一根,切碎后加入适量的盐和水,再放入混合研磨机里研磨,这样你就可以得到一瓶混合辣椒酱了。

我: 没错,让我们把 MapReduce 的概念应用到食谱上。map 和 reduce 其实是两种操作,我来给你详细讲解下。 Map(映射):把洋葱、番茄、辣椒和大蒜切碎,是作用在这些物体上的一个 map 操作。所以你给 map 一个洋葱,map 就会把洋葱切碎。 同样的,你把辣椒,大蒜和番茄一一地拿给 map,你也会得到各种碎块。 所以,当你在切像洋葱这样的蔬菜时,你执行就是一个map操作。 map 操作适用于每一种蔬菜,它会相应地生产出一种或多种碎块,在我们的例子中生产的是蔬菜块。在 map 操作中可能会出现有个洋葱坏掉了的情况,你只要把坏洋葱丢了就行了。所以,如果出现坏洋葱了,map 操作就会过滤掉坏洋葱而不会生产出任何的坏洋葱块。

Reduce(规约):在这一阶段,你将各种蔬菜碎都放入研磨机里进行研磨,你就可以得到一瓶辣椒酱了。这意味要制成一瓶辣椒酱,你得研磨所有的原料。因此,研磨机通常将 map 操作的蔬菜碎聚集在了一起。

妻子: 所以,这就是 MapReduce?

我: 你可以说是,也可以说不是。 其实这只是 MapReduce 的一部分,MapReduce 的强大在于分散式计算。

妻子: 分散式计算? 那是什么?请给我解释下吧。

我: 没问题。

我: 假设你参加了一个辣椒酱比赛并且你的食谱赢得了最佳辣椒酱奖。得奖之后,辣椒酱食谱大受欢迎,于是你想要开始出售自制品牌的辣椒酱。假设你每天需要生产 10000 瓶辣椒酱,你会怎么办呢?

妻子: 我会找一个能为我大量提供原料的供应商。

我:是的,就是那样。那你能否独自完成制作呢?也就是说,独自将原料都切碎? 仅仅一部研磨机又是否能满足需要?而且现在,我们还需要供应不同种类的辣椒酱,像洋葱辣椒酱、青椒辣椒酱、番茄辣椒酱等等。

妻子: 当然不能了,我会雇佣更多的工人来切蔬菜。我还需要更多的研磨机,这样我就可以更快地生产辣椒酱了。

我:没错,所以现在你就不得不分配工作了,你将需要几个人一起切蔬菜。每个人都要处理满满一袋的蔬菜,而每一个人都相当于在执行一个简单的 map 操作。每一个人都将不断的从袋子里拿出蔬菜来,并且每次只对一种蔬菜进行处理,也就是将它们切碎,直到袋子空了为止。 这样,当所有的工人都切完以后,工作台(每个人工作的地方)上就有了洋葱块、番茄块、和蒜蓉等等。

妻子:但是我怎么会制造出不同种类的番茄酱呢?

我:现在你会看到 MapReduce 遗漏的阶段—搅拌阶段。MapReduce 将所有输出的蔬菜碎都搅拌在了一起,这些蔬菜碎都是在以 key 为基础的 map 操作下产生的。搅拌将自动完成,你可以假设key是一种原料的名字,就像洋葱一样。 所以全部的洋葱 keys 都会搅拌在一起,并转移到研磨洋葱的研磨器里。这样,你就能得到洋葱辣椒酱了。同样地,所有的番茄也会被转移到标记著番茄的研磨器里,并制造出番茄辣椒酱。

以词频计数(WordCount)为例

要计算一个文档中不同词的数量,最简单的方法就是建立一个 Hash 表,然后将每个词放到表中作为 key,遇到一个词就对相应的 value + 1。小数据量统计词频很简单,但是一旦文档非常大,单机的程序就不行了,这时候就可以用 MapReduce 来解决。

Hadoop 中的 MapReduce 示常式序如下。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

通过上面的代码我们可以看到 MapReduce 程序的核心是 map 函数与 reduce 函数。map 函数读入 对,在这里相当于对每一行用 StringTokenizer 进行切分并输出成 的形式。Mapreduce 的 shuffle 过程会将所以相同的 收集起来,生成 这样的形式,然后输出给 reduce。reduce 函数读取同一个 key 的数据,并输出 ,这里的 sum 就是所有 1 的和。

map 和 reduce 函数都可以分布到多个节点中运算,map 函数从 HDFS 中读取数据块,reduce 从不同的 map 节点输出中获取数据,从而大大提高计算效率。

下图为 MapReduce 进行词频统计的过程示意图。

MapReduce 的步骤可以总结如下:

Map 任务处理

  1. 读取输入文件内容,将输入文件的每一行解析成 对;
  2. 写自己的逻辑,对输入的 处理,转换成新的 输出;
  3. 对输出的 进行分区(对应不同的 reduce 任务节点);
  4. 对不同分区的数据,按照 key 进行排序、分组,相同 key 的 value 放到一个集合中;
  5. (可选)分组后的数据进行归约。

Reduce 任务处理

  1. 对多个 map 任务的输出,按照不同的分区,通过网路复制到不同的 reduce 节点;
  2. 对多个 map 任务的输出进行合并、排序,根据自己定义的 reduce 函数逻辑,对输入的 处理,转换成新的 输出;
  3. 把 reduce 的输出保存到文件中。

模型是人们对于一类事物的概述和抽象,可以帮助我们更好地理解事物的本质。如 MapReduce 这样的编程模型,可以应对大数据场景的几乎所有计算需求,可见其强大。我们需要不断思考总结,去培养对事物的抽象能力,去探索问题的背后规律,使自己进步。

—— 李智慧

参考文献

  • MapReduce Hadoop 官方文档:hadoop.apache.org/docs/
  • 极客时间大数据专栏
  • MapReduce 论文:static.googleusercontent.com

文|Skye

  • 简书:jianshu.com/u/70b4fd000
  • 知乎专栏:zhuanlan.zhihu.com/skye
  • 个人博客:skye.fun/
  • GitHub:github.com/Skyexu

欢迎关注我的微信公众号


推荐阅读:
相关文章