1. 程式人生 > MapReduce優化例項(自定義Partitioner、Combiner)

MapReduce優化例項(自定義Partitioner、Combiner)

本文轉載自《MapReduce優化例項》一文。

1.案例介紹

我們使用簡單的成績資料集,統計出20歲以下(含20歲)、21~50歲、50歲以上這三個年齡段中,男、女學生各自的最高分數

2.資料集(每行四個欄位,欄位之間以Tab鍵分隔)

姓名 年齡 性別 成績
Alice 23 female 45
Bob 34 male 89
Chris 67 male 97
Kristine 38 female 53
Connor 25 male 27
Daniel 78 male 95
James 34 male 79
Alex 52 male 69

3.分析

基於需求,我們通過以下幾步完成:
1、編寫Mapper類,按需求將資料集解析為key=gender,value=name+age+score,然後輸出
2、編寫Partitioner類,按年齡段,將結果指定給不同的Reduce執行
3、編寫Reducer類,分別統計出男女學生的最高分;另以相同的取最大值邏輯編寫Combiner類,在Map端先行合併輸出,減少傳送到Reduce端的資料量
4、編寫run方法執行MapReduce作業

4.程式碼實現

package org.bigdata.hadoop;

import java.io.IOException;

import
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import
org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @ProjectName BestScoreCount * @PackageName com.buaa * @ClassName Gender * * @Description 統計不同年齡段內,男、女最高分數 * @Author Administartor * @Date 2017-07-31 21:49:50 */ public class GenderMR extends Configured implements Tool { private static String TAB_SEPARATOR = "\t"; public static class GenderMapper extends Mapper<LongWritable, Text, Text, Text> { /** * 呼叫map解析一行資料,該行的資料儲存在value引數中,然後根據\t分隔符, * 解析出姓名,年齡,性別和成績 */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 姓名 年齡 性別 成績 * Alice 23 female 45 * 每個欄位的分隔符是tab鍵 */ // 使用\t,分割資料 String[] tokens = value.toString().split(TAB_SEPARATOR); // 性別 String gender = tokens[2]; // 姓名 年齡 成績 String nameAgeScore = tokens[0] + TAB_SEPARATOR + tokens[1] + TAB_SEPARATOR + tokens[3]; // 輸出key=gender value=name+age+score context.write(new Text(gender), new Text(nameAgeScore)); } } /** * 合併 Mapper輸出結果 */ public static class GenderCombiner extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int maxScore = Integer.MIN_VALUE; int score = 0; String name = " "; String age = " "; for (Text val : values) { String[] valTokens = val.toString().split(TAB_SEPARATOR); score = Integer.parseInt(valTokens[2]); if (score > maxScore) { name = valTokens[0]; age = valTokens[1]; maxScore = score; } } context.write(key, new Text(name + TAB_SEPARATOR + age + TAB_SEPARATOR + maxScore)); } } /** * 根據 age年齡段將map輸出結果均勻分佈在reduce上 */ public static class GenderPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { String[] nameAgeScore = 
value.toString().split(TAB_SEPARATOR); // 學生年齡 int age = Integer.parseInt(nameAgeScore[1]); // 預設指定分割槽 0 if (numReduceTasks == 0) { return 0; } // 年齡小於等於20,指定分割槽0 if (age <= 20) { return 0; } else if (age <= 50) { // 年齡大於20,小於等於50,指定分割槽1 return 1 % numReduceTasks; } else { // 剩餘年齡,指定分割槽2 return 2 % numReduceTasks; } } } /* * 統計出不同性別的最高分 */ public static class GenderReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int maxScore = Integer.MIN_VALUE; int score = 0; String name = " "; String age = " "; String gender = " "; // 根據key,迭代 values集合,求出最高分 for (Text val : values) { String[] valTokens = val.toString().split(TAB_SEPARATOR); score = Integer.parseInt(valTokens[2]); if (score > maxScore) { name = valTokens[0]; age = valTokens[1]; gender = key.toString(); maxScore = score; } } context.write(new Text(name), new Text("age:" + age + TAB_SEPARATOR + "gender:" + gender + TAB_SEPARATOR + "score:" + maxScore)); } } public int run(String[] args) throws Exception { // 讀取配置檔案 Configuration conf = new Configuration(); // 新建一個任務 Job job = Job.getInstance(conf, this.getClass().getSimpleName()); // 主類 job.setJarByClass(Gender.class); // 輸入路徑 Path inPath = new Path(args[0]) FileInputFormat.addInputPath(job, inPath); // Mapper job.setMapperClass(GenderMapper.class); // Reducer job.setReducerClass(GenderReducer.class); // map 輸出key型別 job.setMapOutputKeyClass(Text.class); // map 輸出value型別 job.setMapOutputValueClass(Text.class); // reduce 輸出key型別 job.setOutputKeyClass(Text.class); // reduce 輸出value型別 job.setOutputValueClass(Text.class); // 設定Combiner類 job.setCombinerClass(GenderCombiner.class); // 設定Partitioner類 job.setPartitionerClass(GenderPartitioner.class); //reduce個數設定為3 job.setNumReduceTasks(3); // 輸出路徑 Path outpath = new Path(args[1]); FileSystem fs = mypath.getFileSystem(conf); if (fs.exists(outpath)) { fs.delete(outpath, true); } 
FileOutputFormat.setOutputPath(job, outPath); // 提交任務 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); args = new String[]{ "hdfs://com.learn.bigdata:8020/input/gender.txt", "hdfs://com.learn.bigdta:8020/output"}; int status = ToolRunner.run( conf, new GenderMR(), args); System.exit(status); } }