自定義類bean進行MapReduce引數傳輸
阿新 • • 發佈:2018-12-12
資料型別:
1363157984041 13660577991(手機號) 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960(上行流量) 690(下行流量) 200
需求:求每個手機號的上行、下行流量總和,以及總流量,並以 bean 方式傳輸,並按照手機號開頭三位進行分區。統計類:
package mrpro927;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Job driver: for every phone number, sums the upstream traffic, the
 * downstream traffic and the total, emitting one phoneBean per phone.
 * Output is partitioned by the first three digits of the phone number
 * (136 / 137 / 138 / 139 / everything else).
 */
public class phoneDataBeans {

    /** Parses one tab-separated input line and emits (phone, phoneBean(up, down, up+down)). */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, phoneBean> {
        // Reused across map() calls to avoid per-record allocation — the
        // framework serializes the value at context.write() time, so this
        // standard Hadoop pattern is safe.
        Text t = new Text();
        phoneBean p = new phoneBean();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, phoneBean>.Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            // Filter out malformed records: a valid line has exactly 11 fields.
            if (split.length == 11) {
                t.set(split[1]);                            // field 1: phone number
                p.setUpflow(Integer.parseInt(split[7]));    // field 7: upstream traffic
                p.setDownflow(Integer.parseInt(split[8]));  // field 8: downstream traffic
                p.setSum(p.getUpflow() + p.getDownflow());
                context.write(t, p);
            }
        }
    }

    /** Called once per key: accumulates up/down/total traffic over all records of one phone. */
    public static class MyReducer extends Reducer<Text, phoneBean, Text, phoneBean> {
        // Reused output bean (same pattern as the mapper).
        phoneBean p = new phoneBean();

        @Override
        protected void reduce(Text key, Iterable<phoneBean> values,
                Reducer<Text, phoneBean, Text, phoneBean>.Context context)
                throws IOException, InterruptedException {
            int upsum = 0;
            int downsum = 0;
            int sum = 0;
            for (phoneBean t : values) {
                upsum += t.getUpflow();
                downsum += t.getDownflow();
                sum += t.getSum();
            }
            p.setUpflow(upsum);
            p.setDownflow(downsum);
            p.setSum(sum);
            context.write(key, p);
        }
    }

    /** Routes each key to one of 5 reducers based on the phone-number prefix. */
    public static class MyPartitioner extends Partitioner<Text, phoneBean> {
        @Override
        public int getPartition(Text key, phoneBean value, int numPartitions) {
            // The first three digits of the phone number pick the partition.
            String prefix = key.toString().substring(0, 3);
            switch (prefix) {
                case "136": return 0;
                case "137": return 1;
                case "138": return 2;
                case "139": return 3;
                default:    return 4;
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Load the default configuration.
        Configuration conf = new Configuration();
        // When launched from Eclipse, act as this Linux user.
        System.setProperty("HADOOP_USER_NAME", "mading");
        // Start a job.
        Job job = Job.getInstance(conf);
        // Main class of this job.
        job.setJarByClass(phoneDataBeans.class);
        // Mapper and reducer classes.
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // Map output key/value types (could be omitted only if identical to
        // the reduce output types).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(phoneBean.class);
        // Reduce output key/value types.
        job.setOutputKeyClass(Text.class);
        // BUGFIX: the reducer writes phoneBean values, not Text — the original
        // setOutputValueClass(Text.class) declared the wrong type.
        job.setOutputValueClass(phoneBean.class);
        // Partitioning algorithm.
        job.setPartitionerClass(MyPartitioner.class);
        // One reduce task per partition produced by MyPartitioner.
        job.setNumReduceTasks(5);
        // Input path — this is the HA cluster path.
        FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/phonedatain"));
        // Output path.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/pout01"));
        // Submit the job and propagate success/failure as the exit code
        // (the original discarded waitForCompletion's boolean result).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
bean類,注意實現Writable介面
package mrpro927;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * Custom value type for MapReduce transfer. Any user-defined class shuffled
 * between map and reduce must be serializable, i.e. implement Writable.
 */
public class phoneBean implements Writable {

    private int upflow;    // upstream traffic
    private int downflow;  // downstream traffic
    private int sum;       // upflow + downflow, maintained by the caller

    public int getUpflow() {
        return this.upflow;
    }

    public void setUpflow(int upflow) {
        this.upflow = upflow;
    }

    public int getDownflow() {
        return this.downflow;
    }

    public void setDownflow(int downflow) {
        this.downflow = downflow;
    }

    public int getSum() {
        return this.sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    // Serialization: object -> bytes. Runs on the map side before the
    // record is shuffled to the reducer.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.upflow);
        out.writeInt(this.downflow);
        out.writeInt(this.sum);
    }

    // Deserialization on the reduce side. The field order here must mirror
    // write() exactly.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upflow = in.readInt();
        this.downflow = in.readInt();
        this.sum = in.readInt();
    }

    // Tab-separated rendering used by TextOutputFormat for the final output.
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(upflow).append('\t').append(downflow).append('\t').append(sum);
        return text.toString();
    }
}