MapReduce程式設計之Combiner
阿新 • • 發佈:2018-12-13
Combiner 可以理解為本地的reducer,減少了Map Tasks輸出的資料量以及資料網路傳輸量
編譯執行: hadoop jar /home/zq/lib/HDFS_Test-1.0-SNAPSHOT.jar MapReduce.CombinerApp hdfs://zq:8020/hello.txt hdfs://zq:8020/output/wc 和前一篇部落格的程式碼是差不多的,只是多出這句核心程式碼:
//通過job設定combiner處理類,其實邏輯上和我們的reduce是一模一樣的
job.setCombinerClass(MyReducer.class);
詳細程式碼如下 CombinerApp.java
package MapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * 使用MapReduce開發WordCount應用程式 */ public class CombinerApp { /** * Map:讀取輸入的檔案 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ LongWritable one = new LongWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 接收到的每一行資料 String line = value.toString(); //按照指定分隔符進行拆分 String[] words = line.split(" "); for(String word : words) { // 通過上下文把map的處理結果輸出 context.write(new Text(word), one); } } } /** * Reduce:歸併操作 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for(LongWritable value : values) { // 求key出現的次數總和 sum += value.get(); } // 最終統計結果的輸出 context.write(key, new LongWritable(sum)); } } /** * 定義Driver:封裝了MapReduce作業的所有資訊 */ public static void main(String[] args) throws Exception{ //建立Configuration Configuration configuration = new Configuration(); // 準備清理已存在的輸出目錄 Path outputPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(configuration); if(fileSystem.exists(outputPath)){ fileSystem.delete(outputPath, true); System.out.println("output file exists, but is has deleted"); } //建立Job Job job = Job.getInstance(configuration, "wordcount"); //設定job的處理類 job.setJarByClass(CombinerApp.class); //設定作業處理的輸入路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); //設定map相關引數 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //設定reduce相關引數 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //通過job設定combiner處理類,其實邏輯上和我們的reduce是一模一樣的 job.setCombinerClass(MyReducer.class); //設定作業處理的輸出路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
注意: 使用場景: 求和、次數(區域性相加起來就是總和)等是適用的 求平均數(區域性平均數的平均數不是總體平均數)不適用