hadoop增加combiner器 java實現
阿新 • • 發佈:2018-11-09
常規的MapReduce模型由mapper和reducer組成,mapper的輸出按照鍵排序,作為reducer的輸入。combiner的作用是在mapper和reducer之間增加一箇中間層,舉一個例子:
對於key為1950,有兩個mapper,mapper1輸出:
1950 10
1950 20
1950 30
mapper2輸出:
1950 40
1950 50
那麼reducer的輸入是(1950,[10,20,30,40,50]),設定規則為選擇最大值,最終輸出為(1950,50)
增加combiner之後,mapper1和mapper2先將資料輸入combiner選擇最大值,然後combiner的輸出進入reducer作為輸入,即reducer的輸入為(1950,[30,50]),這裡combiner的邏輯和reducer的邏輯一樣,都是選擇最大值,因此不用為combiner單獨編寫一套流程,只要修改作業執行函式即可,如下:
public class MaxTemperatureWithCombiner { public static void main(String[] args) throws Exception { if (args.length != 2) System.exit(-1); Job job = new Job(); job.setJarByClass(MaxTemperatureWithCombiner.class); job.setJobName("MaxTemperatureWithCombiner"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); /* 設定combiner,這裡直接複用reducer的邏輯 */ job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
最終執行結果顯然和沒有combiner是一樣的。combiner的使用有限制,這裡是選擇最大值,因此可以加combiner,但是若是求平均數,那麼不能加combiner,因為對兩個區域性求平均之後再求平均顯然無法得到正確的結果