自定義bean物件實現序列化介面(Writable)以及實現案例
阿新 • • 發佈:2018-12-20
-
自定義bean物件序列化傳輸必須注意
(1) 實現Writable介面 (2)反序列化時,需要反射呼叫空建構函式,所以必須有空構造
public FlowBean() { super(); }
(3)重寫序列化方法
/**
 * Serializes this bean as three consecutive longs: upFlow, downFlow, sumFlow.
 *
 * @param out sink the fields are written to
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(DataOutput out) throws IOException {
    // The field order here must be mirrored exactly by readFields.
    out.writeLong(this.upFlow);
    out.writeLong(this.downFlow);
    out.writeLong(this.sumFlow);
}
(4)重寫反序列化方法
/**
 * Restores this bean from three consecutive longs: upFlow, downFlow, sumFlow.
 *
 * @param in source the fields are read from
 * @throws IOException if reading from {@code in} fails
 */
@Override
public void readFields(DataInput in) throws IOException {
    // Read in exactly the same order the fields were written.
    this.upFlow = in.readLong();
    this.downFlow = in.readLong();
    this.sumFlow = in.readLong();
}
(5)反序列化的順序和序列化的順序完全一致 (6)要想把結果顯示在檔案中,需要重寫toString(),可以用"\t"分開,以便後續用 (7)如果需要將自定義的bean放key中傳輸,則還要實現Comparable介面(在Hadoop中即實現WritableComparable介面),因為MapReduce框架中的shuffle過程一定會對key排序
@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
物件
package com.zyd.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.io.Writable;

/**
 * Traffic record holding upstream, downstream and total flow.
 *
 * <p>Implements {@link Writable} so it can be serialized between tasks. In this
 * example the bean travels as a value (the key is the phone number), so it does
 * not define {@code compareTo}.
 *
 * @author Administrator
 */
public class FlowBean implements Writable {

    /** Upstream flow. */
    private long upFlow;

    /** Downstream flow. */
    private long downFlow;

    /** Total flow: upstream plus downstream. */
    private long sumFlow;

    /** No-argument constructor; required so the bean can be created reflectively. */
    public FlowBean() {
    }

    /**
     * Creates a bean from the two flow values; the total is derived from them.
     *
     * @param upFlow upstream flow
     * @param downFlow downstream flow
     */
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * Overwrites both flow values and recomputes the total.
     *
     * @param upFlow upstream flow
     * @param downFlow downstream flow
     */
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * Formats the bean as {@code upFlow<TAB>downFlow<TAB>sumFlow} so the output
     * is easy to process further.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(upFlow).append("\t");
        text.append(downFlow).append("\t");
        text.append(sumFlow);
        return text.toString();
    }

    /**
     * Deserializes the bean. Fields are read in the same order that
     * {@link #write(DataOutput)} emits them.
     *
     * @param in source the fields are read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Serializes the bean as three longs: upFlow, downFlow, sumFlow.
     *
     * @param out sink the fields are written to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    /** @return upstream flow */
    public long getUpFlow() {
        return upFlow;
    }

    /** @param upFlow new upstream flow; the total is not recomputed */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return downstream flow */
    public long getDownFlow() {
        return downFlow;
    }

    /** @param downFlow new downstream flow; the total is not recomputed */
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return total flow */
    public long getSumFlow() {
        return sumFlow;
    }

    /** @param sumFlow new total flow */
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}
Mapper類
package com.zyd.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Maps one tab-separated log line to a (phone number, {@link FlowBean}) pair.
 *
 * <p>Input: {@link LongWritable} key (unused here) and the line content as
 * {@link Text}. Output: phone number as {@link Text} key, flow values as
 * {@link FlowBean} value.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * Fewest fields a usable record can have: the phone number sits at index 1
     * and the two flow counters are third- and second-to-last, so they only
     * fall on distinct columns after the phone number with five or more fields.
     */
    private static final int MIN_FIELDS = 5;

    // Output objects are reused across map() calls instead of being reallocated.
    FlowBean v = new FlowBean();
    Text k = new Text();

    /**
     * Parses one line and emits the phone number with its up/down flow.
     * Lines with too few fields or non-numeric flow values are skipped and
     * counted under the "FlowMapper"/"MALFORMED_RECORDS" job counter instead of
     * failing the task.
     *
     * @param key input key; not used
     * @param value one line of input, fields separated by tabs
     * @param context used to emit the output pair and to report skipped records
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Example record (tab-separated):
        // 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

        // 1. Read the line.
        String line = value.toString();

        // 2. Split into fields.
        String[] fields = line.split("\t");
        if (fields.length < MIN_FIELDS) {
            // Blank or truncated line: indexing below would throw.
            context.getCounter("FlowMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        // 3. Extract the phone number and the flow counters. The counters are
        //    addressed from the end of the record.
        String phoneNum = fields[1];
        long upFlow;
        long downFlow;
        try {
            upFlow = Long.parseLong(fields[fields.length - 3]);
            downFlow = Long.parseLong(fields[fields.length - 2]);
        } catch (NumberFormatException e) {
            context.getCounter("FlowMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }

        k.set(phoneNum);
        v.set(upFlow, downFlow);

        // 4. Emit the pair.
        context.write(k, v);
    }
}
Reducer
package com.zyd.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Sums the upstream and downstream flow of every record that shares a key and
 * emits one {@link FlowBean} holding the totals.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * Adds up the flow values of all beans for {@code key} and writes the result.
     *
     * @param key the grouping key
     * @param values all beans emitted for {@code key}
     * @param context used to emit the aggregated pair
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1. Accumulate upstream and downstream flow over all values of this key.
        long totalUp = 0L;
        long totalDown = 0L;
        for (FlowBean record : values) {
            totalUp = totalUp + record.getUpFlow();
            totalDown = totalDown + record.getDownFlow();
        }

        // 2. Emit the totals; the constructor derives the overall sum.
        FlowBean totals = new FlowBean(totalUp, totalDown);
        context.write(key, totals);
    }
}
Driver
package com.zyd.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver: configures the flow-summing job and submits it.
 *
 * <p>Usage: {@code FlowDriver <input path> <output path>}
 */
public class FlowDriver {

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on
     * failure. Exits with 2 and a usage message when the two path arguments
     * are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException if job setup or submission fails
     * @throws ClassNotFoundException if a job class cannot be found
     * @throws InterruptedException if waiting for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: FlowDriver <input path> <output path>");
            System.exit(2);
            return;
        }

        // 1. Obtain the job object.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar by the driver class.
        job.setJarByClass(FlowDriver.class);

        // 3. Register the mapper and reducer classes.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the mapper output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit and wait for completion.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
原資料(各欄位之間須以Tab(\t)分隔,因為Mapper以"\t"切割每一行)
1363157993055 13565436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1514 92054 200
1363157993055 13560436866 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1916 914 200
1363157993055 13568436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11616 95 200
1363157993055 13760436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11126 95420 200
1363157993055 13564436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11856 954 200
1363157993055 13561536666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11685 952 200
1363157993055 13560485666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1916 9504 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1616 1954 200
1363157993055 13560463666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1216 9354 200
1363157993055 13560488666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1616 9543 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1316 95420 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1316 95474 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13560436856 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11136 9524 200
1363157993055 13560478666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11162 9554 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11165 9954 200
1363157993055 13566436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11126 9534 200
1363157993055 13460436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1162 9054 200
1363157993055 13560936666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11201 91254 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1616 1954 200
1363157993055 13560436866 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1916 952 200
1363157993055 13568436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11616 95 200
1363157993055 13760436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11126 9542 200
1363157993055 13560445666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 16616 95489 200
1363157993055 13564436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 11856 95452 200