使用 Hadoop 計算上下行流量的案例
阿新 • • 發佈:2019-01-03
問題:統計每一使用者(以手機號區分)所消耗的總上行流量、總下行流量,以及總流量(上行 + 下行)
// Step 2: create the job class (mapper, reducer and driver).
package cn.lsm.bigdata.sumflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce job that totals the upstream and downstream traffic per phone number.
 *
 * <p>Each input line is split on tabs. The phone number is read from the second
 * field, the upstream flow from the third-from-last field and the downstream flow
 * from the second-from-last field. The output is one line per phone number holding
 * the summed {@code flowBean}.
 */
public class flowcount {

    /**
     * Mapper: emits {@code (phone, flowBean(upFlow, downFlow))} for every well-formed line.
     */
    static class flowcountMapper extends Mapper<LongWritable, Text, Text, flowBean> {

        /** Counter group / name used to report lines that were skipped. */
        private static final String COUNTER_GROUP = "flowcount";
        private static final String MALFORMED_RECORDS = "MALFORMED_RECORDS";

        /**
         * Smallest field count at which the phone column (index 1) and the two flow
         * columns (length - 3, length - 2) are three distinct, valid indices.
         */
        private static final int MIN_FIELDS = 5;

        /** Reused output key; the framework serializes it during {@code context.write}. */
        private final Text phoneKey = new Text();

        /**
         * Parses one input line and emits the traffic of that record keyed by phone number.
         * Lines with too few fields or non-numeric flow values are counted and skipped
         * instead of failing the whole task.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one tab-separated input line
         * @param context task context used to emit output and update counters
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length < MIN_FIELDS) {
                // Blank or truncated line: indexing would throw or alias the phone column.
                context.getCounter(COUNTER_GROUP, MALFORMED_RECORDS).increment(1);
                return;
            }

            long upFlow;
            long downFlow;
            try {
                upFlow = Long.parseLong(fields[fields.length - 3]);
                downFlow = Long.parseLong(fields[fields.length - 2]);
            } catch (NumberFormatException e) {
                // Flow columns are not numeric; skip the record but keep it visible in counters.
                context.getCounter(COUNTER_GROUP, MALFORMED_RECORDS).increment(1);
                return;
            }

            phoneKey.set(fields[1]);
            context.write(phoneKey, new flowBean(upFlow, downFlow));
        }
    }

    /**
     * Reducer: sums the upstream and downstream flow of all records sharing a phone number.
     */
    static class flowcountReducer extends Reducer<Text, flowBean, Text, flowBean> {

        /**
         * Adds up every {@code flowBean} received for {@code key} and writes a single
         * bean holding the totals.
         *
         * @param key     phone number
         * @param values  all beans emitted by the mappers for this phone number
         * @param context task context used to emit the result
         */
        @Override
        protected void reduce(Text key, Iterable<flowBean> values, Context context)
                throws IOException, InterruptedException {
            long upflowNum = 0;
            long downflowNum = 0;
            for (flowBean value : values) {
                upflowNum += value.getUpflow();
                downflowNum += value.getDownflow();
            }
            // A fresh bean is built so the total flow is derived from the summed parts.
            context.write(key, new flowBean(upflowNum, downflowNum));
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure so
     * that calling scripts can detect a failed run.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: flowcount <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job myjob = Job.getInstance(conf);
        myjob.setJarByClass(flowcount.class);

        myjob.setMapperClass(flowcountMapper.class);
        myjob.setReducerClass(flowcountReducer.class);

        myjob.setMapOutputKeyClass(Text.class);
        myjob.setMapOutputValueClass(flowBean.class);
        myjob.setOutputKeyClass(Text.class);
        myjob.setOutputValueClass(flowBean.class);

        FileInputFormat.setInputPaths(myjob, new Path(args[0]));
        FileOutputFormat.setOutputPath(myjob, new Path(args[1]));

        // Propagate the job result; previously a failed job still exited with status 0.
        System.exit(myjob.waitForCompletion(true) ? 0 : 1);
    }
}
// Step 3: upload the packaged jar to the cluster and run it:
//   hadoop jar flowcount1.jar cn.lsm.bigdata.sumflow.flowcount /review/input /review/output
package cn.lsm.bigdata.sumflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
 * Value type carrying the upstream, downstream and total traffic of a record.
 *
 * <p>The three values are serialized as three consecutive {@code long}s in the
 * order upstream, downstream, total; {@link #readFields(DataInput)} reads them back
 * in the same order.
 */
public class flowBean implements Writable {

    /** Upstream flow. */
    private long upflow;
    /** Downstream flow. */
    private long downflow;
    /** Total flow; kept equal to {@code upflow + downflow} by the constructor and setters. */
    private long sumflow;

    /**
     * No-argument constructor, required because instances are created reflectively
     * during deserialization.
     */
    public flowBean() {
    }

    /**
     * Creates a bean and derives the total from the two parts.
     *
     * @param upflow   upstream flow
     * @param downflow downstream flow
     */
    public flowBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    /** @return the upstream flow */
    public long getUpflow() {
        return upflow;
    }

    /**
     * Sets the upstream flow and recomputes the total so the bean stays consistent.
     *
     * @param upflow new upstream flow
     */
    public void setUpflow(long upflow) {
        this.upflow = upflow;
        this.sumflow = this.upflow + this.downflow;
    }

    /** @return the downstream flow */
    public long getDownflow() {
        return downflow;
    }

    /**
     * Sets the downstream flow and recomputes the total so the bean stays consistent.
     *
     * @param downflow new downstream flow
     */
    public void setDownflow(long downflow) {
        this.downflow = downflow;
        this.sumflow = this.upflow + this.downflow;
    }

    /** @return the total flow */
    public long getSumflow() {
        return sumflow;
    }

    /**
     * Overrides the total directly. A later call to {@link #setUpflow(long)} or
     * {@link #setDownflow(long)} recomputes the total and replaces this value.
     *
     * @param sumflow new total flow
     */
    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    /** @return the three values joined by tabs: upstream, downstream, total */
    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + sumflow;
    }

    /**
     * Serializes the bean as three longs: upstream, downstream, total.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    /**
     * Restores the bean from three longs, in the order written by {@link #write(DataOutput)}.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }
}
執行正確的過程和結果是:
結果: