MapReduce 是 Google 提出的一個軟體架構,用於大規模數據集(大於1TB)的並行運算。簡而言之,就是將任務切分成很小的任務然後一個一個區的執行最後匯總,這就像小時候我們老師經常教育我們一樣,大事化小,小事化了(瞬間感覺那時候老師好言簡意賅啊!!!)思想就這麼一個思想,那麼按照這個思想在現代軟體定義一切的世界裡面,我們怎麼運用這樣的方式來解決海量數據的處理,這篇就告訴你一個這樣的一個簡單的實現使用 Go 語言。

上車

簡單介紹一下幾個概念:

概念「Map(映射)」和「Reduce(歸納)」,及他們的主要思想,都是從函數式編程語言借來的,還有從矢量編程語言借來的特性。當前的軟體實現是指定一個 Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定並發的 Reduce(歸納)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

以一個例子為簡單的開始:

詞頻的統計(WorldCount),在現實的需求的上面可能我們可能有這樣的一個需求,就是計算出一篇文章裡面出現每個單詞的個數。具體到生活就是,就算 Top N 的結果,比如全校要開表彰大會,找出 10 個好學生這樣的 Top N 這樣的例子比比皆是,而 World Count 就是他的一個實現,只是最終的結果只取出排在前面的結果而已。

有了上面找出 10 個好學生的需求的時候,我們來想想怎麼去實現它呢,很顯然這個需求可能是校長在開會的時候提出來的,那麼具體的實現就是每個年級組長是不是要把每個年級排名前 10 的學生找出來,然後年級組長的領導,將這些信息在匯總取出 前 10 的學生咯,那麼具體的每個年級怎麼做呢?同理,將每個班的前10名學生找出來,然後匯總到年級部門咯。

發車

基本概覽和思路已經明白了,現在開始構建整個 MapReduce 框架了,首先我們明確一個思想就是,將任務劃分成合適的大小,然後對其進行計算,然後將每一步計算的的結果,進行一個匯總合併的過程。那麼這兩個過程我們先分別定義為Map 和Reduce 過程。

還是以 World Count 這個為例子:

Map 的處理過程就是讀取給定的文件,將文件裡面的每個單詞的出現頻率初始化為 1。

Reduce 的處理過程就是將相同的單詞,數據進行一個累加的過程。那麼,我們 MapReduce 框架的目的是調用在合適的時候調用這個 Map 和 Reduce 的過程。

在 common_map.go 裡面 doMap 方法就是給定文件,讀取數據然後,調用 Map 這個過程,代碼裡面有注釋,在這裡進行一個簡單概述一下主要有這幾個步驟:
  1. 讀取文件;
  2. 將讀文件的內容,調用用戶 Map 函數,生產對於的 KeyValue 值;
  3. 最後按照 KeyValue 裡面的 Key 進行分區,將內容寫入到文件裡面,以便於後面的 Reduce 過程執行;

func doMap(
jobName string, // // the name of the MapReduce job
mapTaskNumber int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run
mapF func(file string, contents string) []KeyValue,
) {

//setp 1 read file
contents, err := ioutil.ReadFile(inFile)
if err != nil {
log.Fatal("do map error for inFile ",err)
}
//setp 2 call user user-map method ,to get kv
kvResult := mapF(inFile, string(contents))

/**
* setp 3 use key of kv generator nReduce file ,partition
* a. create tmpFiles
* b. create encoder for tmpFile to write contents
* c. partition by key, then write tmpFile
*/

var tmpFiles [] *os.File = make([] *os.File, nReduce)
var encoders [] *json.Encoder = make([] *json.Encoder, nReduce)

for i := 0; i < nReduce; i++ {
tmpFileName := reduceName(jobName,mapTaskNumber,i)
tmpFiles[i],err = os.Create(tmpFileName)
if err!=nil {
log.Fatal(err)
}

defer tmpFiles[i].Close()
encoders[i] = json.NewEncoder(tmpFiles[i])
if err!=nil {
log.Fatal(err)
}
}

for _ , kv := range kvResult {
hashKey := int(ihash(kv.Key)) % nReduce
err := encoders[hashKey].Encode(&kv)
if err!=nil {
log.Fatal("do map encoders ",err)
}
}
}

doReduce 函數在 common_reduce.go 裡面,主要步驟:

  1. 讀取 doMap 過程中產生的中間文件;
  2. 按照讀取相同文件中的 Key 進新按照字典順序進行排序;
  3. 遍歷讀取的 KeyValue,並且調用用戶的 Reduce 方法,將計算的結果繼續寫入到文件中;

func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTaskNumber int, // which reduce task this is
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {

// file.Close()

//setp 1,read map generator file ,same key merge put map[string][]string

kvs := make(map[string][]string)

for i := 0; i < nMap; i++ {
fileName := reduceName(jobName, i, reduceTaskNumber)
file, err := os.Open(fileName)
if err != nil {
log.Fatal("doReduce1: ", err)
}

dec := json.NewDecoder(file)

for {
var kv KeyValue
err = dec.Decode(&kv)
if err != nil {
break
}

_, ok := kvs[kv.Key]
if !ok {
kvs[kv.Key] = []string{}
}
kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
}
file.Close()
}

var keys []string

for k := range kvs {
keys = append(keys, k)
}

//setp 2 sort by keys
sort.Strings(keys)

//setp 3 create result file
p := mergeName(jobName, reduceTaskNumber)
file, err := os.Create(p)
if err != nil {
log.Fatal("doReduce2: ceate ", err)
}
enc := json.NewEncoder(file)

//setp 4 call user reduce each key of kvs
for _, k := range keys {
res := reduceF(k, kvs[k])
enc.Encode(KeyValue{k, res})
}

file.Close()
}

Merge 過程

當然最後就是將每個 Reduce 產生的結果進行一個Merge 的過程,在 merge 的過程中,同樣也是需要進行按照 Key 進行字典順序排列,然後寫入到最終的文件中。代碼跟 reduce 還是相似的,這裡就不自愛贅述了。

使用 go 的多線程來實現分散式的任務執行,這裡主要是是 schedule.go 裡面的 schedule 方法,主要是步驟:

  1. 通過不同的階段( Map or Reduce ),獲取到需要執行多少個 map (reduce),然後調用遠程的 worker.go 裡面的 DoTask 方法;
  2. 等待所有的任務完成,然後才結束。這裡主要使用了go 語言的一些特性,Go RPC documentation 和 Concurrency in Go。

func (mr *Master) schedule(phase jobPhase) {
var ntasks int
var nios int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mr.files)
nios = mr.nReduce
case reducePhase:
ntasks = mr.nReduce
nios = len(mr.files)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)
", ntasks, phase, nios)

//use go routing,worker rpc executor task,
done := make(chan bool)
for i := 0; i < ntasks; i++ {
go func(number int) {

args := DoTaskArgs{mr.jobName, mr.files[ntasks], phase, number, nios}
var worker string
reply := new(struct{})
ok := false
for ok != true {
worker = <- mr.registerChannel
ok = call(worker, "Worker.DoTask", args, reply)
}
done <- true
mr.registerChannel <- worker
}(i)

}

//wait for all task is complate
for i := 0; i< ntasks; i++ {
<- done
}
fmt.Printf("Schedule: %v phase done
", phase)
}

原文鏈接:github.com/happyer/dist

Go語言內幕五篇

  1. Go語言內幕(1):主要概念與項目結構
  2. Go語言內幕(2):深入 Go 編譯器
  3. Go語言內幕(3):鏈接器、鏈接器、重定位
  4. Go語言內幕(4):目標文件和函數元數據
  5. Go語言內幕(5):運行時啟動過程

Go語言主題推薦

  • 用 Go 語言實現 Raft 分散式一致性協議
  • Go語言TCP Socket編程
  • GoLang之Concurrency順序管道模式
  • Golang的包管理之道
  • 優秀的 Go 存儲開源項目和庫推薦
  • golang 核心開發者 Dmitry Vyukov(1.1 調度器作者) 關於性能剖析
  • 使用Golang實現的無鎖隊列,性能與Disruptor相當達到1400萬/秒
  • Golang的包管理之道
  • Go語言並發之美
  • 用Go構建Teamwork項目的9條教訓

加入區塊鏈交流羣請加微信:isLishude


推薦閱讀:
相關文章