1. 程式人生 > >Mapreduce的排序初步

Mapreduce的排序初步

lec 接口 序列 () mapr 註意 pre 實現接口 con

1. 需求
在已經統計出每一個用戶(手機號)所耗費的總上行流量、總下行流量和總流量的基礎之上,再加一個需求:將統計結果按照總流量倒序排序。
2. 分析
基本思路:
實現自定義的bean來封裝流量信息,並將bean作為map輸出的key來傳輸
MR程序在處理數據的過程中會對數據排序(map輸出的kv對傳輸到reduce之前,會排序),排序的依據是map輸出的key。所以,我們如果要實現自己需要的排序規則,則可以考慮將排序因素放到key中,讓key實現接口:WritableComparable,然後重寫key的compareTo方法。
3. 實現
自定義的bean

public class FlowBean implements WritableComparable<FlowBean>{

    private long upFlow;

    private long downFlow;

    private long sumFlow;

//這裏反序列的時候會用到

    public FlowBean() {

    }

    public FlowBean(long upFlow, long downFlow, long sumFlow) {

        this.upFlow = upFlow;

        this.downFlow = downFlow;

        this.sumFlow = sumFlow;

    }

    public FlowBean(long upFlow, long downFlow) {

        this.upFlow = upFlow;

        this.downFlow = downFlow;

        this.sumFlow = upFlow+downFlow;

    }

    public void set(long upFlow, long downFlow) {

        this.upFlow = upFlow;

        this.downFlow = downFlow;

        this.sumFlow = upFlow+downFlow;

    }

    @Override

    public String toString() {

        return upFlow+"\t"+downFlow+"\t"+sumFlow;

    }

    /這裏是序列化方法

    @Override

    public void write(DataOutput out) throws IOException {

          out.writeLong(upFlow);

          out.writeLong(downFlow);

          out.writeLong(sumFlow);

    }

    /這裏是反序列化方法

    @Override

    public void readFields(DataInput in) throws IOException {

        //註意反序列化的順序跟序列化的順序一致

       this.upFlow = in.readLong();

       this.downFlow = in.readLong();

       this.sumFlow = in.readLong();

    }

    //這裏進行bean的自定義比較大小

    @Override

    public int compareTo(FlowBean o) {

        //實現按照 sumflow 的大小倒序排序

        return this.sumFlow>o.getSumFlow()?-1:1;

    }

    }
/**
 * Mapper that turns one tab-separated input line into a (phone number, FlowBean) pair.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Output key and value, created once and overwritten on every map() call.
    Text k = new Text();
    FlowBean v = new FlowBean();

    /**
     * Splits the line on tabs and emits the phone number with its traffic figures.
     *
     * @param key     input key supplied by the framework (not used here)
     * @param value   one line of input text
     * @param context sink for the emitted pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String[] columns = value.toString().split("\t");
        final int columnCount = columns.length;

        // Column 1 is the phone number; the traffic figures are addressed from the end
        // of the row: third-from-last is upstream, second-from-last is downstream.
        final String phone = columns[1];
        final long up = Long.parseLong(columns[columnCount - 3]);
        final long down = Long.parseLong(columns[columnCount - 2]);

        k.set(phone);
        v.set(up, down);
        context.write(k, v);
    }

}

/**
 * Reducer that adds up the upstream and downstream traffic of all beans sharing a key
 * and emits a single bean holding the totals.
 */
public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // Output value, created once and overwritten on every reduce() call.
    FlowBean v = new FlowBean();

    /**
     * Sums the traffic of every bean in {@code values} and writes the result under {@code key}.
     *
     * @param key     the key shared by all beans in {@code values}
     * @param values  the beans to aggregate
     * @param context sink for the emitted pair
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;

        for (FlowBean flow : values) {
            totalUp = totalUp + flow.getUpFlow();
            totalDown = totalDown + flow.getDownFlow();
        }

        // set() also recomputes the combined total from the two sums.
        v.set(totalUp, totalDown);
        context.write(key, v);
    }

}

Mapreduce的排序初步