1. 程式人生 > >hadoop增加combiner器 java實現

hadoop增加combiner器 java實現

常規的MapReduce模型由mapper和reducer組成,mapper的輸出按照鍵排序,作為reducer的輸入。combiner的作用是在mapper和reducer之間增加一箇中間層,舉一個例子:

對於key為1950,有兩個mapper,mapper1輸出:

1950 10

1950 20

1950 30

mapper2輸出:

1950 40

1950 50

那麼reducer的輸入是(1950,[10,20,30,40,50]),設定規則為選擇最大值,最終輸出為(1950,50)

增加combiner之後,mapper1和mapper2先將資料輸入combiner選擇最大值,然後combiner的輸出進入reducer作為輸入,即reducer的輸入為(1950,[30,50]),這裡combiner的邏輯和reducer的邏輯一樣,都是選擇最大值,因此不用為combiner單獨編寫一套流程,只要修改作業執行函式即可,如下:

public class MaxTemperatureWithCombiner {
    public static void main(String[] args) throws Exception {
        if (args.length != 2)
            System.exit(-1);

        Job job = new Job();
        job.setJarByClass(MaxTemperatureWithCombiner.class);
        job.setJobName("MaxTemperatureWithCombiner");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        /* 設定combiner,這裡直接複用reducer的邏輯 */
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

最終執行結果顯然和沒有combiner是一樣的。combiner的使用有限制,這裡是選擇最大值,因此可以加combiner,但是若是求平均數,那麼不能加combiner,因為對兩個區域性求平均之後再求平均顯然無法得到正確的結果