1. 程式人生 > >詳細講解MapReduce二次排序過程

詳細講解MapReduce二次排序過程

我在15年處理大資料的時候還都是使用MapReduce, 隨著時間的推移, 計算工具的發展, 記憶體越來越便宜, 計算方式也有了極大的改變. 到現在再做大資料開發的好多同學都是直接使用spark, hive等工具, 很少有再寫MapReduce的了.
這裡整理一下MapReduce中經常用到的二次排序的方法, 全當複習.

簡介

二次排序(secondary sort)問題是指在Reduce階段對某個鍵關聯的值排序. 利用二次排序技術,可以對傳入Reduce的值完成 升序/降序 排序.
MapReduce框架會自動對Map生成的鍵完成排序. 所以, 在啟動Reduce之前,中間檔案 key-value

是按照key有序的(而不是按照值有序). 它們的值得順序有可能是任意的.

二次排序解決方案

對Reduce中的值排序至少有兩種方案, 這兩種方案在MapReduce/HadoopSpark框架中都可以使用.

  • 第一種方案是讓Reduce讀取和快取給定key的所有的value, 然後在Reduce中對這些值完成排序.(例如: 把一個key對應的所有value放到一個ArrayList中,再排序). 但是這種方式有侷限性, 如果資料量較少還可以使用,如果資料量太大,一個Reduce中放不下所有的值,就會導致記憶體溢位(OutOfMemory).
  • 第二種方式是使用MapReduce
    框架來對值進行排序. 因為MapReduce框架會自動對Map生成的檔案的key進行排序, 所以我們把需要排序的value增加到這個key上,這樣讓框架對這個new_key進行排序,來實現我們的目標.

第二種方法小結:

  1. 使用值鍵轉換設計模式:構造一個組合的中間key,new_key(k, v1), 其中v1是次鍵(secondary key).
  2. MapReduce執行框架完成排序.
  3. 重寫分割槽器,使組合鍵(k, v1) 按照之前單獨的 k 進行分割槽.

示例

假設有一組科學實驗的溫度資料如下:
有4列分別為: 年, 月, 日, 溫度.

2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,07,30
2000,11,24,-40
2000,01,12,10
...

我們需要輸出每一個年-月的溫度,並且值按照升序排序.
所以輸出如下:

(2000-11),[-40,20,30]
(2000-01),[10]
(2000-12),[-20,10]

MapReduce二次排序實現細節

要實現二次排序的特性,還需要一些java的外掛類, 去告訴MapReduce框架一些資訊:

  • 如何對Reduce的鍵排序.
  • 如何對Map產出的資料進行分割槽,進到不同的Reduce.
  • 如何對Reduce中的資料進行分組.

組合鍵的排序順序

要實現二次排序, 我們需要控制組合鍵的排序順序,以及Reduce處理鍵的順序.
首先組合鍵的組成由(年-月 + 溫度)一起組成, 如下圖:

temperature的資料放到鍵中之後, 我們還要指定這個組合鍵排序方式. 使用DateTemperaturePair物件儲存組合鍵, 重寫其compareTo()方法指定排序順序.
Hadoop中,如果需要持久儲存定製資料型別(如DateTemperaturePair),必須實現Writable介面. 如果要比較定製資料型別, 他們還必須實現另外一個介面WritableComparable. 示例程式碼如下:

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
...
public class DateTemperaturePair implements Writable, WritableComparable<DateTemperaturePair> {
    private Text yearMonth = new Text(); //自然鍵
    private Text day = new Text();
    private IntWritable temperature = new IntWritable(); // 次鍵
    ...
    @Override
    /**
    * 這個比較器將控制鍵的排序順序
    * /
    public int compareTo(DateTemperaturePair pair) {
        int compareValue = this.yearMonth.compareTo(pair.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(pair.getTemperature());
    }
        return compareValue; //升序排序
        //return -1 * compareValue; //降序排序
    }
}

  

定製分割槽器

分割槽器預設會根據Map產出的key來決定資料進到哪個Reduce.
在這裡,我們需要根據yearMonth來分割槽把資料入到不同的Reduce中, 但是我們的鍵已經變成了(yearMonth + temperature)的組合了. 所以需要定製分割槽器來根據yearMonth進行資料分割槽,把相同的yearMonth入到一個Reduce中. 程式碼如下:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class DateTemperaturePartitioner extends Partitioner<DateTemperaturePair, Text> {
    @Override
    public int getPartition(DatetemperaturePair pair, Text text, int numberOfPartitions) {
    //確保分割槽數非負
    return math.abs(pair.getYearMonth().hashCode() % numberOfPartitions);
    }
}

 

Hadoop提供了一個外掛體系,允許在框架中注入定製分割槽器程式碼. 我們在驅動累中完成這個工作,如下:

import org.apache.hadoop.mapreduce.Job;
...
Job job = ...;
...
job.setPartitionerClass(TemperaturePartitioner.class);

分組比較器

分組比較器會控制哪些鍵要分組到一個Reduce.reduce()方法中呼叫.
預設是按照key分配, 這裡我們期望的是按照組合key(yearMonth + temperature) 中的yearMonth分配, 所以需要重寫分組方法.
如下:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class DateTemperatureGroupingComparator extends WritableComparator {
    public DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }
    
    @Override
    /**
    * 比較器控制哪些鍵要分組到一個reduce()方法呼叫
    */
    public int compare(WritableComparable wc1, WritableComparable wc2) {
        DateTemperaturePair pair = (DateTemperaturePair) wc1;
        DateTemperaturePair pair2 = (DateTemperaturePair) wc12;
        return pair.getYearMonth().compareTo(pair2.getYearMonth());
    
    }
}

 

在驅動類中註冊比較器:
job.setGroupingComparatorClass(YearMonthGroupingComparator.class);

使用外掛的資料流

原理總結

MapReduce框架預設會按照key來進行分割槽,排序,分組.
我們需要排序的時候使用key+value所以我們把key變成了新key, (firstkey, secondkey) 對應為(yearMonth, 溫度) .

但是又不想在分割槽 和 分組的時候使用新key, 所以自己寫了Partitioner 和 GroupingComparator 來指定使用組合key中的firstkey來分割槽,分組.

&n