1. 程式人生 > 自定義bean物件實現序列化介面(Writable)以及實現案例

自定義bean物件實現序列化介面(Writable)以及實現案例

  1. 自定義bean物件序列化傳輸必須注意

    (1) 實現Writable介面 (2)反序列化時,框架需要透過反射呼叫空參建構函式,所以必須提供空參構造

     	public FlowBean() {
     	super();
     	}
    

    (3)重寫序列化方法

     // Serialization: writes the three long fields in a fixed order (up, down, sum).
     // readFields() must read them back in exactly this same order.
     @Override
     public void write(DataOutput out) throws IOException {
     out.writeLong(upFlow);
     out.writeLong(downFlow);
     out.writeLong(sumFlow);
     }
    

    (4)重寫反序列化方法

     // Deserialization: reads the three long fields back in the same order
     // write() emitted them (up, down, sum); any other order corrupts the values.
     @Override
     public void readFields(DataInput in) throws IOException {
     upFlow = in.readLong();
     downFlow = in.readLong();
     sumFlow = in.readLong();
     }
    

    (5)反序列化的順序必須和序列化的順序完全一致 (6)要想把結果顯示在檔案中,需要重寫toString(),欄位之間可以用"\t"分開,方便後續處理 (7)如果需要將自定義的bean放在key中傳輸,則還要實現WritableComparable介面(同時具備Writable與Comparable),因為MapReduce框架中的shuffle過程一定會對key排序

     // Orders beans by total flow, descending (largest first).
     // Long.compare returns 0 for equal totals, which the Comparable contract requires
     // (x.compareTo(x) == 0, sgn(x.compareTo(y)) == -sgn(y.compareTo(x))); the former
     // "? -1 : 1" ternary never returned 0 and violated it.
     // NOTE: when this bean is a MapReduce key, beans with equal totals now compare
     // equal and are grouped into one reduce() call, so the reducer must iterate over
     // all values for the key rather than assume a single one.
     @Override
     public int compareTo(FlowBean o) {
     return Long.compare(o.getSumFlow(), this.sumFlow);
     }
    

Bean物件(FlowBean)

	package com.zyd.flowsum;
	
	import java.io.DataInput;
	import java.io.DataOutput;
	import java.io.IOException;
	import java.util.Set;
	
	import org.apache.hadoop.io.Writable;
	/**
	 * Traffic record for one phone number: upstream, downstream and total flow.
	 * Implements Hadoop's {@code Writable} so it can be serialized between the map and
	 * reduce phases. It travels only as a value (the key is the phone number), so it
	 * does not implement {@code compareTo}.
	 *
	 * @author Administrator
	 */
	public class FlowBean implements Writable {
	
		private long upFlow;   // upstream flow
		private long downFlow; // downstream flow
		private long sumFlow;  // total flow; derived as upFlow + downFlow by the constructor and set()
	
		/**
		 * No-arg constructor. Required because the bean is created reflectively and then
		 * populated through {@link #readFields(DataInput)}.
		 */
		public FlowBean() {
			// all fields start at 0
		}
	
		/**
		 * Creates a bean and derives the total from the two directions.
		 *
		 * @param upFlow   upstream flow
		 * @param downFlow downstream flow
		 */
		public FlowBean(long upFlow, long downFlow) {
			this.upFlow = upFlow;
			this.downFlow = downFlow;
			sumFlow = upFlow + downFlow;
		}
	
		/**
		 * Replaces (does NOT add to) the current values and recomputes the total,
		 * so a single instance can be reused for many records.
		 *
		 * @param upFlow   new upstream flow
		 * @param downFlow new downstream flow
		 */
		public void set(long upFlow, long downFlow) {
			this.upFlow = upFlow;
			this.downFlow = downFlow;
			sumFlow = upFlow + downFlow;
		}
	
		/**
		 * Serializes the fields in the fixed order up, down, sum.
		 * {@link #readFields(DataInput)} must read them back in the same order.
		 */
		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(upFlow);
			out.writeLong(downFlow);
			out.writeLong(sumFlow);
		}
	
		/**
		 * Deserializes the fields in the same order {@link #write(DataOutput)} wrote them.
		 */
		@Override
		public void readFields(DataInput in) throws IOException {
			upFlow = in.readLong();
			downFlow = in.readLong();
			sumFlow = in.readLong();
		}
	
		/**
		 * Returns the tab-separated form "up\tdown\tsum", which makes text output easy
		 * to post-process.
		 */
		@Override
		public String toString() {
			return String.valueOf(upFlow) + '\t' + downFlow + '\t' + sumFlow;
		}
	
		public long getUpFlow() {
			return upFlow;
		}
	
		public long getDownFlow() {
			return downFlow;
		}
	
		public long getSumFlow() {
			return sumFlow;
		}
	
		/** Sets the upstream flow only; sumFlow is NOT recomputed. */
		public void setUpFlow(long upFlow) {
			this.upFlow = upFlow;
		}
	
		/** Sets the downstream flow only; sumFlow is NOT recomputed. */
		public void setDownFlow(long downFlow) {
			this.downFlow = downFlow;
		}
	
		/** Overrides the total directly, independent of upFlow/downFlow. */
		public void setSumFlow(long sumFlow) {
			this.sumFlow = sumFlow;
		}
	}

Mapper類

package com.zyd.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Parses one line of the traffic log and emits (phone number, FlowBean(up, down)).
 *
 * <p>Input: key = byte offset of the line (LongWritable), value = line text (Text).
 * Output: key = phone number (Text), value = FlowBean.
 *
 * <p>The phone number is the second field; upstream and downstream flow are the
 * third- and second-to-last fields. Blank lines are skipped; lines that are too short
 * or have non-numeric flow fields are skipped and counted in the
 * {@code FlowMapper/MALFORMED_LINES} counter instead of failing the task.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
	/** Counter group/name used to report unparsable lines. */
	private static final String COUNTER_GROUP = "FlowMapper";
	private static final String MALFORMED_COUNTER = "MALFORMED_LINES";
	/** Any run of whitespace separates fields, so mixed tab/space input is tolerated. */
	private static final java.util.regex.Pattern FIELD_SEPARATOR = java.util.regex.Pattern.compile("\\s+");
	/** Phone at index 1 plus up/down/status as the last three fields => at least 5 fields. */
	private static final int MIN_FIELDS = 5;

	// Reused across map() calls to avoid allocating per record.
	FlowBean v = new FlowBean();
	Text k = new Text();

	@Override
	protected void map(LongWritable key, Text value,
			Context context)
			throws IOException, InterruptedException {
		// e.g. 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  1116  954  200
		// Trim first so a leading separator does not yield an empty first field, and so
		// the phone key carries no stray trailing whitespace.
		String line = value.toString().trim();
		if (line.isEmpty()) {
			return; // blank line: nothing to emit
		}

		String[] fields = FIELD_SEPARATOR.split(line);
		if (fields.length < MIN_FIELDS) {
			context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
			return;
		}

		long upFlow;
		long downFlow;
		try {
			upFlow = Long.parseLong(fields[fields.length - 3]);
			downFlow = Long.parseLong(fields[fields.length - 2]);
		} catch (NumberFormatException e) {
			context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
			return;
		}

		k.set(fields[1]);
		v.set(upFlow, downFlow);
		context.write(k, v);
	}
}

Reducer

package com.zyd.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Aggregates all FlowBean records that share a phone number into one bean holding
 * the summed upstream flow, downstream flow and their total.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

	/**
	 * Sums the upstream and downstream flow of every record for {@code key} and
	 * writes (key, FlowBean(totalUp, totalDown)).
	 */
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Context context)
			throws IOException, InterruptedException {
		long totalUp = 0L;
		long totalDown = 0L;
		for (FlowBean record : values) {
			totalUp += record.getUpFlow();
			totalDown += record.getDownFlow();
		}

		// The two-argument constructor also derives the overall total.
		FlowBean aggregate = new FlowBean(totalUp, totalDown);
		context.write(key, aggregate);
	}
}

Driver

	package com.zyd.flowsum;
	
	import java.io.IOException;
	
	import org.apache.hadoop.conf.Configuration;
	import org.apache.hadoop.fs.Path;
	import org.apache.hadoop.io.Text;
	import org.apache.hadoop.mapreduce.Job;
	import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
	import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
	
	/**
	 * Driver for the flow-sum job: wires FlowMapper and FlowReducer together, declares
	 * the intermediate and final key/value types, and submits the job.
	 *
	 * <p>Usage: {@code FlowDriver <input path> <output path>}
	 */
	public class FlowDriver {
		/**
		 * Configures and runs the job. Exits with 0 on success, 1 if the job fails,
		 * and 2 if the input/output arguments are missing.
		 *
		 * @param args {@code args[0]} input path, {@code args[1]} output path
		 */
		public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
			// Fail fast with a usage message rather than an ArrayIndexOutOfBoundsException.
			if (args.length < 2) {
				System.err.println("Usage: FlowDriver <input path> <output path>");
				System.exit(2);
				return;
			}
			// 1. Create the job
			Configuration conf = new Configuration();
			Job job = Job.getInstance(conf);
			// 2. Locate the jar that contains this driver
			job.setJarByClass(FlowDriver.class);
			// 3. Mapper and reducer classes
			job.setMapperClass(FlowMapper.class);
			job.setReducerClass(FlowReducer.class);
			// 4. Map output key/value types
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(FlowBean.class);
			// 5. Final output key/value types
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(FlowBean.class);
			// 6. Input and output paths
			FileInputFormat.setInputPaths(job, new Path(args[0]));
			FileOutputFormat.setOutputPath(job, new Path(args[1]));
			// 7. Submit and wait (verbose = true reports progress)
			boolean result = job.waitForCompletion(true);
			System.exit(result ? 0 : 1);
		}
	}

原資料

1363157993055	13565436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1514	92054	200
1363157993055 	13560436866 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1916	914	200
1363157993055 	13568436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11616	95	200
1363157993055 	13760436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11126	95420	200
1363157993055 	13564436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11856	954	200
1363157993055 	13561536666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11685	952	200
1363157993055 	13560485666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1916	9504	200
1363157993055 	13560436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1616	1954	200
1363157993055 	13560463666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1216	9354	200
1363157993055 	13560488666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1616	9543	200
1363157993055 	13560436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1316	95420	200
1363157993055 	13560436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1316	95474	200
1363157993055 	13560445666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	16616	95489	200
1363157993055 	13560436856 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11136	9524	200
1363157993055 	13560478666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11162	9554	200
1363157993055 	13560445666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	16616	95489	200
1363157993055 	13560436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11165	9954	200
1363157993055 	13566436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11126	9534	200
1363157993055 	13460436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1162	9054	200
1363157993055 	13560936666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11201	91254	200
1363157993055 	13560445666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	16616	95489	200
1363157993055 	13560436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1616	1954	200
1363157993055 	13560436866 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	1916	952	200
1363157993055 	13568436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11616	95	200
1363157993055 	13760436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11126	9542	200
1363157993055 	13560445666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	16616	95489	200
1363157993055 	13564436666 	C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18  15  	11856	95452	200