MapReduce 實現統計單數出現次數
阿新 • • 發佈:2021-10-11
工程配置
- 在 windows 中配置 hadoop 及環境變數 HADOOP_
下載 winutils.exe 放入 bin目錄中
https://github.com/cdarlint/winutils - 建立 maven 工程 引入依賴
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j</artifactId> <version>2.14.1</version> </dependency>
- 編寫 log4j 配置檔案
參考:https://www.cnblogs.com/orginly/p/14847470.html
整體思路
仿照原始碼
Map 階段
- map()方法中把傳入的資料轉為 String 型別
- 根據空格切分出單詞
- 輸出<單詞,1>
package com.orginly.mapreduce.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 單詞記數 * 繼承 Mapper 類 * Mapper 類的泛型引數共4個 兩個key value * 第一對kv:map輸入引數型別 (LongWritable, Text 文字偏移量,一行文字內容) * 第二對kv:map輸出引數型別 (Text, IntWritable 單詞,1) */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // 提升為成員變數避免每次執行 map 方法時都建立一次物件 private final Text word = new Text(); private final IntWritable intWritable = new IntWritable(1); /** * map 方法的輸入引數,一行文字就呼叫一次 map 方法 * * @param key 文字偏移量 * @param value 一行文字內容 * @param context */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 1. 接收到文字內容,轉為String 型別 String str = value.toString(); // 2. 按照空格進行拆分單詞 String[] words = str.split(" "); // 3. 輸出<單詞,1> for (String s : words) { word.set(s); context.write(word, intWritable); } } }
Reduce 階段
- 總各個key(單詞)的個數,遍歷 value 資料進行累加
- 輸出 key 的總數
package com.orginly.mapreduce.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * 繼承 Reducer 類 * 有四個泛型,兩對 kv * 第一對kv要與 Mapper 輸出型別一致 (Text,IntWritable) * 第二對kv自己設計決定輸出結果資料是什麼型別 */ public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable total = new IntWritable(); /** * 假設 map 方法 執行三次得到:hello,1 hello,1 hello,1 * reduce 的 key => hello, values => <1,1,1> * <p> * 假設 map 方法得到 hello,1 hello,1 hello,1 hadoop,1 reduce,1 hadoop,1 * reduce 方法何時呼叫:一組 key 相同的 kv 中 value 組成然後呼叫一次 reduce * 第一次:key => hello, values => <1,1,1> * 第一次:key => hadoop, values => <1,1> * 第三次:key => reduce, values => <1> * * @param key 方法輸出的key本案例中就是單詞 * @param values 一組key相同的kv的value組成的集合 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 遍歷 key 對應的 values 進行累加 int sum = 0; for (IntWritable value : values) { sum += value.get(); } // 直接輸出當前 key 對應的 sum 值,結果就是單詞出現的總次數 total.set(sum); context.write(key,total); } }
Driver
- 獲取配置檔案物件,獲取 job 物件例項
- 指定程式 jar 的本地路徑
- 指定 Mapper / Reduce類
- 指定 Mapper 輸出的 kv 資料型別
- 指定 最終輸出的 kv 資料型別
- 指定 job 處理的原始資料路徑
- 指定 job 輸出結果路徑
- 提交作業
package com.orginly.mapreduce.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 封裝任務並提交執行
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 獲取配置檔案物件,獲取 job 物件例項
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCountDriver");
// 2. 指定程式 jar 的本地路徑
job.setJarByClass(WordCountDriver.class);
// 3. 指定 Mapper / Reduce類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
// 4. 指定 Mapper 輸出的 kv 資料型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 5. 指定最終輸出的 kv 資料型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6. 指定 job 處理的原始資料路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 7. 指定 job 輸出結果路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8. 提交作業
boolean flag = job.waitForCompletion(true);// 等待完成 true為完成
System.exit(flag ? 0 : 1);
}
}
驗證程式
本地驗證
使用 IDEA 執行 Drive 中的 main() 方法
如果出現其他異常請檢視 https://www.cnblogs.com/orginly/p/15392871.html
- 先自動執行一次,此時會 args 下標異常
- 編輯執行配置新增引數
- 重新執行
Yarn 叢集驗證
- 把程式打成 jar 包,改名為 wordCount.jar 上傳到 Hadoop 叢集
# 上傳到伺服器
rz
# 重新命名
mv wordCount-1.0-SNAPSHOT.jar wordCount.jar
- 啟動 Hadoop 叢集(Hdfs,Yarn)
- 使用 Hadoop 命令提交任務執行
因為是叢集,原始檔不能存放在本地目錄,需要上傳至 HFDS 進行處理
hadoop jar wordCount.jar com.orginly.mapreduce.wc.WordCountDriver /mapReduce/wc.txt /wcoutput
執行成功
如果執行命令時出現版本過低提示,請安裝伺服器所使用的 jdk 版本進行 jar 的打包