1. 程式人生 > >MapReduce的二次排序

MapReduce的二次排序

  這裡介紹二次排序的思路整理,並附上具體程式碼
    首先要明確二次排序的基本概念:在我們所之前所熟悉的排序稱為一次排序,即只對key進行排序
      所以二次排序的概念在原來的基礎上便不難理解,即對key進行排序的同時對value進行排序
     (1)二次排序非標準版
    所謂二排非標準版:即所有的第二次排序過程全部放到Reduce進行操作,將所有相同key傳進來的
       values放進一個集合,然後呼叫Collection.sort方法對其進行排序,reduce階段壓力大,不建議使用
    程式碼實現:此處只附上reduce階段的程式碼部分

          (2)二次排序標準版
    在標準版二次排序中,將原始的<key,value>對兒轉換成新的Newkey,value 
    其中Newkey由原來的<key,value>組成而value值不變,即形成了一個大key的概念
    Map 輸出結構為<newKey,value>=<(key,value),value>
    1.由於引入了newkey,所以我們要自己定義newkey類,在其類中實現WritableComparable介面
      上面的介面中會要求我們實現其compareTo的方法,按照我們的需求,則該方法主體是在進行依次對
       key和value進行比較。
    2.自 定 義 partitioner , 保 證 分 區 的 一 致 性 ( 例 題 中 還 是 按 工 號 做 分 區 )。
          重 點 是 繼 承Partitioner 類
    3.還要寫一個分組類,重點是繼承WritableComparator/RawComparator
    
    程式碼實現:
public class SecondSortV3 {
/**
* 自定義的 newKey
*/
public static class KeyPairWritable implements
WritableComparable<KeyPairWritable> {
// 組合 key,key1 是分割槽 key,key2 是二次排序 key
private String key1;
private int key2;
public KeyPairWritable() {
}
public KeyPairWritable(String key1, int key2) {
this.set(key1, key2);
}
// 一次性將兩個 key 設定成完
public void set(String key1, int key2) {
this.key1 = key1;
this.key2 = key2;
}
// 當 map 端寫出的時候的序列化方法,即 map 如何將物件寫出去,保證與讀取的順序一致
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(key1);
arg0.writeInt(key2);}
// 在 reducer 讀取資料時候的反序列化方法,即 reduce 如何將物件讀取出來,保證與寫入
的順序一致
@Override
public void readFields(DataInput arg0) throws IOException {
this.key1 = arg0.readUTF();
this.key2 = arg0.readInt();
}
// 自定義比較器方法,先比較 key1,確定分割槽號。在分割槽號相同的情況下,去比較 key2
// 就不需要單獨寫一個 Comparator 了
public int compareTo(KeyPairWritable o) {
int compare = this.key1.compareTo(o.key1);
if (compare != 0) {
return compare;
} else {
// 降序排列,故將 o 放到前邊即可
return Integer.valueOf(o.key2).compareTo(
Integer.valueOf(this.getkey2()));
}
}
public int getkey2() {
return key2;
}
public void setkey2(int key2) {
this.key2 = key2;
}
public String getkey1() {
return key1;
}
public void setkey1(String key1) {
this.key1 = key1;
}
}
// map 類,實現 map 函式
public static class LineProcessMapper extends
Mapper<Object, Text, KeyPairWritable, IntWritable> {
// 暫存每個傳過來的詞的值,省掉重複申請空間
private KeyPairWritable outputKey = new KeyPairWritable();
private IntWritable outputValue = new IntWritable();
// 核心 map 方法的具體實現,逐個<key,value>對去處理
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// 通過 context 物件,將 map 的輸出逐個輸出
String tempLine = value.toString();
if (tempLine != null && tempLine.trim().length() > 0) {
String[] columnArray = tempLine.split("\\s");
outputKey.set(columnArray[0], Integer.parseInt(columnArray[1]));
outputValue.set(Integer.parseInt(columnArray[1]));
context.write(outputKey, outputValue);}
}
}
/**
* 自定義分割槽類,包證同 key 的記錄,如 S1,S2 等,能對映到相同的 reduce 端去處理
*/
public static class SecondPartitioner extends
Partitioner<KeyPairWritable, IntWritable> {
// 採集預設的 HashPartiton 實現即可
@Override
public int getPartition(KeyPairWritable key, IntWritable value,
int numPartitions) {
/*
* 預設的實現 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
* 讓 key 中 first 欄位作為分割槽依據
*/
return (key.getkey1().hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
/**
* 在 shuffle 階段的 sort 全域性排序完成後,如何對資料記錄進行分組
*/
public static class SecondSortGroupComparator extends WritableComparator {
// 物件 KeyPairWritable.class 註冊,讓比較器知道該物件並能夠初始化
protected SecondSortGroupComparator() {
super(KeyPairWritable.class, true);
}
@Override
public int compare(WritableComparable first, WritableComparable second) {
if (first == null || second == null) {
return 0;
}
KeyPairWritable newKey1 = (KeyPairWritable) first;
KeyPairWritable newKey2 = (KeyPairWritable) second;
// 自定義按原始資料中第一個 key 分組
return newKey1.getkey1().compareTo(newKey2.getkey1());
}
}
// reduce 類,實現 reduce 函式
public static class SortReducer extends
Reducer<KeyPairWritable, IntWritable, Text, IntWritable> {
private Text outputKey = new Text();
// 核心 reduce 方法的具體實現,逐個<key,List(v1,v2)>去處理
public void reduce(KeyPairWritable keyPair,
Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 進來時已經排序完成
outputKey.set(keyPair.getkey1());for (IntWritable val : values) {
context.write(outputKey, val);
}
}
}
// 啟動 mr 的 driver 方法
public static void main(String[] args) throws Exception {
// 得到叢集配置引數
Configuration conf = new Configuration();
// 引數解析器
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
if ((remainingArgs.length != 2)) {
System.err
.println("Usage: yarn jar jar_path main_class_path -D 引數列表 <in>
<out>");
System.exit(2);
}
// 設定到本次的 job 例項中
Job job = Job.getInstance(conf, "天亮二次排序(標準版)");
// 指定本次執行的主類是 WordCount
job.setJarByClass(SecondSortV3.class);
// 指定 map 類
job.setMapperClass(LineProcessMapper.class);
// 指定 partition 類
job.setPartitionerClass(SecondPartitioner.class);
job.setGroupingComparatorClass(SecondSortGroupComparator.class);
// 指定 reducer 類
job.setReducerClass(SortReducer.class);
// 指定 job 輸出的 key 和 value 的型別,如果 map 和 reduce 輸出型別不完全相同,需要重
新設定 map 的 output 的 key 和 value 的 class 型別
job.setMapOutputKeyClass(KeyPairWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定輸入資料的路徑
FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
// 指定輸出路徑,並要求該輸出路徑一定是不存在的
FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
// 指定 job 執行模式,等待任務執行完成後,提交任務的客戶端才會退出!
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}