1. 程式人生 > >MapReduce排序之 二次排序

MapReduce排序之 二次排序

一:背景

Hadoop中雖然有自動排序和分組,由於自帶的排序是按照Key進行排序的,有些時候,我們希望同時對Key和Value進行排序。自帶的排序功能就無法滿足我們了,還好Hadoop提供了一些元件可以讓開發人員進行二次排序。

二:技術實現

我們先來看案例需求

#需求1: 首先按照第一列數字升序排列,當第一列數字相同時,第二列數字也升序排列(列之間用製表符\t隔開)

3	3
3	2
3	1
2	2
2	1
1	1
MapReduce計算之後的結果應該是:
1	1
2	1
2	2
3	1
3	2
3	3

#需求2:第一列不相等時,第一列按降序排列,當第一列相等時,第二列按升序排列
3	3
3	2
3	1
2	2
2	1
1	1
MapReduce計算之後的結果應該是:
3	1
3	2
3	3
2	1
2	2
1	1


下面是實現程式碼,實現兩種需求的關鍵是compareTo()方法的實現不同:
public class SecondSortTest {

	// 定義輸入路徑
		private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
		// 定義輸出路徑
		private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

		public static void main(String[] args) {

			try {
				// 建立配置資訊
				Configuration conf = new Configuration();
				
				/**********************************************/
				//對Map端輸出進行壓縮
				//conf.setBoolean("mapred.compress.map.output", true);
				//設定map端輸出使用的壓縮類
				//conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
				//對reduce端輸出進行壓縮
				//conf.setBoolean("mapred.output.compress", true);
				//設定reduce端輸出使用的壓縮類
				//conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
				// 新增配置檔案(我們可以在程式設計的時候動態配置資訊,而不需要手動去改變叢集)
				/*
				 * conf.addResource("classpath://hadoop/core-site.xml"); 
				 * conf.addResource("classpath://hadoop/hdfs-site.xml");
				 * conf.addResource("classpath://hadoop/hdfs-site.xml");
				 */

				// 建立檔案系統
				FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
				// 如果輸出目錄存在,我們就刪除
				if (fileSystem.exists(new Path(OUT_PATH))) {
					fileSystem.delete(new Path(OUT_PATH), true);
				}

				// 建立任務
				Job job = new Job(conf, SecondSortTest.class.getName());

				//1.1	設定輸入目錄和設定輸入資料格式化的類
				FileInputFormat.setInputPaths(job, INPUT_PATH);
				job.setInputFormatClass(TextInputFormat.class);

				//1.2	設定自定義Mapper類和設定map函式輸出資料的key和value的型別
				job.setMapperClass(MySecondSortMapper.class);
				job.setMapOutputKeyClass(CombineKey.class);
				job.setMapOutputValueClass(LongWritable.class);

				//1.3	設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應,因為分割槽為一個,所以reduce的數量也是一個)
				job.setPartitionerClass(HashPartitioner.class);
				job.setNumReduceTasks(1);

				//1.4	排序、分組
				//1.5	歸約
				//2.1	Shuffle把資料從Map端拷貝到Reduce端。
				//2.2	指定Reducer類和輸出key和value的型別
				job.setReducerClass(MySecondSortReducer.class);
				job.setOutputKeyClass(LongWritable.class);
				job.setOutputValueClass(LongWritable.class);

				//2.3	指定輸出的路徑和設定輸出的格式化類
				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
				job.setOutputFormatClass(TextOutputFormat.class);


				// 提交作業 退出
				System.exit(job.waitForCompletion(true) ? 0 : 1);
			
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	
	public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{
		
		//定義聯合的key
		private CombineKey combineKey = new CombineKey();
		
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
				InterruptedException {
			//對輸入的value進行切分
			String[] splits = value.toString().split("\t");
			//設定聯合的key
			combineKey.setComKey(Long.parseLong(splits[0]));
			combineKey.setComVal(Long.parseLong(splits[1]));
			
			//通過context寫出去
			context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
		}
	}
	
	
	public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{
		@Override
		protected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context)
				throws IOException, InterruptedException {
			//因為輸入的CombineKey已經排好序了,所有我們只要獲取其中的兩個成員變數寫出去就可以了。values在這個例子中沒有什麼作用
			context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal()));
		}
	}

}

/**
 * 重新組合成一個key,實現二次排序
 * @author 廖*民
 * time : 2015年1月18日下午7:27:52
 * @version
 */
class CombineKey implements WritableComparable<CombineKey>{

	public long comKey;
	public long comVal;
	
	//必須提供無參建構函式,否則hadoop反射機制會出錯
	public CombineKey() {
		
	}
	//有參建構函式
	public CombineKey(long comKey, long comVal) {
		this.comKey = comKey;
		this.comVal = comVal;
	}

	
	
	public long getComKey() {
		return comKey;
	}
	public void setComKey(long comKey) {
		this.comKey = comKey;
	}
	public long getComVal() {
		return comVal;
	}
	public void setComVal(long comVal) {
		this.comVal = comVal;
	}
	
	public void write(DataOutput out) throws IOException {
		out.writeLong(comKey);
		out.writeLong(comVal);
	}

	public void readFields(DataInput in) throws IOException {
		this.comKey = in.readLong();
		this.comVal = in.readLong();
	}

	/**
	 * 這個方法一定要實現
	 * java裡面排序預設是小的放在前面,即返回負數的放在前面,這樣就是所謂的升序排列
	 * 我們在下面的方法中直接返回一個差值,也就相當於會升序排列。
	 * 如果我們要實現降序排列,那麼我們就可以返回一個正數
	 */
	/*public int compareTo(CombineKey o) {
		//第一列不相同時按升序排列,當第一列相同時第二列按升序排列
		long minus = this.comKey - o.comKey;
		//如果第一個值不相等時,我們就先對第一列進行排序
		if (minus != 0){
			return (int) minus;
		}
		//如果第一列相等時,我們就對第二列進行排序
		return (int) (this.comVal - o.comVal);
	}*/
	
	/**
	 * 為了實現第一列不同時按降序排序,第一列相同時第二列按升序排列
	 * 第一列:降序,當第一列相同時,第二列:升序
	 * 為了實現降序,
	 */
	public int compareTo(CombineKey o) {
		//如果a-b<0即,a小於b,按這樣 的思路應該是升序排列,我們可以返回一個相反數使其降序
		long tmp = this.comKey - o.comKey;
		//如果第一個值不相等時,我們就先對第一列進行排序
		if (tmp != 0){
			return (int) (-tmp);
		}
		//如果第一列相等時,我們就對第二列進行升序排列
		return (int) (this.comVal - o.comVal);
	}
	
	
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + (int) (comKey ^ (comKey >>> 32));
		return result;
	}
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		CombineKey other = (CombineKey) obj;
		if (comKey != other.comKey)
			return false;
		return true;
	}
	
}


程式執行結果就不貼了,和預想的一樣!