一起學Hadoop——第一個MapReduce程序 上一篇我們學習了MapReduce的原理,今天我們使用代碼來加深對MapReduce原理的理解。wordcount是Hadoop入門的經典例子,我們也不能免俗,也使用這個例子作為學習Hadoop的第一個程序。本文將介紹使用java和python編寫第一個MapReduce程序。本文使用Idea2018開發工具開發第一個Hadoop程序。使用的編程語言是Java。打開idea,新建一個工程,如下圖所示: 在彈出新建工程的界面選擇Java,接著選擇SDK,一般默認即可,點擊「Next」按鈕,如下圖: 在彈出的選擇創建項目的模板頁面,不做任何操作,直接點擊「Next」按鈕。 輸入項目名稱,點擊Finish,就完成了創建新項目的工作,我們的項目名稱為:WordCount。如下圖所示: 添加依賴jar包,和Eclipse一樣,要給項目添加相關依賴包,否則會出錯。點擊Idea的File菜單,然後點擊「Project Structure」菜單,如下圖所示: 依次點擊Modules和Dependencies,然後選擇「+」的符號,如下圖所示: 選擇hadoop的包,我用得是hadoop2.6.1。把下面的依賴包都加入到工程中,否則會傳下某個類找不到的錯誤。(1)」/usr/local/hadoop/share/hadoop/common」目錄下的hadoop-common-2.6.1.jar和haoop-nfs-2.6.1.jar; (2)/usr/local/hadoop/share/hadoop/common/lib」目錄下的所有JAR包; (3)「/usr/local/hadoop/share/hadoop/hdfs」目錄下的haoop-hdfs-2.6.1.jar和haoop-hdfs-nfs-2.7.1.jar; (4)「/usr/local/hadoop/share/hadoop/hdfs/lib」目錄下的所有JAR包。 工程已經創建好,我們開始編寫Map類、Reduce類和運行MapReduce的入口類:Map類如下:import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordcountMap extends Mapper<LongWritable,Text,Text,IntWritable> { public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{ String line = value.toString();//讀取一行數據 String str[] = line.split("");//因為英文字母是以「 」為間隔的,因此使用「 」分隔符將一行數據切成多個單詞並存在數組中 for(String s :str){//循環迭代字元串,將一個單詞變成<key,value>形式,及<"hello",1> context.write(new Text(s),new IntWritable(1)); } } } Reudce類:import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.io.Text; import java.io.IOException; public class WordcountReduce extends Reducer<Text,IntWritable,Text,IntWritable> { public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{ int count = 0; for(IntWritable value: values) { count++; } context.write(key,new IntWritable(count)); } } 入口類 :import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; public class WordCount { public static void main(String[] args)throws Exception{ Configuration conf = new Configuration(); //獲取運行時輸入的參數,一般是通過shell腳本文件傳進來。 String [] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length < 2){ System.err.println("必須輸入讀取文件路徑和輸出路徑"); System.exit(2); } Job job = new Job(); job.setJarByClass(WordCount.class); job.setJobName("wordcount app"); //設置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進來 FileInputFormat.addInputPath(job,new Path(args[0])); //設置mapreduce程序的輸出路徑,MapReduce的結果都是輸入到文件中 FileOutputFormat.setOutputPath(job,new Path(args[1])); //設置實現了map函數的類 job.setMapperClass(WordcountMap.class); //設置實現了reduce函數的類 job.setReducerClass(WordcountReduce.class); //設置reduce函數的key值 job.setOutputKeyClass(Text.class); //設置reduce函數的value值 job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 :1); } } 代碼寫好之後,開始jar包,按照下圖打包。點擊「File」,然後點擊「Project Structure」,彈出如下的界面, 依次點擊"Artifacts" -> "+" -> "JAR" -> "From modules with dependencies",然後彈出一個選擇入口類的界面,選擇剛剛寫好的WordCount類,如下圖: 按照上面設置好之後,就開始打jar包,如下圖: 點擊上圖的「Build」之後就會生成一個jar包。jar的位置看下圖,依次點擊File->Project Structure->Artifacts就會看到如下的界面: 將打好包的wordcount.jar文件上傳到裝有hadoop集群的機器中,然後創建shell文件,shell文件內容如下,/usr/local/src/hadoop-2.6.1是hadoop集群中hadoop的安裝位置,/usr/local/src/hadoop-2.6.1/bin/hadoop jar wordcount.jar #執行jar文件的命令以及jar文件名, hdfs://hadoop-master:8020/data/english.txt #輸入路徑 hdfs://hadoop-master:8020/wordcount_output #輸出路徑 執行shell文件之後,會看到如下的信息, 上圖中數字1表示輸入分片split的數量,數字2表示map和reduce的進度,數字3表示mapreduce執行成功,數字4表示啟動多少個map任務,數字5表示啟動多少個reduce任務。自行成功後在hadoop集群中的hdfs文件系統中會看到一個wordcount_output的文件夾。使用「hadoop fs -ls /」命令查看: 在wordcount_output文件夾中有兩個文件,分別是_SUCCESS和part-r-00000,part-r-00000記錄著mapreduce的執行結果,使用hadoop fs -cat /wordcount_output/part-r-00000查看part-r-00000的內容: 可以每個英文單詞出現的次數。至此,藉助idea 2018工具開發第一個使用java語言編寫的mapreduce程序已經成功執行。下面介紹使用python語言編寫的第一個mapreduce程序,相對於java,python編寫mapreduce會簡單很多,因為hadoop提供streaming,streaming是使用Unix標準流作為Hadoop和應用程序之間的介面,所以可以使用任何語言通過標準輸入輸出來寫MapReduce程序。下面介紹使用Python編寫MapReduce看代碼:實現了map函數的python程序,命名為map.py:#!/usr/local/bin/python import sys #導入sys包 for line in sys.stdin: #從標準輸入中讀取數據 ss = line.strip().split( )#讀取每一行數據,strip()函數過濾掉空格換行的字元,split( )分隔出每個額單詞並存放在數組ss中 for s in ss: #讀取數組ss中的每個單詞 if s.strip() != "": print "%s %s" % (s, 1)#構造以單詞為key,1為value的鍵值對,並寫入到標準輸出中。 實現了reduce函數的python程序,命名為reduce.py:import sys cur_word = None sum = 0 for line in sys.stdin: ss = line.strip().split( )#從標準輸入中讀取數據。 if len(ss) != 2: continue word,cnt = ss if cur_word == None: cur_word = word #因為從map流轉到reduce的數據時按照key排好序的,cur_word記錄的是上一個單詞,word記 #錄的是當前讀取的單詞,如果兩個單詞一致,則將sum+1,否則將word和sum值組成一個鍵值對, ##寫入到標準輸出,同時sum賦值為0,並且將word賦值給cur_word變數。 if cur_word != word: print .join([cur_word,str(sum)]) cur_word = word sum = 0 sum += int(cnt) print .join([cur_word,str(sum)]) map和reduce程序=已經編寫完畢,下面編寫shell腳本文件:HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop" STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar " INPUT_FILE_PATH_1="/data/english.txt"#輸入路徑 OUTPUT_PATH="/wordcount_output"#輸出路徑 $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH#每次執行時都刪除輸出路徑,否則會出錯 $HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH_1 #指定輸入路徑 -output $OUTPUT_PATH #指定輸出路徑 -mapper "python map.py" #指定要執行的map程序 -reducer "python reduce.py" #指定要執行reduce程序 -file ./map.py #指定map程序所在的位置 -file ./reduce.py#指定reduce程序所在的位置 至此,使用Java和Python編寫第一個Hadoop MapReduce程序已經全部搞定,如果有對的地方歡迎指正。 推薦閱讀: 相关文章 {{#data}} {{title}} {{/data}}