自定義 Writable:統計電話號碼的上行、下行及總流量
阿新 • • 發佈:2018-12-14
Driver類
public class flowSum { static class mymapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //切分欄位 String[] fields = value.toString().split("\t"); //取出手機號 String phoneNbr = fields[1]; //取出上行流量下行流量 long upFlow = Long.parseLong(fields[fields.length-3]); long dFlow = Long.parseLong(fields[fields.length-2]); // FlowBean fb=new FlowBean(upFlow, dFlow); // context.write(new Text(phoneNbr), fb); context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow)); } } static class myreducer extends Reducer< Text, FlowBean,Text, FlowBean>{ @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upFlow = 0; long sum_dFlow = 0; //遍歷所有bean,將其中的上行流量,下行流量分別累加 for(FlowBean bean: values){ sum_upFlow += bean.getUpFlow(); sum_dFlow += bean.getdFlow(); } FlowBean fb = new FlowBean(sum_upFlow, sum_dFlow); context.write(key, fb); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job=Job.getInstance(conf); //設定主函式的入口 job.setJarByClass(flowSum.class); //設定job的mapper的類 job.setMapperClass(mymapper.class); //設定job的reduce類 job.setReducerClass(myreducer.class); //設定map輸出key的型別 job.setMapOutputKeyClass(Text.class); //設定map輸出value的型別 job.setMapOutputValueClass(FlowBean.class); //設定reduce輸出key的型別 job.setOutputKeyClass(Text.class); //設定reduce輸出value的型別 job.setOutputValueClass(FlowBean.class); // job.setCombinerClass(WordCountReduce.class); //檔案輸入類 Path inpath=new Path("hdfs://master:9000/hadoop/input"); FileInputFormat.addInputPath(job, inpath); //檔案輸出類 Path outpath=new Path("hdfs://master:9000/hadoop/result2"); FileOutputFormat.setOutputPath(job,outpath); //提交job // job.submit();//這個不列印日誌 job.waitForCompletion(true); // System.exit(job.waitForCompletion(true) ? 0 : 1); } }
FlowBean
public class FlowBean implements Writable{ private long upFlow; private long dFlow; private long sumFlow; // 反序列化時,需要反射呼叫空參建構函式,所以要顯示定義一個 public FlowBean() { } public FlowBean(long upFlow, long dFlow) { this.upFlow = upFlow; this.dFlow = dFlow; this.sumFlow = upFlow + dFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getdFlow() { return dFlow; } public void setdFlow(long dFlow) { this.dFlow = dFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } /** * 序列化方法 */ public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dFlow); out.writeLong(sumFlow); } /** * 反序列化方法 * 注意:反序列化的順序跟序列化的順序完全一致 */ public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); dFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + dFlow + "\t" + sumFlow; } }
注意:FlowBean 必須 implements Writable 介面,否則執行時會報錯:Initialization of all the collectors failed. Error in last collector was :null