
Sorting by the Mapper's Output Value in the MapReduce Shuffle Phase

ZKe

-----------------

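In the MapReduce framework, the Mapper's output is partitioned and sorted by key during the Shuffle phase. Records with equal keys are then grouped into a single reduce() call. This is why the Reducer output is ordered by key. The order holds within each reduce task's output file, and with the default single reducer it covers the whole output.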
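The following plain-Java sketch gives a rough picture of what one reduce task sees after the shuffle: pairs ordered by key, with the values of equal keys collected into one group. It uses no Hadoop classes, and `ShuffleSortDemo` and `sortAndGroup` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the shuffle's "sort by key, then group equal keys" step.
// No Hadoop classes are used; this only mimics the ordering a single reducer sees.
class ShuffleSortDemo {

    // Sorts (key, value) pairs by key and collects the values of equal keys,
    // the way one reduce task receives them: one reduce() call per distinct key.
    static TreeMap<String, List<Integer>> sortAndGroup(List<Map.Entry<String, Integer>> mapOutput) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>(); // keys kept in ascending order
        for (Map.Entry<String, Integer> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("pear", 1), Map.entry("apple", 1),
                Map.entry("pear", 1), Map.entry("fig", 1));
        // Prints {apple=[1], fig=[1], pear=[1, 1]}
        System.out.println(sortAndGroup(mapOutput));
    }
}
```

In this sketch the TreeMap stands in for Hadoop's sort-and-merge. In a real job the framework sorts the serialized keys using the job's sort comparator.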

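The same mechanism can produce output ordered by value. Have the Mapper emit the value you want to sort on as its output key, then register a comparator with job.setSortComparatorClass(IntWritableComparator.class); to control the ordering. You define the comparison rule and the key type yourself.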
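Here is a minimal in-memory sketch of the key/value swap. It assumes each record is a simple (id, count) pair. `SwapSortDemo` and its method are illustrative names, not Hadoop API. The lambda comparator does the job that the class passed to setSortComparatorClass does in the real job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the "sort by value" trick: swap (id, count) into (count, id)
// so that sorting on the key orders records by count. A custom comparator
// decides the direction (here: largest count first).
class SwapSortDemo {

    static List<String> sortIdsByCountDescending(Map<String, Integer> counts) {
        // "Map" step: emit count as the key, id as the value.
        List<Map.Entry<Integer, String>> swapped = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            swapped.add(Map.entry(e.getValue(), e.getKey()));
        }
        // "Shuffle" step: sort on the key only, comparing b to a for descending order.
        swapped.sort((a, b) -> Integer.compare(b.getKey(), a.getKey()));
        // "Reduce" step: read the ids back out in sorted order.
        List<String> ids = new ArrayList<>();
        for (Map.Entry<Integer, String> e : swapped) {
            ids.add(e.getValue());
        }
        return ids;
    }

    public static void main(String[] args) {
        // Prints [u2, u3, u1]
        System.out.println(sortIdsByCountDescending(Map.of("u1", 3, "u2", 9, "u3", 5)));
    }
}
```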

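An entity class that travels through MapReduce must implement the Comparable interface and must also override the readFields and write methods. In practice this means implementing WritableComparable, which combines both. Then define a comparator for the type being sorted.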
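The sketch below shows what write and readFields must agree on. It is plain Java that uses only java.io. The nested `Record` is a simplified stand-in for this post's Record class, and `RecordSerializationDemo` and `roundTrip` are hypothetical names. Hadoop's Writable methods receive a DataOutput and a DataInput, which are the same java.io interfaces used here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Plain-Java sketch of a write/readFields pair in the style of Hadoop's Writable.
class RecordSerializationDemo {

    static class Record {
        String personalId;
        int count;

        Record() { }  // no-arg constructor: an empty instance is created first, then filled by readFields

        Record(String personalId, int count) {
            this.personalId = personalId;
            this.count = count;
        }

        void write(DataOutput out) throws IOException {
            out.writeUTF(personalId);  // fields are written in a fixed order...
            out.writeInt(count);
        }

        void readFields(DataInput in) throws IOException {
            personalId = in.readUTF(); // ...and must be read back in exactly the same order
            count = in.readInt();
        }
    }

    // Serializes a record to bytes and deserializes the bytes into a fresh instance.
    static Record roundTrip(Record r) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        r.write(new DataOutputStream(bytes));
        Record copy = new Record();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        Record copy = roundTrip(new Record("u42", 17));
        System.out.println(copy.personalId + " " + copy.count);  // prints u42 17
    }
}
```

If readFields read the int before the string, the bytes would be misinterpreted. This is why the two methods must mirror each other field by field.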

Here we define an entity class with a String id and an int count as its fields, and we sort by count in descending order.

    static class Record implements WritableComparable<Record> {

        private String personalId;
        private int count;

        // Hadoop creates Writable instances reflectively, so a no-arg constructor is required
        public Record() {
        }

        public Record(String id, int count) {
            this.personalId = id;
            this.count = count;
        }

        // Parses one input line of the form "id\tcount"
        public Record(String line) {
            String[] fields = line.split("\t");
            this.personalId = fields[0];
            this.count = Integer.parseInt(fields[1].trim());
        }

        /*
         * Deserialization: read the fields in the same order write() wrote them
         * @author 180512235 ZhaoKe
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.personalId = in.readUTF();
            this.count = in.readInt();
        }

        // Serialization
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.personalId);
            out.writeInt(this.count);
        }

        // Descending by count; returns 0 for equal counts, as the Comparable contract requires
        @Override
        public int compareTo(Record o) {
            return Integer.compare(o.count, this.count);
        }

        public String getPersonalId() {
            return this.personalId;
        }

        public int getCount() {
            return this.count;
        }
    }
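You can check the ordering that compareTo should express without Hadoop. In this sketch `Rec` is a hypothetical stand-in for Record. The sketch contrasts `Integer.compare(o.count, this.count)` with the ternary `this.count<o.count?1:-1`. The ternary never returns 0, so for two equal counts it reports "less than" in both directions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java check of a descending-by-count compareTo and of the Comparable contract:
// sgn(x.compareTo(y)) must equal -sgn(y.compareTo(x)), which forces 0 for ties.
class RecordOrderDemo {

    static final class Rec implements Comparable<Rec> {
        final String id;
        final int count;

        Rec(String id, int count) {
            this.id = id;
            this.count = count;
        }

        @Override
        public int compareTo(Rec o) {
            return Integer.compare(o.count, this.count);  // descending by count, 0 on ties
        }
    }

    // The ternary rule, kept only to show where it breaks.
    static int ternaryCompare(Rec a, Rec b) {
        return a.count < b.count ? 1 : -1;
    }

    static List<String> idsInOrder(List<Rec> recs) {
        List<Rec> copy = new ArrayList<>(recs);
        Collections.sort(copy);
        List<String> ids = new ArrayList<>();
        for (Rec r : copy) ids.add(r.id);
        return ids;
    }

    public static void main(String[] args) {
        Rec a = new Rec("a", 5), b = new Rec("b", 5);
        System.out.println(a.compareTo(b) + " " + b.compareTo(a));              // 0 0
        System.out.println(ternaryCompare(a, b) + " " + ternaryCompare(b, a));  // -1 -1 (inconsistent)
        System.out.println(idsInOrder(List.of(new Rec("x", 1), new Rec("y", 8), new Rec("z", 3)))); // [y, z, x]
    }
}
```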

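The comparator is shown below. It compares the map output keys, which are the IntWritable counts emitted by the Mapper rather than Record objects. It reverses their natural order so that larger counts come first.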

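    static class IntWritableComparator extends WritableComparator {

        /*
         * Constructor: register IntWritable as the key class. Passing true makes the
         * comparator create key instances, so serialized keys are deserialized and
         * handed to compare(WritableComparable, WritableComparable) below.
         */
        public IntWritableComparator() {
            super(IntWritable.class, true);
        }

        /*
         * Override compare() with a custom rule: compare b to a instead of a to b,
         * which gives descending order of counts.
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Downcast to the concrete key type
            IntWritable ia = (IntWritable) a;
            IntWritable ib = (IntWritable) b;
            return ib.compareTo(ia);
        }
    }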
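The `true` passed to super(...) matters because the shuffle keeps keys as bytes. The plain-Java sketch below models that path. It assumes, as IntWritable does, that each key was serialized with DataOutput.writeInt. The comparator deserializes both keys and then applies the reversed comparison. `SerializedKeyCompareDemo` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a comparator that receives serialized int keys:
// deserialize both (4 big-endian bytes written by writeInt), then compare b to a.
class SerializedKeyCompareDemo {

    static byte[] serializeInt(int v) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new DataOutputStream(bytes).writeInt(v);
        return bytes.toByteArray();
    }

    static int deserializeInt(byte[] b) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(b)).readInt();
    }

    // Mirrors the descending rule of IntWritableComparator.compare on byte-encoded keys.
    static int compareSerialized(byte[] a, byte[] b) {
        try {
            int ia = deserializeInt(a);
            int ib = deserializeInt(b);
            return Integer.compare(ib, ia);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serializes the keys, sorts the byte arrays with the comparator, and decodes the result.
    static List<Integer> sortDescending(List<Integer> keys) throws IOException {
        List<byte[]> serialized = new ArrayList<>();
        for (int k : keys) serialized.add(serializeInt(k));
        serialized.sort(SerializedKeyCompareDemo::compareSerialized);
        List<Integer> result = new ArrayList<>();
        for (byte[] b : serialized) result.add(deserializeInt(b));
        return result;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(sortDescending(List.of(3, -7, 12, 0))); // [12, 3, 0, -7]
    }
}
```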
    

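The Mapper and Reducer below do no sorting of their own, because the Shuffle phase calls the comparator to sort. The Mapper only swaps the fields and emits (count, id). The Reducer swaps them back and writes (id, count) in the sorted order.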

    static class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input line: "id\tcount"; emit count as the key so the shuffle sorts by it
            Record r = new Record(value.toString());
            context.write(new IntWritable(r.getCount()), new Text(r.getPersonalId()));
        }
    }

    static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Keys arrive in comparator order; all ids sharing one count arrive in the same call
            for (Text value : values) {
                context.write(value, key);  // swap back to (id, count)
            }
        }
    }
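With the Mapper, the descending comparator, and the Reducer together, the job's effect can be simulated in memory. This sketch assumes a single reduce task and tab-separated `id\tcount` input lines. `SortJobSimulation` is a hypothetical name, and the TreeMap only models what the shuffle does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java walk-through of the whole job: swap "id\tcount" into (count, id),
// sort counts largest-first and group equal counts, then write "id\tcount" back out.
class SortJobSimulation {

    static List<String> run(List<String> inputLines) {
        // Map + shuffle: a reversed TreeMap keeps counts largest-first; ids with the same
        // count share one group (one reduce() call). Hadoop does not guarantee the order of
        // ids inside a group; here it is simply input order.
        TreeMap<Integer, List<String>> groups = new TreeMap<Integer, List<String>>(Collections.reverseOrder());
        for (String line : inputLines) {
            String[] fields = line.split("\t");
            int count = Integer.parseInt(fields[1].trim());
            groups.computeIfAbsent(count, k -> new ArrayList<>()).add(fields[0]);
        }
        // Reduce: emit (id, count) for every id in each group.
        List<String> output = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : groups.entrySet()) {
            for (String id : group.getValue()) {
                output.add(id + "\t" + group.getKey());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> out = run(List.of("u1\t4", "u2\t10", "u3\t4", "u4\t7"));
        out.forEach(System.out::println); // u2 10, u4 7, u1 4, u3 4 (one tab-separated pair per line)
    }
}
```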

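The main class is below, and you can use it as a template. The nested classes shown above go inside it. Adapt the HDFS paths to your own cluster.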

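    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.log4j.BasicConfigurator;

    public class SortByMapReduce {

        // The static nested classes Record, IntWritableComparator, SortMapper and SortReducer go here.

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            String inputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/submitTop10output/";
            String outputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/sortedSubmitTop10/";

            BasicConfigurator.configure();  // minimal console logging (log4j 1.x)
            Configuration conf = new Configuration();
            // To take the paths from the command line instead (needs org.apache.hadoop.util.GenericOptionsParser):
            // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            // if (otherArgs.length != 2) {
            //     System.err.println("Usage: SortByMapReduce <in> <out>");
            //     System.exit(2);
            // }

            Job job = Job.getInstance(conf, "SortByValue");
            job.setJarByClass(SortByMapReduce.class);

            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);

            // Map output: (count, id); final output: (id, count)
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Important: register the custom sort comparator; without it IntWritable keys sort ascending
            job.setSortComparatorClass(IntWritableComparator.class);

            // One reduce task yields a single, globally sorted output file (1 is also the default)
            job.setNumReduceTasks(1);

            // Delete an existing output directory, because FileOutputFormat refuses to overwrite it
            // Path path = new Path(otherArgs[1]);
            Path path = new Path(outputFile);
            FileSystem fileSystem = path.getFileSystem(conf);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }

            // FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
            // FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            FileInputFormat.setInputPaths(job, new Path(inputFile));
            FileOutputFormat.setOutputPath(job, new Path(outputFile));

            boolean res = job.waitForCompletion(true);
            if (res) {
                System.out.println("===========waitForCompletion:" + res + "==========");
            }
            System.exit(res ? 0 : 1);
        }
    }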
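After the job finishes, you can confirm the order with a quick check of the result, such as a part-r-00000 file copied out of the output directory. The helper below is hypothetical. It assumes the default TextOutputFormat layout of one `id<TAB>count` pair per line.

```java
import java.util.List;

// Hypothetical sanity check for the job's output: with the descending comparator,
// the count at the end of each "id\tcount" line should never increase.
class OutputOrderCheck {

    static boolean isSortedByCountDescending(List<String> lines) {
        int previous = Integer.MAX_VALUE;
        for (String line : lines) {
            if (line.isEmpty()) continue;             // tolerate a trailing blank line
            int tab = line.lastIndexOf('\t');
            int count = Integer.parseInt(line.substring(tab + 1).trim());
            if (count > previous) return false;       // an increase means the order is wrong
            previous = count;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isSortedByCountDescending(List.of("u2\t10", "u4\t7", "u1\t4"))); // true
        System.out.println(isSortedByCountDescending(List.of("u1\t4", "u2\t10")));          // false
    }
}
```

For a local copy of the file, the lines could come from `java.nio.file.Files.readAllLines(...)`.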