Hadoop二次排序及MapReduce處理流程例項詳解
一、概述
MapReduce框架對處理結果的輸出會根據key值進行預設的排序,這個預設排序可以滿足一部分需求,但是也是十分有限的,在我們實際的需求當中,往往有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網路上已經有很多人分享過了,但是對二次排序的實現原理及整個MapReduce框架的處理流程的分析還是有非常大的出入,而且部分分析是沒有經過驗證的。本文將通過一個實際的MapReduce二次排序的例子,講述二次排序的實現和其MapReduce的整個處理流程,並且通過結果和Map、Reduce端的日誌來驗證描述的處理流程的正確性。
二、需求描述
1.輸入資料
sort1 1 sort2 3 sort2 88 sort2 54 sort1 2 sort6 22 sort6 888 sort6 58
2.目標輸出
sort1 1,2
sort2 3,54,88
sort6 22,58,888
三、解決思路
1.首先,在思考解決問題思路時,我們應該先深刻的理解MapReduce處理資料的整個流程,這是最基礎的,不然的話是不可能找到解決問題的思路的。我描述一下MapReduce處理資料的大概流程:首先,MapReduce框架通過getSplits()方法實現對原始檔案的切片之後,每一個切片對應著一個MapTask,InputSplit輸入到map()函式進行處理,中間結果經過環形緩衝區的排序,然後分割槽、自定義二次排序(如果有的話)和合並,再通過Shuffle操作將資料傳輸到reduce Task端,reduce端也存在著緩衝區,資料也會在緩衝區和磁碟中進行合併排序等操作,然後對資料按照key值進行分組,然後每處理完一個分組之後就會去呼叫一次reduce()函式,最終輸出結果。大概流程 我畫了一下,如下圖:
2.具體解決思路
(1):Map端處理
根據上面的需求,我們有一個非常明確的目標就是要對第一列相同的記錄,並且對合並後的數字進行排序。我們都知道MapReduce框架不管是預設排序或者是自定義排序都只是對key值進行排序,現在的情況是這些資料不是key值,怎麼辦?其實我們可以將原始資料的key值和其對應的資料組合成一個新的key值,然後新的key值對應的value還是原始資料中的valu。那麼我們就可以將原始資料的map輸出變成類似下面的資料結構:
{[sort1,1],1} {[sort2,3],3} {[sort2,88],88} {[sort2,54],54} {[sort1,2],2} {[sort6,22],22} {[sort6,888],888} {[sort6,58],58}
那麼我們只需要對[]裡面的心key值進行排序就OK了,然後我們需要自定義一個分割槽處理器,因為我的目標不是想將新key相同的記錄傳到一個reduce中,而是想將新key中第一個欄位相同的記錄放到同一個reduce中進行分組合並,所以我們需要根據新key值的第一個欄位來自定義一個分割槽處理器。通過分割槽操作後,得到的資料流如下:
Partition1:{[sort1,1],1}、{[sort1,2],2}
Partition2:{[sort2,3],3}、{[sort2,88],88}、{[sort2,54],54}
Partition3:{[sort6,22],22}、{[sort6,888],888}、{[sort6,58],58}
分割槽操作完成之後,我呼叫自己的自定義排序器對新的key值進行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,88],88}
{[sort6,22],22}
{[sort6,58],58}
{[sort6,888],888}
(2).Reduce端處理
經過Shuffle處理之後,資料傳輸到Reducer端了。在Reducer端按照組合鍵的第一個欄位進行分組,並且每處理完一次分組之後就會呼叫一次reduce函式來對這個分組進行處理和輸出。最終各個分組的資料結果變成類似下面的資料結構:
sort1 1,2
sort2 3,54,88
sort6 22,58,888
四、具體實現
1.自定義組合鍵
public class CombinationKey implements WritableComparable<CombinationKey>{
private Text firstKey;
private IntWritable secondKey;
//無參建構函式
public CombinationKey() {
this.firstKey = new Text();
this.secondKey = new IntWritable();
}
//有參建構函式
public CombinationKey(Text firstKey, IntWritable secondKey) {
this.firstKey = firstKey;
this.secondKey = secondKey;
}
public Text getFirstKey() {
return firstKey;
}
public void setFirstKey(Text firstKey) {
this.firstKey = firstKey;
}
public IntWritable getSecondKey() {
return secondKey;
}
public void setSecondKey(IntWritable secondKey) {
this.secondKey = secondKey;
}
public void write(DataOutput out) throws IOException {
this.firstKey.write(out);
this.secondKey.write(out);
}
public void readFields(DataInput in) throws IOException {
this.firstKey.readFields(in);
this.secondKey.readFields(in);
}
/*public int compareTo(CombinationKey combinationKey) {
int minus = this.getFirstKey().compareTo(combinationKey.getFirstKey());
if (minus != 0){
return minus;
}
return this.getSecondKey().get() - combinationKey.getSecondKey().get();
}*/
/**
* 自定義比較策略
* 注意:該比較策略用於MapReduce的第一次預設排序
* 也就是發生在Map端的sort階段
* 發生地點為環形緩衝區(可以通過io.sort.mb進行大小調整)
*/
public int compareTo(CombinationKey combinationKey) {
System.out.println("------------------------CombineKey flag-------------------");
return this.firstKey.compareTo(combinationKey.getFirstKey());
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((firstKey == null) ? 0 : firstKey.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CombinationKey other = (CombinationKey) obj;
if (firstKey == null) {
if (other.firstKey != null)
return false;
} else if (!firstKey.equals(other.firstKey))
return false;
return true;
}
}
說明:在自定義組合鍵的時候,我們需要特別注意,一定要實現WritableComparable介面,並且實現compareTo()方法的比較策略。這個用於MapReduce的第一次預設排序,也就是發生在Map階段的sort小階段,發生地點為環形緩衝區(可以通過io.sort.mb進行大小調整),但是其對我們最終的二次排序結果是沒有影響的,我們二次排序的最終結果是由我們的自定義比較器決定的。
2.自定義分割槽器
/**
* 自定義分割槽
* @author 廖鍾*民
* time : 2015年1月19日下午12:13:54
* @version
*/
public class DefinedPartition extends Partitioner<CombinationKey, IntWritable>{
/**
* 資料輸入來源:map輸出 我們這裡根據組合鍵的第一個值作為分割槽
* 如果不自定義分割槽的話,MapReduce會根據預設的Hash分割槽方法
* 將整個組合鍵相等的分到一個分割槽中,這樣的話顯然不是我們要的效果
* @param key map輸出鍵值
* @param value map輸出value值
* @param numPartitions 分割槽總數,即reduce task個數
*/
public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {
System.out.println("---------------------進入自定義分割槽---------------------");
System.out.println("---------------------結束自定義分割槽---------------------");
return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
3.自定義比較器
public class DefinedComparator extends WritableComparator{
protected DefinedComparator() {
super(CombinationKey.class,true);
}
/**
* 第一列按升序排列,第二列也按升序排列
*/
public int compare(WritableComparable a, WritableComparable b) {
System.out.println("------------------進入二次排序-------------------");
CombinationKey c1 = (CombinationKey) a;
CombinationKey c2 = (CombinationKey) b;
int minus = c1.getFirstKey().compareTo(c2.getFirstKey());
if (minus != 0){
System.out.println("------------------結束二次排序-------------------");
return minus;
} else {
System.out.println("------------------結束二次排序-------------------");
return c1.getSecondKey().get() -c2.getSecondKey().get();
}
}
}
4.自定義分組
/**
* 自定義分組有中方式,一種是繼承WritableComparator
* 另外一種是實現RawComparator介面
* @author 廖*民
* time : 2015年1月19日下午3:30:11
* @version
*/
public class DefinedGroupSort extends WritableComparator{
protected DefinedGroupSort() {
super(CombinationKey.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
System.out.println("---------------------進入自定義分組---------------------");
CombinationKey combinationKey1 = (CombinationKey) a;
CombinationKey combinationKey2 = (CombinationKey) b;
System.out.println("---------------------分組結果:" + combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey()));
System.out.println("---------------------結束自定義分組---------------------");
//自定義按原始資料中第一個key分組
return combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey());
}
}
5.主體程式實現
public class SecondSortMapReduce {
// 定義輸入路徑
private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/sort_data";
// 定義輸出路徑
private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
public static void main(String[] args) {
try {
// 建立配置資訊
Configuration conf = new Configuration();
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
// 建立檔案系統
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
// 如果輸出目錄存在,我們就刪除
if (fileSystem.exists(new Path(OUT_PATH))) {
fileSystem.delete(new Path(OUT_PATH), true);
}
// 建立任務
Job job = new Job(conf, SecondSortMapReduce.class.getName());
//1.1 設定輸入目錄和設定輸入資料格式化的類
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(KeyValueTextInputFormat.class);
//1.2 設定自定義Mapper類和設定map函式輸出資料的key和value的型別
job.setMapperClass(SecondSortMapper.class);
job.setMapOutputKeyClass(CombinationKey.class);
job.setMapOutputValueClass(IntWritable.class);
//1.3 設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應,因為分割槽為一個,所以reduce的數量也是一個)
job.setPartitionerClass(DefinedPartition.class);
job.setNumReduceTasks(1);
//設定自定義分組策略
job.setGroupingComparatorClass(DefinedGroupSort.class);
//設定自定義比較策略(因為我的CombineKey重寫了compareTo方法,所以這個可以省略)
job.setSortComparatorClass(DefinedComparator.class);
//1.4 排序
//1.5 歸約
//2.1 Shuffle把資料從Map端拷貝到Reduce端。
//2.2 指定Reducer類和輸出key和value的型別
job.setReducerClass(SecondSortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//2.3 指定輸出的路徑和設定輸出的格式化類
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.setOutputFormatClass(TextOutputFormat.class);
// 提交作業 退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class SecondSortMapper extends Mapper<Text, Text, CombinationKey, IntWritable>{
/**
* 這裡要特殊說明一下,為什麼要將這些變數寫在map函式外邊
* 對於分散式的程式,我們一定要注意到記憶體的使用情況,對於MapReduce框架
* 每一行的原始記錄的處理都要呼叫一次map()函式,假設,這個map()函式要處理1一億
* 條輸入記錄,如果將這些變數都定義在map函式裡面則會導致這4個變數的物件控制代碼
* 非常的多(極端情況下將產生4*1億個控制代碼,當然java也是有自動的GC機制的,一定不會達到這麼多)
* 導致棧記憶體被浪費掉,我們將其寫在map函式外面,頂多就只有4個物件控制代碼
*/
private CombinationKey combinationKey = new CombinationKey();
Text sortName = new Text();
IntWritable score = new IntWritable();
String[] splits = null;
protected void map(Text key, Text value, Mapper<Text, Text, CombinationKey, IntWritable>.Context context) throws IOException, InterruptedException {
System.out.println("---------------------進入map()函式---------------------");
//過濾非法記錄(這裡用計數器比較好)
if (key == null || value == null || key.toString().equals("")){
return;
}
//構造相關屬性
sortName.set(key.toString());
score.set(Integer.parseInt(value.toString()));
//設定聯合key
combinationKey.setFirstKey(sortName);
combinationKey.setSecondKey(score);
//通過context把map處理後的結果輸出
context.write(combinationKey, score);
System.out.println("---------------------結束map()函式---------------------");
}
}
public static class SecondSortReducer extends Reducer<CombinationKey, IntWritable, Text, Text>{
StringBuffer sb = new StringBuffer();
Text score = new Text();
/**
* 這裡要注意一下reduce的呼叫時機和次數:
* reduce每次處理一個分組的時候會呼叫一次reduce函式。
* 所謂的分組就是將相同的key對應的value放在一個集合中
* 例如:<sort1,1> <sort1,2>
* 分組後的結果就是
* <sort1,{1,2}>這個分組會呼叫一次reduce函式
*/
protected void reduce(CombinationKey key, Iterable<IntWritable> values, Reducer<CombinationKey, IntWritable, Text, Text>.Context context)
throws IOException, InterruptedException {
//先清除上一個組的資料
sb.delete(0, sb.length());
for (IntWritable val : values){
sb.append(val.get() + ",");
}
//取出最後一個逗號
if (sb.length() > 0){
sb.deleteCharAt(sb.length() - 1);
}
//設定寫出去的value
score.set(sb.toString());
//將聯合Key的第一個元素作為新的key,將score作為value寫出去
context.write(key.getFirstKey(), score);
System.out.println("---------------------進入reduce()函式---------------------");
System.out.println("---------------------{[" + key.getFirstKey()+"," + key.getSecondKey() + "],[" +score +"]}");
System.out.println("---------------------結束reduce()函式---------------------");
}
}
}
程式執行的結果:
五、處理流程
看到前面的程式碼,都知道我在各個元件上已經設定好了相應的標誌,用於追蹤整個MapReduce處理二次排序的處理流程。現在讓我們分別看看Map端和Reduce端的日誌情況。
(1)Map端日誌分析
15/01/19 15:32:29 INFO input.FileInputFormat: Total input paths to process : 1
15/01/19 15:32:29 WARN snappy.LoadSnappy: Snappy native library not loaded
15/01/19 15:32:30 INFO mapred.JobClient: Running job: job_local_0001
15/01/19 15:32:30 INFO mapred.Task: Using ResourceCalculatorPlugin : null
15/01/19 15:32:30 INFO mapred.MapTask: io.sort.mb = 100
15/01/19 15:32:30 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/19 15:32:30 INFO mapred.MapTask: record buffer = 262144/327680
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
15/01/19 15:32:30 INFO mapred.MapTask: Starting flush of map output
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
15/01/19 15:32:30 INFO mapred.MapTask: Finished spill 0
15/01/19 15:32:30 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/01/19 15:32:30 INFO mapred.LocalJobRunner:
15/01/19 15:32:30 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
15/01/19 15:32:30 INFO mapred.Task: Using ResourceCalculatorPlugin : null
15/01/19 15:32:30 INFO mapred.LocalJobRunner:
從Map端的日誌,我們可以很容易的看出來每一條記錄開始時進入到map()函式進行處理,處理完了之後立馬就自定義分割槽函式中對其進行分割槽,當所有輸入資料經過map()函式和分割槽函式處理之後,就呼叫自定義二次排序函式對其進行排序。
(2)Reduce端日誌分析
15/01/19 15:32:30 INFO mapred.Merger: Merging 1 sorted segments
15/01/19 15:32:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 130 bytes
15/01/19 15:32:30 INFO mapred.LocalJobRunner:
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:-1
---------------------結束自定義分組---------------------
---------------------進入reduce()函式---------------------
---------------------{[sort1,2],[1,2]}
---------------------結束reduce()函式---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:-4
---------------------結束自定義分組---------------------
---------------------進入reduce()函式---------------------
---------------------{[sort2,88],[3,54,88]}
---------------------結束reduce()函式---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入reduce()函式---------------------
---------------------{[sort6,888],[22,58,888]}
---------------------結束reduce()函式---------------------
15/01/19 15:32:30 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/01/19 15:32:30 INFO mapred.LocalJobRunner:
15/01/19 15:32:30 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/01/19 15:32:30 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://liaozhongmin:9000/out
15/01/19 15:32:30 INFO mapred.LocalJobRunner: reduce > reduce
15/01/19 15:32:30 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
15/01/19 15:32:31 INFO mapred.JobClient: map 100% reduce 100%
15/01/19 15:32:31 INFO mapred.JobClient: Job complete: job_local_0001
15/01/19 15:32:31 INFO mapred.JobClient: Counters: 19
15/01/19 15:32:31 INFO mapred.JobClient: File Output Format Counters
15/01/19 15:32:31 INFO mapred.JobClient: Bytes Written=40
15/01/19 15:32:31 INFO mapred.JobClient: FileSystemCounters
15/01/19 15:32:31 INFO mapred.JobClient: FILE_BYTES_READ=446
15/01/19 15:32:31 INFO mapred.JobClient: HDFS_BYTES_READ=140
15/01/19 15:32:31 INFO mapred.JobClient: FILE_BYTES_WRITTEN=131394
15/01/19 15:32:31 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=40
15/01/19 15:32:31 INFO mapred.JobClient: File Input Format Counters
15/01/19 15:32:31 INFO mapred.JobClient: Bytes Read=70
15/01/19 15:32:31 INFO mapred.JobClient: Map-Reduce Framework
15/01/19 15:32:31 INFO mapred.JobClient: Reduce input groups=3
15/01/19 15:32:31 INFO mapred.JobClient: Map output materialized bytes=134
15/01/19 15:32:31 INFO mapred.JobClient: Combine output records=0
15/01/19 15:32:31 INFO mapred.JobClient: Map input records=8
15/01/19 15:32:31 INFO mapred.JobClient: Reduce shuffle bytes=0
15/01/19 15:32:31 INFO mapred.JobClient: Reduce output records=3
15/01/19 15:32:31 INFO mapred.JobClient: Spilled Records=16
15/01/19 15:32:31 INFO mapred.JobClient: Map output bytes=112
15/01/19 15:32:31 INFO mapred.JobClient: Total committed heap usage (bytes)=391118848
15/01/19 15:32:31 INFO mapred.JobClient: Combine input records=0
15/01/19 15:32:31 INFO mapred.JobClient: Map output records=8
15/01/19 15:32:31 INFO mapred.JobClient: SPLIT_RAW_BYTES=99
15/01/19 15:32:31 INFO mapred.JobClient: Reduce input records=8
首先,我們看了Reduce端的日誌,第一個資訊我應該很容易能夠很容易看出來,就是分組和reduce()函式處理都是在Shuffle完成之後才進行的。另外一點我們也非常容易看出,就是每次處理完一個分組資料就會去呼叫一次的reduce()函式對這個分組進行處理和輸出。此外,說明一些分組函式的返回值問題,當返回0時才會被分到同一個組中。另外一點我們也可以看出來,一個分組中每合併n個值就會有n-1分組函式返回0值,也就是說進行了n-1次比較。
六、總結
本文主要從MapReduce框架執行的流程,去分析瞭如何去實現二次排序,通過程式碼進行了實現,並且對整個流程進行了驗證。另外,要吐槽一下,網路上有很多文章都記錄了MapReudce處理二次排序問題,但是對MapReduce框架整個處理流程的描述錯漏很多,而且他們最終的流程描述也沒有證據可以支撐。所以,對於網路上的學習資源不能夠完全依賴,要融入自己的思想,並且要重要的觀點進行程式碼或者實踐的驗證。另外,今天在一個hadoop交流群上聽到少部分人在討論,有了hive我們就不用學習些MapReduce程式?對這這個問題我是這麼認為:我不相信寫不好MapReduce程式的程式設計師會寫好hive語句,最起碼的他們對整個執行流程是一無所知的,更不用說效能問題了,有可能連最常見的資料傾斜問題的弄不清楚。
如果文章寫的有問題,歡迎指出,共同學習!