自定義物件實現 MapReduce 框架的序列化及排序
阿新 • • 發佈:2019-01-31
如果需要將自定義的 bean 放在 key 中傳輸,則還需要實現 Comparable 介面,因為 MapReduce框中的 shuffle 過程一定會對 key 進行排序,此時,自定義的 bean 實現的介面應該是:public class FlowBean implements WritableComparable<FlowBean>:
例:
進行了序列化的 Flow 類:
package flow.pojo; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * 使用者自定義的POJO類如果充當key的話,那麼必須要進行序列化操作和執行排序規則 * * 讓 Flow實現Writable介面,就是讓該類具有序列化和反序列化的能力 * * 真正的操作: 其實就是把當前的某個物件,進行序列化,就是把屬性值通過流進行傳輸到其他的儲存介質或者流 * * 實現序列化操作 * 實現反序列化操作 * 指定排序規則 * */ public class Flow implements WritableComparable<Flow>{ private String phone; private long upFlow; private long downFlow; private long sumFlow; public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public Flow() { super(); } public Flow(String phone, long upFlow, long downFlow, long sumFlow) { super(); this.phone = phone; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } @Override public String toString() { return "Flow [phone=" + phone + ", upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + "]"; } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(phone); out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化操作 */ @Override public void readFields(DataInput in) throws IOException { this.phone = in.readUTF(); this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } /** * 排序規則 */ @Override public int compareTo(Flow o) { /** * 按照總流量 從大到小 */ long diff = o.getSumFlow() - this.getSumFlow(); if(diff == 0){ return 0; }else{ return diff > 0 ? 1 : -1; } } }
統計上行流量和下行流量之和並且按照流量大小倒序排序的 MR 程式Flow2MR :
package flow.pojo; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 實現流量彙總並且按照流量大小倒序排序 前提:處理的資料是已經彙總過的結果檔案 */ public class Flow2MR { // 在 kv 中傳輸我們自定義的物件是可以的,但是必須實現 hadoop 的序列化機制 implements Writable, 如果要排序, // 還要實現 Comparable 介面, hadoop 為 我 們 提 供 了 一 個 方 便 的 類 , 叫 做 WritableComparable,直接實現就好 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //叢集 conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); System.setProperty("HADOOP_USER_NAME", "hadoop"); Job job = Job.getInstance(conf); // 告訴框架,我們的程式所在 jar 包的路徑 job.setJarByClass(Flow2MR.class); // 告訴框架,我們的程式所用的 mapper 類和 reducer 類 job.setMapperClass(Flow2MRMapper.class); // job.setReducerClass(Flow2MRReducer.class); // 告訴框架,我們的 mapperreducer 輸出的資料型別 job.setMapOutputKeyClass(Flow.class); job.setMapOutputValueClass(NullWritable.class); // // 如果reducer階段的輸出的key-value的型別和mapper階段的一致,那麼可以省略前面的setMapOutClass() // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(Text.class); // 框架中預設的輸入輸出元件就是這倆貨,所以可以省略這兩行程式碼 /* * job.setInputFormatClass(TextInputFormat.class); * job.setOutputFormatClass(TextOutputFormat.class); */ // 告訴框架,我們要處理的檔案在哪個路徑下 Path inputPath = new Path(args[0]); // 告訴框架,我們的處理結果要輸出到哪裡去 Path outputPath = new Path(args[1]); FileInputFormat.setInputPaths(job, inputPath); FileSystem fs = FileSystem.get(conf); if(fs.exists(outputPath)){ fs.delete(outputPath,true); } FileOutputFormat.setOutputPath(job, outputPath); boolean isDone = job.waitForCompletion(true); System.exit(isDone ? 0 : 1); } /** * Mapper階段的業務邏輯 * * null也有對於的參與序列化的指定型別: NullWritable */ private static class Flow2MRMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 將讀到的一行資料進行欄位切分 String[] split = value.toString().split("\t"); // 抽取業務所需要的各欄位 String phone = split[0]; long upFlow = Long.parseLong(split[1]); long downFlow = Long.parseLong(split[2]); long sumFlow = Long.parseLong(split[3]); Flow flow = new Flow(phone, upFlow, downFlow, sumFlow); context.write(flow, NullWritable.get()); } } /** * Reducer階段的業務邏輯 */ private static class Flow2MRReducer extends Reducer<Text, Text, Text, Text>{ // reduce 方法接收到的 key 是某一組<a 手機號,bean><a 手機號,bean><a 手機號,bean>中的第一個手機號 // reduce 方法接收到的 vlaues 是這一組 kv 中的所有 bean 的一個迭代器 @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { /** * 在當前排序操作中,根本不需要 reducer階段去指定 一些邏輯 * * 但是需要Reducer階段: 因為只有有reducer階段,最終的結果集才會按照key進行排序 */ } } }