MapReduce在Shuffle階段按Mapper輸出的Value進行排序
阿新 • • 發佈:2020-07-03
ZKe
-----------------
在MapReduce框架中,Mapper的輸出在Shuffle階段會先根據Key值進行排序,再把Key相同的記錄分為一組交給Reducer,因此我們看到的每個Reducer的輸出都是按Key有序的。
如果想讓結果按Value有序,可以在Mapper中把原本的Value作為輸出的Key(即Key與Value互換),再通過job.setSortComparatorClass(IntWritableComparator.class);指定自訂的比較器即可(排序規則和Key的型別都可以自己定義,例如下文的比較器按從大到小排序)。
若要把自訂實體類作為Key,它不僅需要實現compareTo方法,還要實現序列化用的write方法和反序列化用的readFields方法(在Hadoop中即實現WritableComparable介面),然後再為該實體定義一個比較器。
這裡定義一個實體類,由String的id和int的count作為屬性,我們根據count進行排序。
static class Record implements Comparable<Record>{ private String personalId; private int count; public Record(String id, int count){ this.personalId = id; this.count = count; } public Record(String line){this.personalId = line.split("\t")[0]; this.count = Integer.parseInt(line.split("\t")[1]); } /* * 反序列化方法 * @author 180512235 ZhaoKe */ public void readFields(DataInput arg0) throws IOException { this.personalId = arg0.readUTF();this.count = arg0.readInt(); } // 序列化方法 public void write(DataOutput arg0) throws IOException { arg0.writeUTF(this.personalId); arg0.writeInt(this.count); } public int compareTo(Record o) { // TODO Auto-generated method stub return this.count<o.count?1:-1; } public String getPersonalId(){ return this.personalId; } public int getCount(){ return this.count; } }
它的比較器如下
static class IntWritableComparator extends WritableComparator { /* * 重寫構造方法,定義比較類 IntWritable */ public IntWritableComparator() { super(IntWritable.class, true); } /* * 重寫compare方法,自定義比較規則 */ @Override public int compare(WritableComparable a, WritableComparable b) { //向下轉型 IntWritable ia = (IntWritable) a; IntWritable ib = (IntWritable) b; return ib.compareTo(ia); } }
Mapper和Reducer如下。它們本身不做任何排序:Mapper只是把count作為Key、id作為Value輸出,Reducer再把兩者換回來輸出,排序由Shuffle階段呼叫比較器自動完成。
static class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text>{ private Record r; protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ r = new Record(value.toString()); context.write(new IntWritable(r.getCount()), new Text(r.getPersonalId())); } } static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable>{ protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ for(Text value:values){ context.write(value, key); } } }
主類如下,可以作為模板使用。注意:每個Reducer的輸出只在其內部有序,要得到全域有序的結果,Reducer的數量需為1(預設值即為1)。
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // TODO Auto-generated method stub String inputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/submitTop10output/"; String outputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/sortedSubmitTop10/"; BasicConfigurator.configure(); Configuration conf = new Configuration(); // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // if(otherArgs.length != 2){ // System.err.println("Usage:wordcount<in><out>"); // System.exit(2); // } Job job = Job.getInstance(conf, "WordCount"); job.setJarByClass(SortByMapReduce.class); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setSortComparatorClass(IntWritableComparator.class); // 此處必須注意設定比較器======================================= // Path path = new Path(otherArgs[1]); Path path = new Path(outputFile); FileSystem fileSystem = path.getFileSystem(conf); if(fileSystem.exists(path)){ fileSystem.delete(path, true); } // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path(inputFile)); FileOutputFormat.setOutputPath(job, new Path(outputFile)); boolean res = job.waitForCompletion(true); if(res) System.out.println("===========waitForCompletion:"+res+"=========="); System.exit(res?0:1); }