詳細講解MapReduce二次排序過程
我在15年處理大資料的時候還都是使用MapReduce
, 隨著時間的推移, 計算工具的發展, 記憶體越來越便宜, 計算方式也有了極大的改變. 到現在再做大資料開發的好多同學都是直接使用spark
, hive
等工具, 很少有再寫MapReduce
的了.
這裡整理一下MapReduce
中經常用到的二次排序的方法, 全當複習.
簡介
二次排序(secondary sort
)問題是指在Reduce
階段對某個鍵關聯的值排序. 利用二次排序技術,可以對傳入Reduce
的值完成 升序/降序 排序.
MapReduce
框架會自動對Map生成的鍵完成排序. 所以, 在啟動Reduce之前,中間檔案 key-value
key
有序的(而不是按照值有序). 它們的值得順序有可能是任意的.
二次排序解決方案
對Reduce中的值排序至少有兩種方案, 這兩種方案在MapReduce/Hadoop
和 Spark
框架中都可以使用.
- 第一種方案是讓
Reduce
讀取和快取給定key
的所有的value
, 然後在Reduce
中對這些值完成排序.(例如: 把一個key
對應的所有value
放到一個Array
或List
中,再排序). 但是這種方式有侷限性, 如果資料量較少還可以使用,如果資料量太大,一個Reduce
中放不下所有的值,就會導致記憶體溢位(OutOfMemory
). - 第二種方式是使用
MapReduce
MapReduce
框架會自動對Map
生成的檔案的key
進行排序, 所以我們把需要排序的value
增加到這個key
上,這樣讓框架對這個new_key
進行排序,來實現我們的目標.
第二種方法小結:
- 使用值鍵轉換設計模式:構造一個組合的中間key,
new_key(k, v1)
, 其中v1
是次鍵(secondary key
). - 讓
MapReduce
執行框架完成排序. - 重寫分割槽器,使組合鍵
(k, v1)
按照之前單獨的k
進行分割槽.
示例
假設有一組科學實驗的溫度資料如下:
有4列分別為: 年, 月, 日, 溫度.
2000,12,04,10 2000,11,01,20 2000,12,02,-20 2000,11,07,30 2000,11,24,-40 2000,01,12,10 ...
我們需要輸出每一個年-月
的溫度,並且值按照升序排序.
所以輸出如下:
(2000-11),[-40,20,30]
(2000-01),[10]
(2000-12),[-20,10]
MapReduce二次排序實現細節
要實現二次排序的特性,還需要一些java的外掛類, 去告訴MapReduce
框架一些資訊:
- 如何對
Reduce
的鍵排序. - 如何對
Map
產出的資料進行分割槽,進到不同的Reduce
. - 如何對
Reduce
中的資料進行分組.
組合鍵的排序順序
要實現二次排序, 我們需要控制組合鍵的排序順序,以及Reduce
處理鍵的順序.
首先組合鍵的組成由(年-月 + 溫度)
一起組成, 如下圖:

把temperature
的資料放到鍵中之後, 我們還要指定這個組合鍵排序方式. 使用DateTemperaturePair
物件儲存組合鍵, 重寫其compareTo()
方法指定排序順序.
Hadoop
中,如果需要持久儲存定製資料型別(如DateTemperaturePair
),必須實現Writable
介面. 如果要比較定製資料型別, 他們還必須實現另外一個介面WritableComparable
. 示例程式碼如下:
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; ... public class DateTemperaturePair implements Writable, WritableComparable<DateTemperaturePair> { private Text yearMonth = new Text(); //自然鍵 private Text day = new Text(); private IntWritable temperature = new IntWritable(); // 次鍵 ... @Override /** * 這個比較器將控制鍵的排序順序 * / public int compareTo(DateTemperaturePair pair) { int compareValue = this.yearMonth.compareTo(pair.getYearMonth()); if (compareValue == 0) { compareValue = temperature.compareTo(pair.getTemperature()); } return compareValue; //升序排序 //return -1 * compareValue; //降序排序 } }
定製分割槽器
分割槽器預設會根據Map
產出的key
來決定資料進到哪個Reduce
.
在這裡,我們需要根據yearMonth
來分割槽把資料入到不同的Reduce
中, 但是我們的鍵已經變成了(yearMonth + temperature)
的組合了. 所以需要定製分割槽器來根據yearMonth
進行資料分割槽,把相同的yearMonth入到一個Reduce
中. 程式碼如下:
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class DateTemperaturePartitioner extends Partitioner<DateTemperaturePair, Text> { @Override public int getPartition(DatetemperaturePair pair, Text text, int numberOfPartitions) { //確保分割槽數非負 return math.abs(pair.getYearMonth().hashCode() % numberOfPartitions); } }
Hadoop
提供了一個外掛體系,允許在框架中注入定製分割槽器程式碼. 我們在驅動累中完成這個工作,如下:
import org.apache.hadoop.mapreduce.Job;
...
Job job = ...;
...
job.setPartitionerClass(TemperaturePartitioner.class);
分組比較器
分組比較器會控制哪些鍵要分組到一個Reduce.reduce()方法
中呼叫.
預設是按照key
分配, 這裡我們期望的是按照組合key(yearMonth + temperature)
中的yearMonth
分配, 所以需要重寫分組方法.
如下:
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class DateTemperatureGroupingComparator extends WritableComparator { public DateTemperatureGroupingComparator() { super(DateTemperaturePair.class, true); } @Override /** * 比較器控制哪些鍵要分組到一個reduce()方法呼叫 */ public int compare(WritableComparable wc1, WritableComparable wc2) { DateTemperaturePair pair = (DateTemperaturePair) wc1; DateTemperaturePair pair2 = (DateTemperaturePair) wc12; return pair.getYearMonth().compareTo(pair2.getYearMonth()); } }
在驅動類中註冊比較器:
job.setGroupingComparatorClass(YearMonthGroupingComparator.class);
使用外掛的資料流

原理總結
MapReduce
框架預設會按照key
來進行分割槽,排序,分組.
我們需要排序的時候使用key+value
所以我們把key
變成了新key, (firstkey, secondkey)
對應為(yearMonth, 溫度)
.
但是又不想在分割槽 和 分組的時候使用新key, 所以自己寫了Partitioner 和 GroupingComparator
來指定使用組合key
中的firstkey
來分割槽,分組.
&n