
A Hadoop example: computing upstream and downstream traffic

Problem: for each user (keyed by phone number), compute the total upstream traffic, downstream traffic, and combined traffic consumed.
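
The input is assumed to be tab-delimited log lines in which the second field is the phone number and the third- and second-to-last fields are the upstream and downstream byte counts; the original post does not show the data, so the following line is purely illustrative:

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200

Because the mapper below indexes from both ends of the line (fields[1], fields[fields.length - 3], fields[fields.length - 2]), the number of columns in the middle does not matter.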


Approach:
1. Create the mapper, reducer, and job.
2. The difference from the earlier wordcount example is that the value emitted for each key is not a single number but several (upstream, downstream, and total traffic), so the data to be output has to be wrapped in a class of our own.

1. Create the mapper, reducer, and job (all three in one class, or written separately)
package cn.lsm.bigdata.sumflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class flowcount {
      /**
       * mapper
       */
      static class flowcountMapper extends Mapper<LongWritable, Text, Text, flowBean> {
           @Override
           protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                 String line = value.toString();
                 String[] fields = line.split("\t");

                 String phone = fields[1];

                 long upFlow = Long.parseLong(fields[fields.length - 3]);
                 long downFlow = Long.parseLong(fields[fields.length - 2]);
                 // no need to compute the sum here; flowBean's constructor derives it

                 context.write(new Text(phone), new flowBean(upFlow, downFlow));
           }
      }

      /**
       * reducer
       */
      static class flowcountReducer extends Reducer<Text, flowBean, Text, flowBean> {
           @Override
           protected void reduce(Text key, Iterable<flowBean> values, Context context)
                      throws IOException, InterruptedException {

                 // accumulators for the upstream and downstream traffic
                 long upflowNum = 0;
                 long downflowNum = 0;

                 // add up the upstream and downstream traffic for this phone number
                 for (flowBean value : values) {
                      upflowNum += value.getUpflow();
                      downflowNum += value.getDownflow();
                 }

                 // build a new flowBean to hold the aggregated result
                 flowBean result = new flowBean(upflowNum, downflowNum);

                 context.write(key, result);
           }
      }

      /**
       * job
       */
      public static void main(String[] args) throws Exception {
           Configuration conf = new Configuration();
           Job myjob = Job.getInstance(conf);

           myjob.setJarByClass(flowcount.class);

           myjob.setMapperClass(flowcountMapper.class);
           myjob.setReducerClass(flowcountReducer.class);

           myjob.setMapOutputKeyClass(Text.class);
           myjob.setMapOutputValueClass(flowBean.class);

           myjob.setOutputKeyClass(Text.class);
           myjob.setOutputValueClass(flowBean.class);

           FileInputFormat.setInputPaths(myjob, new Path(args[0]));
           FileOutputFormat.setOutputPath(myjob, new Path(args[1]));

           // exit with a nonzero status if the job fails
           boolean success = myjob.waitForCompletion(true);
           System.exit(success ? 0 : 1);
      }
}
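
Because the reduce step is a pure per-key sum and the reducer's input and output types are identical (Text, flowBean), the same reducer class could also be registered as a combiner to pre-aggregate on the map side. This line is an optional addition, not part of the original job setup:

           myjob.setCombinerClass(flowcountReducer.class);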
2. Create the output class (the custom Writable)
package cn.lsm.bigdata.sumflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class flowBean implements Writable {
    // 上行流量
    private long upflow;
    // 下行流量
    private long downflow;
    // 總流量
    private long sumflow;

    // deserialization instantiates the bean reflectively, so a no-arg constructor is required
    public flowBean() {}

    public flowBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + sumflow;
    }

    // serialization: write the fields to the binary stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    // deserialization: read the fields back in exactly the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

}
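
To see the Writable contract in action, here is a minimal, self-contained round-trip check of flowBean's serialization (a test sketch, not part of the original post):

package cn.lsm.bigdata.sumflow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class flowBeanTest {
    public static void main(String[] args) throws Exception {
        // serialize a bean to an in-memory byte stream
        flowBean original = new flowBean(2481, 24681);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh bean via the no-arg constructor
        flowBean copy = new flowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // prints: 2481	24681	27162
        System.out.println(copy);
    }
}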
3. Package the code into a jar, upload it to the cluster, and run:
hadoop jar flowcount1.jar cn.lsm.bigdata.sumflow.flowcount /review/input /review/output
A correct run shows the job completing successfully. Result:
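
The output file (part-r-00000) contains one line per phone number in the format produced by flowBean.toString(): upstream, downstream, and total traffic separated by tabs. The values below are illustrative only:

13726230503	2481	24681	27162
13826544101	264	0	264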