1. 程式人生 > >hadoop mr 全域性排序

hadoop mr 全域性排序

全域性排序
    需求:根據使用者每月使用的流量按照使用的流量多少排序

    介面->WritableComparable
    排序操作在hadoop中屬於預設的行為。預設按照字典順序排序。

排序的分類:
    1)部分排序
    2)全排序
    3)輔助排序 
    4)二次排序

封裝類,直接完成排序
public class FlowBean implements WritalbeComparable<FlowBean>{
    private long upFlow;
    private long dfFlow;
    private long flowsum;

    pulibc FlowBean(){

    }
    public FlowBean(long upFlow, long dfFlow){
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowsum = upFlow+dfFlow;
    }

    setter
    getter

    @Override
    public void readFields(DataInput in) throws Exception{
        //反序列化
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowsum = in.readLong();
    }
    @Override
    public void write(DataOutput out) throws Exception{
        //序列化
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowsum);
    }
    @Override
    public int compareTo(FlowBean o){
        return this.flowsum >o.getFolwSum()?-1:1;
    }

    @Override
    public String toString(){
        //倒序
        return upFlow+"\t"+dfFlow+"\t"+flowsum;
    }
}

public class FlowSortReducer extends Reducer<FlowBean, Text, Text,FlowBean>{
    @Override
    public void reduce<FlowBean key, Interater<Text> value, Context context>
            throws IOException,InterruptException{
        context.write(value.iterator().next(),key)

    }
}

public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptException{
        //1、獲取一行資料
        String line = value.toString();

        //2、切割
        String[] fields = line.split("\t");

        //3、取出關鍵欄位
        long upFlow = Long.parseLong(fields[1]);
        long dfFlow = Long.parseLong(fields[1]);

        //4、寫出到reduce 階段
        context.write(new FlowBean(upFlow,dfFlow), new Text(field[0]))

    }
}
public class FlowSortDriver{
    public static void main(String[] args){
        Configuration conf = new Configuration();
        Job job = Job.getInstance();

        job.setJarByClass(FlowSortDriver.class);

        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job,new Path("/wc/in"));
        FileOutputFormat.setOutputPaths(job,new Paht("wc/out"));

        System.out.pritnln(job.waitForCompletion(true)?0:1)
    }
}


分割槽加排序

    public class PhonenumPartitioner extends Partitioner<FlowBean, Text>{
            @Override
            public int getPartition(FlowBean key, Text value, int numPartitions){
                //1、獲取手機號前三位
                String phoneNum = value.toString().substring(0,3);
                //2、分割槽
                int partitioner = 4;

                if("135".equals(phoneNum)){
                    return 0;
                }else if("137".equals(phoneNum)){
                    return 1;
                }else ("138".equals(phoneNum)){
                    return 2;
                }

                return partitioner;
            }
        }
    Diriver類{
        job.setPartitionerClass(PhonenumPartitioner.class);
        job.setNumReduceTask(4);    //設定的數量,要大於自定義的時候分割槽的數量
    }