1. 程式人生 > >流量彙總案例程式設計,

流量彙總案例程式設計,

public class FlowCountMapper extends Mapper<LongWritable,Text,Text,FlowBean>{
    public void map(LongWritable key, Text value, Context context)
            throws IOException,InterruptedException{
        //1、獲取資料
        String line = value.toString();

        //2、切片
        String[] fields = line.split("\t");

        //3、封裝物件,拿到關鍵字,資料清洗
        String phoneN = fields[1];
        long upFlow = long.parseLong(fields[fields.length-3]);
        long dfFlow = long.parseLong(fields[fields.length-2]);

        //4、輸出到reduce端
        context.write(new Text(phoneN), new FlowBean(upFlow,dfFlow));
        }
}

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    pubic void reducer<Text key, Iterable<FlowBean> value, Context context>
            throws Exception{
        //1、相同手機號的流量使用兩次彙總
        long upFlow_sum = 0;
        long dfFlow_sum = 0;

        //2、累加
        for(FlowBean f : value){
            upFlow_sum = f.getUpFlow();
            dfFlow_sum = f.getDfFlow();

        }

        FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum);

        //3、輸出
        context.write(key,rs);
    }
}

public class FlowCountDriver{
    public static void main(String[] args){
        //1)獲取job資訊
        Configuration conf = new Configuration();
        Job job = Job.getInstance();

        //2)獲取jar包
        job.setJarByClass(FlowCountDriver.class);

        //3)獲取自定義的mapper和reducer類
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        //4)設定map輸出的資料型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //5)設定reduce輸出的資料型別
        job.setOutputkeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //6)設定輸入存在的路徑與處理後的結果路徑
        FileInputFormat.setInputPaths(job,new Path("/wc/in"));
        FileOutputFormat.setOutputPaths(job,new Path("/wc/out"));

        //7)提交任務
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs?0:1);
    }
}

public class FlowBean implements Writable{

    private long upFlow;
    private long dfFlow;
    private long flowsum;

    pulibc FlowBean(){

    }
    public FlowBean(long upFlow, long dfFlow){
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowsum = upFlow+dfFlow;
    }

    setter
    getter

    @Override
    public void readFields(DataInput in) throws Exception{
        //反序列化
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowsum = in.readLong();
    }
    @Override
    public void write(DataOutput out) throws Exception{
        //序列化
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowsum);
    }

    @Override
    public String toString(){
        return upFlow+"\t"+dfFlow+"\t"+flowsum;
    }

}