MapReduce排序之 二次排序
阿新 • • 發佈:2019-02-02
一:背景
Hadoop中雖然有自動排序和分組,由於自帶的排序是按照Key進行排序的,有些時候,我們希望同時對Key和Value進行排序。自帶的排序功能就無法滿足我們了,還好Hadoop提供了一些元件可以讓開發人員進行二次排序。
二:技術實現
我們先來看案例需求
#需求1: 首先按照第一列數字升序排列,當第一列數字相同時,第二列數字也升序排列(列之間用製表符\t隔開)
3 3
3 2
3 1
2 2
2 1
1 1
MapReduce計算之後的結果應該是:
1 1
2 1
2 2
3 1
3 2
3 3
#需求2:第一列不相等時,第一列按降序排列,當第一列相等時,第二列按升序排列
MapReduce計算之後的結果應該是:3 3 3 2 3 1 2 2 2 1 1 1
3 1
3 2
3 3
2 1
2 2
1 1
下面是實現程式碼,實現兩種需求的關鍵是compareTo()方法的實現不同:
public class SecondSortTest { // 定義輸入路徑 private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data"; // 定義輸出路徑 private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out"; public static void main(String[] args) { try { // 建立配置資訊 Configuration conf = new Configuration(); /**********************************************/ //對Map端輸出進行壓縮 //conf.setBoolean("mapred.compress.map.output", true); //設定map端輸出使用的壓縮類 //conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class); //對reduce端輸出進行壓縮 //conf.setBoolean("mapred.output.compress", true); //設定reduce端輸出使用的壓縮類 //conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); // 新增配置檔案(我們可以在程式設計的時候動態配置資訊,而不需要手動去改變叢集) /* * conf.addResource("classpath://hadoop/core-site.xml"); * conf.addResource("classpath://hadoop/hdfs-site.xml"); * conf.addResource("classpath://hadoop/hdfs-site.xml"); */ // 建立檔案系統 FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf); // 如果輸出目錄存在,我們就刪除 if (fileSystem.exists(new Path(OUT_PATH))) { fileSystem.delete(new Path(OUT_PATH), true); } // 建立任務 Job job = new Job(conf, SecondSortTest.class.getName()); //1.1 設定輸入目錄和設定輸入資料格式化的類 FileInputFormat.setInputPaths(job, INPUT_PATH); job.setInputFormatClass(TextInputFormat.class); //1.2 設定自定義Mapper類和設定map函式輸出資料的key和value的型別 job.setMapperClass(MySecondSortMapper.class); job.setMapOutputKeyClass(CombineKey.class); job.setMapOutputValueClass(LongWritable.class); //1.3 設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應,因為分割槽為一個,所以reduce的數量也是一個) job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); //1.4 排序、分組 //1.5 歸約 //2.1 Shuffle把資料從Map端拷貝到Reduce端。 //2.2 指定Reducer類和輸出key和value的型別 job.setReducerClass(MySecondSortReducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class); //2.3 指定輸出的路徑和設定輸出的格式化類 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.setOutputFormatClass(TextOutputFormat.class); // 提交作業 退出 System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{ //定義聯合的key private CombineKey combineKey = new CombineKey(); protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException, InterruptedException { //對輸入的value進行切分 String[] splits = value.toString().split("\t"); //設定聯合的key combineKey.setComKey(Long.parseLong(splits[0])); combineKey.setComVal(Long.parseLong(splits[1])); //通過context寫出去 context.write(combineKey, new LongWritable(Long.parseLong(splits[1]))); } } public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{ @Override protected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException { //因為輸入的CombineKey已經排好序了,所有我們只要獲取其中的兩個成員變數寫出去就可以了。values在這個例子中沒有什麼作用 context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal())); } } } /** * 重新組合成一個key,實現二次排序 * @author 廖*民 * time : 2015年1月18日下午7:27:52 * @version */ class CombineKey implements WritableComparable<CombineKey>{ public long comKey; public long comVal; //必須提供無參建構函式,否則hadoop反射機制會出錯 public CombineKey() { } //有參建構函式 public CombineKey(long comKey, long comVal) { this.comKey = comKey; this.comVal = comVal; } public long getComKey() { return comKey; } public void setComKey(long comKey) { this.comKey = comKey; } public long getComVal() { return comVal; } public void setComVal(long comVal) { this.comVal = comVal; } public void write(DataOutput out) throws IOException { out.writeLong(comKey); out.writeLong(comVal); } public void readFields(DataInput in) throws IOException { this.comKey = in.readLong(); this.comVal = in.readLong(); } /** * 這個方法一定要實現 * java裡面排序預設是小的放在前面,即返回負數的放在前面,這樣就是所謂的升序排列 * 我們在下面的方法中直接返回一個差值,也就相當於會升序排列。 * 如果我們要實現降序排列,那麼我們就可以返回一個正數 */ /*public int compareTo(CombineKey o) { //第一列不相同時按升序排列,當第一列相同時第二列按升序排列 long minus = this.comKey - o.comKey; //如果第一個值不相等時,我們就先對第一列進行排序 if (minus != 0){ return (int) minus; } //如果第一列相等時,我們就對第二列進行排序 return (int) (this.comVal - o.comVal); }*/ /** * 為了實現第一列不同時按降序排序,第一列相同時第二列按升序排列 * 第一列:降序,當第一列相同時,第二列:升序 * 為了實現降序, */ public int compareTo(CombineKey o) { //如果a-b<0即,a小於b,按這樣 的思路應該是升序排列,我們可以返回一個相反數使其降序 long tmp = this.comKey - o.comKey; //如果第一個值不相等時,我們就先對第一列進行排序 if (tmp != 0){ return (int) (-tmp); } //如果第一列相等時,我們就對第二列進行升序排列 return (int) (this.comVal - o.comVal); } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + (int) (comKey ^ (comKey >>> 32)); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; CombineKey other = (CombineKey) obj; if (comKey != other.comKey) return false; return true; } }
程式執行結果就不貼了,和預想的一樣!