1. 程式人生 > >自定義物件實現 MapReduce 框架的序列化及排序

自定義物件實現 MapReduce 框架的序列化及排序

如果需要將自定義的 bean 放在 key 中傳輸,則還需要實現 Comparable 介面,因為 MapReduce框中的 shuffle 過程一定會對 key 進行排序,此時,自定義的 bean 實現的介面應該是:public class FlowBean implements WritableComparable<FlowBean>:

例:

進行了序列化的 Flow 類:

package flow.pojo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * 使用者自定義的POJO類如果充當key的話,那麼必須要進行序列化操作和執行排序規則
 * 
 * 讓 Flow實現Writable介面,就是讓該類具有序列化和反序列化的能力
 * 
 * 真正的操作: 其實就是把當前的某個物件,進行序列化,就是把屬性值通過流進行傳輸到其他的儲存介質或者流
 * 
 * 實現序列化操作
 * 實現反序列化操作
 * 指定排序規則
 *
 */


public class Flow  implements WritableComparable<Flow>{

	private String phone;
	private long upFlow;
	private long downFlow;
	private long sumFlow;
	
	
	
	public String getPhone() {
		return phone;
	}

	public void setPhone(String phone) {
		this.phone = phone;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
		
	public Flow() {
		super();
	}

	public Flow(String phone, long upFlow, long downFlow, long sumFlow) {
		super();
		this.phone = phone;
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = sumFlow;
	}

	@Override
	public String toString() {
		return "Flow [phone=" + phone + ", upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + "]";
	}


	/**
	 * 序列化方法
	 */
	@Override
	public void write(DataOutput out) throws IOException {

		out.writeUTF(phone);
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);

	}
	
	/**
	 * 反序列化操作
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		
		this.phone = in.readUTF();
		this.upFlow = in.readLong();
		this.downFlow = in.readLong();
		this.sumFlow = in.readLong();
		
	}
	

	/**
	 * 排序規則
	 */
	@Override
	public int compareTo(Flow o) {
		/**
		 * 按照總流量 從大到小
		 */
		long diff = o.getSumFlow() - this.getSumFlow();
		if(diff == 0){
			return 0;
		}else{
			return diff > 0 ? 1 : -1;
		}
		
	}

}

統計上行流量和下行流量之和並且按照流量大小倒序排序的 MR 程式Flow2MR :

package flow.pojo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* 實現流量彙總並且按照流量大小倒序排序 前提:處理的資料是已經彙總過的結果檔案
*/

public class Flow2MR {
	// 在 kv 中傳輸我們自定義的物件是可以的,但是必須實現 hadoop 的序列化機制 implements Writable, 如果要排序,
	// 還要實現 Comparable 介面, hadoop 為 我 們 提 供 了 一 個 方 便 的 類 , 叫 做 WritableComparable,直接實現就好
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		//叢集
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		
		Job job = Job.getInstance(conf);
		// 告訴框架,我們的程式所在 jar 包的路徑
		job.setJarByClass(Flow2MR.class);
		
		// 告訴框架,我們的程式所用的 mapper 類和 reducer 類
		job.setMapperClass(Flow2MRMapper.class);	
//		job.setReducerClass(Flow2MRReducer.class);
		
		// 告訴框架,我們的 mapperreducer 輸出的資料型別
		job.setMapOutputKeyClass(Flow.class);
		job.setMapOutputValueClass(NullWritable.class);	
		
//		// 如果reducer階段的輸出的key-value的型別和mapper階段的一致,那麼可以省略前面的setMapOutClass()
//		job.setOutputKeyClass(Text.class);
//		job.setOutputValueClass(Text.class);
		
		
		// 框架中預設的輸入輸出元件就是這倆貨,所以可以省略這兩行程式碼
		/*
		* job.setInputFormatClass(TextInputFormat.class);
		* job.setOutputFormatClass(TextOutputFormat.class);
		*/
		
		// 告訴框架,我們要處理的檔案在哪個路徑下
		Path inputPath = new Path(args[0]);
		// 告訴框架,我們的處理結果要輸出到哪裡去
		Path outputPath = new Path(args[1]);
		FileInputFormat.setInputPaths(job, inputPath);
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(outputPath)){
			fs.delete(outputPath,true);
		}
		FileOutputFormat.setOutputPath(job, outputPath);
		
		
		boolean isDone = job.waitForCompletion(true);
		System.exit(isDone ? 0 : 1);
			
	}
	
	
	/**
	 * 		Mapper階段的業務邏輯
	 * 
	 * 	null也有對於的參與序列化的指定型別: NullWritable
	 */
	private static class Flow2MRMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// 將讀到的一行資料進行欄位切分
			String[] split = value.toString().split("\t");
			// 抽取業務所需要的各欄位
			String phone = split[0];
			long upFlow = Long.parseLong(split[1]);
			long downFlow = Long.parseLong(split[2]);
			long sumFlow = Long.parseLong(split[3]);
			Flow flow = new Flow(phone, upFlow, downFlow, sumFlow);
			
			context.write(flow, NullWritable.get());
		}
		
	}
	
	/**
	 * Reducer階段的業務邏輯
	 */
	private static class Flow2MRReducer extends Reducer<Text, Text, Text, Text>{

		// reduce 方法接收到的 key 是某一組<a 手機號,bean><a 手機號,bean><a 手機號,bean>中的第一個手機號
		// reduce 方法接收到的 vlaues 是這一組 kv 中的所有 bean 的一個迭代器
					
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			/**
			 * 在當前排序操作中,根本不需要 reducer階段去指定 一些邏輯
			 * 
			 * 但是需要Reducer階段: 因為只有有reducer階段,最終的結果集才會按照key進行排序
			 */
		
		}

		
		
	}
	
	

}