
Hadoop Serialization: Writable and WritableComparable

Serializing the keys and values that Hadoop passes around involves two important interfaces, Writable and WritableComparable:

WritableComparable has one method more than Writable, compareTo, which the framework uses to sort keys and to decide whether two keys are equal (and should be grouped together).

In Hadoop, a key's data type must implement WritableComparable, while a value's data type only needs to implement Writable. Anything that can serve as a key can therefore also serve as a value, but not everything that works as a value can work as a key.
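
For reference, here is a simplified sketch of what the two interfaces look like (the real definitions live in org.apache.hadoop.io):

public interface Writable {
    void write(DataOutput out) throws IOException;     // serialize
    void readFields(DataInput in) throws IOException;  // deserialize
}

public interface WritableComparable<T> extends Writable, Comparable<T> {
    // inherits compareTo(T o) from Comparable; used to sort and group keys
}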

package com.atguigu.writableComparable;

/*
 * Driver: sorts the serialized records by total flow (sumFlow).
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class sortDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(sortDriver.class);
        job.setMapperClass(sortMapper.class);
        job.setReducerClass(sortReducer.class);

        // FlowBean is the map output key, so the shuffle sorts by its compareTo
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.addInputPath(job, new Path("E:\\phone2.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\out3"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

/*
Input:
13610009496,300,200,100,
13710009496,200,400,300,
13800094960,200,100,200,
13810009496,300,100,500,
13910009496,600,500,400,
15210009496,300,500,100,

Output:
13810009496	upFlow=300, downFlow=100, sumFlow=500
13910009496	upFlow=600, downFlow=500, sumFlow=400
13710009496	upFlow=200, downFlow=400, sumFlow=300
13800094960	upFlow=200, downFlow=100, sumFlow=200
15210009496	upFlow=300, downFlow=500, sumFlow=100
13610009496	upFlow=300, downFlow=200, sumFlow=100
*/
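
MapReduce sorts map output by key during the shuffle, which is why the driver makes FlowBean the map output key: the sort by total flow falls out of the framework for free. One caveat, with a minimal sketch below: the result is globally sorted only because the job runs with the default single reducer.

// Optional: make the single reducer explicit. With several reducers, each
// output file would be sorted internally but not across files.
job.setNumReduceTasks(1);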

package com.atguigu.writableComparable;


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    @Override
    public String toString(){
        return "upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow ;
    }

    public void set(long upFlow,long downFlow){
        this.upFlow=upFlow;
        this.downFlow=downFlow;
        this.sumFlow=upFlow+downFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    // Serialization: hand the fields to the framework
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back from the framework,
    // in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Compare o to this (rather than this to o) to sort by sumFlow descending
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}
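
A quick, hypothetical sanity check of the ordering (not part of the job itself): the bean with the larger sumFlow should sort first.

FlowBean a = new FlowBean();
a.set(100, 100);                    // sumFlow = 200
FlowBean b = new FlowBean();
b.set(300, 100);                    // sumFlow = 400
System.out.println(a.compareTo(b)); // positive, so a sorts after b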
package com.atguigu.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class sortMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

    private FlowBean flow = new FlowBean();
    private Text phone = new Text();


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input lines are comma-separated: phone,upFlow,downFlow,sumFlow,
        String[] fields = value.toString().split(",");

        phone.set(fields[0]);
        flow.setUpFlow(Long.parseLong(fields[1]));
        flow.setDownFlow(Long.parseLong(fields[2]));
        flow.setSumFlow(Long.parseLong(fields[3]));

        // Write the bean as the key so the shuffle sorts records by total flow
        context.write(flow, phone);

    }
}
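
Note that the mapper reuses one FlowBean and one Text instance across map() calls. This is safe because context.write serializes the pair immediately, so both objects can be overwritten on the next call.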
package com.atguigu.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class sortReducer extends Reducer<FlowBean, Text,Text,FlowBean> {


    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Beans with equal sumFlow compare as equal and arrive grouped under one
        // key, so write out every phone in the group, swapping key and value back
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
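
Because the final output types are Text and FlowBean, the default TextOutputFormat writes each record as the phone number, a tab, and the result of FlowBean.toString(), which is exactly the sample output listed under the driver.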