流量彙總案例程式設計
阿新 • • 發佈:2018-12-15
public class FlowCountMapper extends Mapper<LongWritable,Text,Text,FlowBean>{ public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{ //1、獲取資料 String line = value.toString(); //2、切片 String[] fields = line.split("\t"); //3、封裝物件,拿到關鍵字,資料清洗 String phoneN = fields[1]; long upFlow = long.parseLong(fields[fields.length-3]); long dfFlow = long.parseLong(fields[fields.length-2]); //4、輸出到reduce端 context.write(new Text(phoneN), new FlowBean(upFlow,dfFlow)); } } public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ pubic void reducer<Text key, Iterable<FlowBean> value, Context context> throws Exception{ //1、相同手機號的流量使用兩次彙總 long upFlow_sum = 0; long dfFlow_sum = 0; //2、累加 for(FlowBean f : value){ upFlow_sum = f.getUpFlow(); dfFlow_sum = f.getDfFlow(); } FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum); //3、輸出 context.write(key,rs); } } public class FlowCountDriver{ public static void main(String[] args){ //1)獲取job資訊 Configuration conf = new Configuration(); Job job = Job.getInstance(); //2)獲取jar包 job.setJarByClass(FlowCountDriver.class); //3)獲取自定義的mapper和reducer類 job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); //4)設定map輸出的資料型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //5)設定reduce輸出的資料型別 job.setOutputkeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //6)設定輸入存在的路徑與處理後的結果路徑 FileInputFormat.setInputPaths(job,new Path("/wc/in")); FileOutputFormat.setOutputPaths(job,new Path("/wc/out")); //7)提交任務 boolean rs = job.waitForCompletion(true); System.out.println(rs?0:1); } } public class FlowBean implements Writable{ private long upFlow; private long dfFlow; private long flowsum; pulibc FlowBean(){ } public FlowBean(long upFlow, long dfFlow){ this.upFlow = upFlow; this.dfFlow = dfFlow; this.flowsum = upFlow+dfFlow; } setter getter @Override public void readFields(DataInput in) throws Exception{ //反序列化 upFlow = 
in.readLong(); dfFlow = in.readLong(); flowsum = in.readLong(); } @Override public void write(DataOutput out) throws Exception{ //序列化 out.writeLong(upFlow); out.writeLong(dfFlow); out.writeLong(flowsum); } @Override public String toString(){ return upFlow+"\t"+dfFlow+"\t"+flowsum; } }