MapReduce優化例項(自定義Partition Combiner)
阿新 • • 發佈:2019-02-04
MapReduce優化例項
1.案例介紹
我們使用簡單的成績資料集,統計出20歲以下(含20)、21~50歲、50歲以上這三個年齡段中,男、女學生各自的最高分數
2.資料集
姓名 | 年齡 | 性別 | 成績 |
---|---|---|---|
Alice | 23 | female | 45 |
Bob | 34 | male | 89 |
Chris | 67 | male | 97 |
Kristine | 38 | female | 53 |
Connor | 25 | male | 27 |
Daniel | 78 | male | 95 |
James | 34 | male | 79 |
Alex | 52 | male | 69 |
3.分析
基於需求,我們通過以下幾步完成:
1、編寫Mapper類,按需求將資料集解析為key=gender,value=name+age+score,然後輸出
2、編寫Partitioner類,按年齡段,將結果指定給不同的Reduce執行
3、編寫Reducer類,分別統計出男女學生的最高分
4、編寫run方法執行MapReduce作業,並在其中設定Combiner,於Map端先行合併各性別的最高分以減少傳輸量
4.程式碼實現
package org.bigdata.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @ProjectName BestScoreCount
* @PackageName org.bigdata.hadoop
* @ClassName GenderMR
* @Description Finds the highest score of male and female students within each age group
* @Author Administrator
* @Date 2017-07-31 21:49:50
*/
public class GenderMR extends Configured implements Tool {
private static String TAB_SEPARATOR = "\t";
public static class GenderMapper extends Mapper<LongWritable, Text, Text, Text> {
/**
* 呼叫map解析一行資料,該行的資料儲存在value引數中,然後根據\t分隔符,
* 解析出姓名,年齡,性別和成績
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/**
* 姓名 年齡 性別 成績
* Alice 23 female 45 * 每個欄位的分隔符是tab鍵
*/
// 使用\t,分割資料
String[] tokens = value.toString().split(TAB_SEPARATOR);
// 性別
String gender = tokens[2];
// 姓名 年齡 成績
String nameAgeScore = tokens[0] + TAB_SEPARATOR + tokens[1] + TAB_SEPARATOR + tokens[3];
// 輸出key=gender value=name+age+score
context.write(new Text(gender), new Text(nameAgeScore));
}
}
/**
* 合併 Mapper輸出結果
*/
public static class GenderCombiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int maxScore = Integer.MIN_VALUE;
int score = 0;
String name = " ";
String age = " ";
for (Text val : values) {
String[] valTokens = val.toString().split(TAB_SEPARATOR);
score = Integer.parseInt(valTokens[2]);
if (score > maxScore) {
name = valTokens[0];
age = valTokens[1];
maxScore = score;
}
}
context.write(key, new Text(name + TAB_SEPARATOR + age + TAB_SEPARATOR + maxScore));
}
}
/**
 * Partitioner that routes each map output record to a reducer according to
 * the age group found in the value ({@code name \t age \t score}).
 */
public static class GenderPartitioner extends Partitioner<Text, Text> {
    /**
     * Chooses the partition for one record.
     *
     * <p>Age groups: up to 20 maps to group 0, 21 to 50 maps to group 1,
     * everything above 50 maps to group 2. The group index is wrapped with
     * {@code numReduceTasks} so it stays valid when fewer than three
     * reducers are configured.
     *
     * @param key            the gender (not used for partitioning)
     * @param value          record formatted as {@code name \t age \t score}
     * @param numReduceTasks number of reduce tasks configured for the job
     * @return the partition index for this record
     */
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        String[] fields = value.toString().split(TAB_SEPARATOR);
        int studentAge = Integer.parseInt(fields[1]);

        // No reducers configured: fall back to partition 0.
        if (numReduceTasks == 0) {
            return 0;
        }

        int ageGroup;
        if (studentAge <= 20) {
            ageGroup = 0;
        } else if (studentAge <= 50) {
            ageGroup = 1;
        } else {
            ageGroup = 2;
        }
        return ageGroup % numReduceTasks;
    }
}
/**
 * Reducer that reports, for one gender key, the student with the highest score.
 *
 * <p>Output key is the student's name; output value is
 * {@code age:<age> \t gender:<gender> \t score:<score>}.
 */
public static class GenderReducer extends Reducer<Text, Text, Text, Text> {
    /**
     * Walks through every record of {@code key} and emits the top scorer.
     * When several records share the top score, the first one seen wins.
     *
     * @param key     the gender
     * @param values  records formatted as {@code name \t age \t score}
     * @param context Hadoop context used to emit the result
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String topName = " ";
        String topAge = " ";
        String topGender = " ";
        int topScore = Integer.MIN_VALUE;

        for (Text record : values) {
            String[] fields = record.toString().split(TAB_SEPARATOR);
            int candidate = Integer.parseInt(fields[2]);
            if (candidate <= topScore) {
                // Not strictly better than the current leader: keep the leader.
                continue;
            }
            topName = fields[0];
            topAge = fields[1];
            topGender = key.toString();
            topScore = candidate;
        }

        StringBuilder summary = new StringBuilder();
        summary.append("age:").append(topAge).append(TAB_SEPARATOR);
        summary.append("gender:").append(topGender).append(TAB_SEPARATOR);
        summary.append("score:").append(topScore);
        context.write(new Text(topName), new Text(summary.toString()));
    }
}
public int run(String[] args) throws Exception {
// 讀取配置檔案
Configuration conf = new Configuration();
// 新建一個任務
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
// 主類
job.setJarByClass(Gender.class);
// 輸入路徑
Path inPath = new Path(args[0])
FileInputFormat.addInputPath(job, inPath);
// Mapper
job.setMapperClass(GenderMapper.class);
// Reducer
job.setReducerClass(GenderReducer.class);
// map 輸出key型別
job.setMapOutputKeyClass(Text.class);
// map 輸出value型別
job.setMapOutputValueClass(Text.class);
// reduce 輸出key型別
job.setOutputKeyClass(Text.class);
// reduce 輸出value型別
job.setOutputValueClass(Text.class);
// 設定Combiner類
job.setCombinerClass(GenderCombiner.class);
// 設定Partitioner類
job.setPartitionerClass(GenderPartitioner.class);
//reduce個數設定為3
job.setNumReduceTasks(3);
// 輸出路徑
Path outpath = new Path(args[1]);
FileSystem fs = mypath.getFileSystem(conf);
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
// 提交任務
return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Command-line entry point: runs the job through {@link ToolRunner} and exits
 * with the job's status code.
 *
 * <p>When no arguments are given, the sample HDFS input and output locations
 * are used; otherwise the supplied arguments are passed through unchanged.
 *
 * @param args optional generic Hadoop options followed by
 *             {@code <input path> <output path>}
 * @throws Exception if the job cannot be set up or submitted
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    if (args.length == 0) {
        // Default sample locations; input and output live on the same NameNode.
        args = new String[] {
            "hdfs://com.learn.bigdata:8020/input/gender.txt",
            "hdfs://com.learn.bigdata:8020/output"
        };
    }
    int status = ToolRunner.run(conf, new GenderMR(), args);
    System.exit(status);
}
}