手動實現一個單詞統計MapReduce程序與過程原理分析
阿新 • • 發佈:2018-03-04
Hadoop MapReduce Java [toc]
手動實現一個單詞統計MapReduce程序與過程原理分析
前言
我們知道,在搭建好hadoop環境後,可以運行wordcount程序來體驗一下hadoop的功能,該程序在hadoop目錄下的share/hadoop/mapreduce
目錄中,通過下面的命令:
yarn jar $HADOOP_HOME/share/hadoop/mapreducehadoop-mapreduce-examples-2.6.4.jar wordcount inputPath outPath
即可對輸入文件執行單詞統計的計算。
那麽下面就通過手動寫一個wordcount的例子來加深對MapReduce的基本理解。
案例場景
假如有下面一個文本文件需要進行單詞統計:
$ cat hello
hello you
hello he
hello me
Note:該hello文件為李老師的經典文本文件。
下面就來演示MapReduce程序如何來對該文本文件進行計算,最後再依據此寫一個wordcount程序。
MapReduce計算分析
我們先來簡單分析一下MapReduce是如何處理上面的文本文件,然後才寫一個程序。
對於上面的一個文本文件,MapReduce程序分三個步驟進行處理:Map階段、Shuffle階段和Reduce階段。(三個階段的分析在代碼的註釋中也是非常詳細的解釋)
Map階段
上面的文本文件經過Map處理後會得到類似下面的結果:
<hello, 1>
<heelo, 1>
<hello, 1>
<you, 1>
<he, 1>
<me, 1>
shuffle階段
對Map階段的結果進行處理,會得到如下的結果:
<hello, [1, 1, 1]>
<you, [1]>
<he, [1]>
<me, [1]>
Reduce階段
經過reducer處理之後,結果如下:
<hello, 3>
<you, 1>
<he, 1>
<me, 1>
關於上面的過程分析,可以參考下面的幾張圖示以幫助理解:
圖示1:
圖示2:
圖示3:
程序思路分析
* 整個的解題思路,使用map函數進行單詞的拆分,使用reduce函數進行匯總,中間進行shuffle
* 要想讓我們的map函數和reduce函數進行接替運行,需要一個驅動程序
* 代碼的思路:
* 1、編寫一個類繼承Mapper,成為自定義的Mapper,主要業務邏輯就是復寫其中的map函數
* map
* 首先要確定清楚Mapper類或者map函數的數據類型/類型參數--->泛型
* Mapper<K1, V1, K2, V2>
* 2、編寫一個類繼承Reducer,成為自定義的Reducer,主要業務邏輯就是復寫其中的reduce函數
* reduce
* 首先要確定清楚Reducer類或者reduce函數它的數據類型/類型參數--->泛型
* Reducer<K2, V2s, K3, V3>
*
* 需要我們用戶自定義的類型就是K2, V2, K3, V3
* K1和V1一般情況下是固定的,只要數據格式確定,其類型就確定
* 比如我們操作的是普通的文本文件,那麽K1=LongWritable,V1=Text
* K1--->代表的是這一行記錄在整個文本中的偏移量,V1就是這一行文本的內容
* (也就是說,K1和V1取決於我們要處理的是什麽文件)
* 註意:與Hadoop的程序需要使用Hadoop提供的數據類型,而不能使用java中提供的數據類型
wordcount程序
程序代碼中有非常詳細的註釋,可以參考來進行理解。
WordCount.java
package com.uplooking.bigdata.mr.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* 統計hdfs://uplooking01:9000/input/mr/hello的單詞出現次數
*
* 整個的解題思路,使用map函數進行單詞的拆分,使用reduce函數進行匯總,中間進行shuffle
* 要想讓我們的map函數和reduce函數進行接替運行,需要一個驅動程序
* 代碼的思路:
* 1、編寫一個類繼承Mapper,成為自定義的Mapper,主要業務邏輯就是復寫其中的map函數
* map
* 首先要確定清楚Mapper類或者map函數的數據類型/類型參數--->泛型
* Mapper<K1, V1, K2, V2>
* 2、編寫一個類繼承Reducer,成為自定義的Reducer,主要業務邏輯就是復寫其中的reduce函數
* reduce
* 首先要確定清楚Reducer類或者reduce函數它的數據類型/類型參數--->泛型
* Reducer<K2, V2s, K3, V3>
*
* 需要我們用戶自定義的類型就是K2, V2, K3, V3
* K1和V1一般情況下是固定的,只要數據格式確定,其類型就確定
* 比如我們操作的是普通的文本文件,那麽K1=LongWritable,V1=Text
* K1--->代表的是這一行記錄在整個文本中的偏移量,V1就是這一行文本的內容
* (也就是說,K1和V1取決於我們要處理的是什麽文件)
* 註意:與Hadoop的程序需要使用Hadoop提供的數據類型,而不能使用java中提供的數據類型
*/
public class WordCount {
/**
* 這裏的main函數就是用來組織map函數和reduce函數的
* 最終mr的運行會轉變成一個個的Job
*
* @param args
*/
public static void main(String[] args) throws Exception {
// 構建Job所需的配置文件和jobName
Configuration configuration = new Configuration();
String jobName = "wordcount";
// 1.創建一個job
Job job = Job.getInstance(configuration, jobName);
// 添加mr要運行的主函數所在的類,就是WordCount這個類
job.setJarByClass(WordCount.class);
// 2.設置mr的輸入參數
// 設置計算的文件
Path inputPath = new Path("hdfs://uplooking01:9000/input/mr/hello");
FileInputFormat.setInputPaths(job, inputPath);
// 指定解析數據源的Format類,即將輸入的數據解析為<K1, V1>的形式,然後再交由mapper函數處理
job.setInputFormatClass(TextInputFormat.class);
// 指定使用哪個mapper來進行計算
job.setMapperClass(WordCountMapper.class);
// 指定mapper結果的key的數據類型(即K2的數據類型),註意要與我們寫的Mapper中定義的一致
job.setMapOutputKeyClass(Text.class);
// 指定mapper結果的value的數據類型(即V2的數據類型),註意要與我們寫的Mapper中定義的一致
job.setMapOutputValueClass(IntWritable.class);
// 3.設置mr的輸出參數
// 設置輸出的目錄
Path outputPath = new Path("hdfs://uplooking01:9000/output/mr/wc");
// 如果outputPath目錄存在,會拋出目錄存在異常,這裏先刪除,保證該目錄不存在
outputPath.getFileSystem(configuration).delete(outputPath, true);
FileOutputFormat.setOutputPath(job, outputPath);
// 指定格式化數據結果的Format類
job.setOutputFormatClass(TextOutputFormat.class);
// 指定使用哪個reducer來進行匯總
job.setReducerClass(WordCountReducer.class);
// 指定reduce結果的key的數據類型(即K3的數據類型),註意要與我們寫的Reducer中定義的一致
job.setOutputKeyClass(Text.class);
// 指定reduce結果的value的數據類型(即V3的數據類型),註意要與我們寫的Reducer中定義的一致
job.setOutputValueClass(IntWritable.class);
// 設置有幾個reducer來執行mr程序,默認為1個
job.setNumReduceTasks(1);
// 提交mapreduce job
job.waitForCompletion(true);
}
}
WordCountMapper.java
package com.uplooking.bigdata.mr.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 1、編寫一個類繼承Mapper,成為自定義的Mapper,主要業務邏輯就是復寫其中的map函數
* map
* 首先要確定清楚Mapper類或者map函數的數據類型/類型參數--->泛型
* Mapper<K1, V1, K2, V2>
* K1:行的偏移量,如第998行
* V1:行的內容,如 hello you
* K2:輸出的數據的key值,如hello
* V2:輸出的數據的value值,如1
* 註意,為了減少在網絡中傳輸的數據,map之後得到的結果還需要進行shuffle,將相同key的value匯總起來:
* 如:
* map後的結果有:<hello, 1>, <hello, 1>, <hello, 1>, <you, 1>, <he, 1>, <me, 1>
* shuffle後的結果為:<hello, [1, 1, 1]>, <you, [1]>, <he, [1]>, <me, [1]>
* 這樣相比原來map的結果,數據的量就少了許多
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
// 先將每一行轉換為java的String類型
String line = v1.toString();
// 將行中的單詞以空格作為分隔符分離出來得到一個字符串數組
String[] words = line.split(" ");
// 定義輸出數據的變量k2和v2,類型分別為Text和IntWritable
Text k2 = null;
IntWritable v2 = null;
// 統計單詞並寫入到上下文變量context中
for (String word : words) {
k2 = new Text(word);
v2 = new IntWritable(1);
context.write(k2, v2);
}
}
}
WordCountReducer.java
package com.uplooking.bigdata.mr.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 2、編寫一個類繼承Reducer,成為自定義的Reducer,主要業務邏輯就是復寫其中的reduce函數
* reduce
* 首先要確定清楚Reducer類或者reduce函數它的數據類型/類型參數--->泛型
* Reducer<K2, V2s, K3, V3>
* K2:map輸出中的key值
* V2s:map輸出中根據本周key值shuffle後得到的可叠代列表
* 如:<hello, [1, 1, 1]>, <you, [1]>, <he, [1]>, <me, [1]>
* K3:reduce輸出中的key值
* V3:reduce輸出中的value值
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context) throws IOException, InterruptedException {
// 定義某個key值k2出現次數的變量
int sum = 0;
// 統計k2孤個數
for (IntWritable item : v2s) {
sum += item.get();
}
// 構建reduce輸出的k3和v3,類型分別為Text和IntWritable
Text k3 = k2;
IntWritable v3 = new IntWritable(sum);
// 結果reduce結果寫入到上下文變量context中
context.write(k2, v3);
}
}
測試
將上面的程序打包成jar包,然後上傳到我們的hadoop服務器上,執行下面的命令:
yarn jar wordcount.jar com.uplooking.bigdata.mr.wc.WordCount
這樣就可以使用在hadoop中使用我們自己寫的wodcount程序來進行MapReduce的計算。
任務執行結束後,通過下面的命令查看結果:
$ hdfs dfs -cat /output/mr/wc/part-r-00000
18/03/03 13:59:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
he 1
hello 3
me 1
you 1
這樣就完成了從編寫MR程序到測試的完整過程。
手動實現一個單詞統計MapReduce程序與過程原理分析