1. 程式人生 > >Hadoop二次排序及MapReduce處理流程例項詳解

Hadoop二次排序及MapReduce處理流程例項詳解

一、概述

MapReduce框架對處理結果的輸出會根據key值進行預設的排序,這個預設排序可以滿足一部分需求,但是也是十分有限的,在我們實際的需求當中,往往有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網路上已經有很多人分享過了,但是對二次排序的實現原理及整個MapReduce框架的處理流程的分析還是有非常大的出入,而且部分分析是沒有經過驗證的。本文將通過一個實際的MapReduce二次排序的例子,講述二次排序的實現和其MapReduce的整個處理流程,並且通過結果和Map、Reduce端的日誌來驗證描述的處理流程的正確性。

二、需求描述

1.輸入資料

sort1	1
sort2	3
sort2	88
sort2	54
sort1	2
sort6	22
sort6	888
sort6	58

2.目標輸出

sort1	1,2
sort2	3,54,88
sort6	22,58,888

三、解決思路

1.首先,在思考解決問題思路時,我們應該先深刻的理解MapReduce處理資料的整個流程,這是最基礎的,不然的話是不可能找到解決問題的思路的。我描述一下MapReduce處理資料的大概流程:首先,MapReduce框架通過getSplits()方法實現對原始檔案的切片之後,每一個切片對應著一個MapTask,InputSplit輸入到map()函式進行處理,中間結果經過環形緩衝區的排序,然後分割槽、自定義二次排序(如果有的話)和合並,再通過Shuffle操作將資料傳輸到reduce Task端,reduce端也存在著緩衝區,資料也會在緩衝區和磁碟中進行合併排序等操作,然後對資料按照key值進行分組,然後每處理完一個分組之後就會去呼叫一次reduce()函式,最終輸出結果。大概流程 我畫了一下,如下圖:


2.具體解決思路

(1):Map端處理

根據上面的需求,我們有一個非常明確的目標就是要對第一列相同的記錄,並且對合並後的數字進行排序。我們都知道MapReduce框架不管是預設排序或者是自定義排序都只是對key值進行排序,現在的情況是這些資料不是key值,怎麼辦?其實我們可以將原始資料的key值和其對應的資料組合成一個新的key值,然後新的key值對應的value還是原始資料中的valu。那麼我們就可以將原始資料的map輸出變成類似下面的資料結構:

{[sort1,1],1}
{[sort2,3],3}
{[sort2,88],88}
{[sort2,54],54}
{[sort1,2],2}
{[sort6,22],22}
{[sort6,888],888}
{[sort6,58],58}

那麼我們只需要對[]裡面的心key值進行排序就OK了,然後我們需要自定義一個分割槽處理器,因為我的目標不是想將新key相同的記錄傳到一個reduce中,而是想將新key中第一個欄位相同的記錄放到同一個reduce中進行分組合並,所以我們需要根據新key值的第一個欄位來自定義一個分割槽處理器。通過分割槽操作後,得到的資料流如下:
Partition1:{[sort1,1],1}、{[sort1,2],2}

Partition2:{[sort2,3],3}、{[sort2,88],88}、{[sort2,54],54}

Partition3:{[sort6,22],22}、{[sort6,888],888}、{[sort6,58],58}

分割槽操作完成之後,我呼叫自己的自定義排序器對新的key值進行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,88],88}
{[sort6,22],22}
{[sort6,58],58}
{[sort6,888],888}


(2).Reduce端處理

經過Shuffle處理之後,資料傳輸到Reducer端了。在Reducer端按照組合鍵的第一個欄位進行分組,並且每處理完一次分組之後就會呼叫一次reduce函式來對這個分組進行處理和輸出。最終各個分組的資料結果變成類似下面的資料結構:

sort1	1,2
sort2	3,54,88
sort6	22,58,888


四、具體實現

1.自定義組合鍵

public class CombinationKey implements WritableComparable<CombinationKey>{

	private Text firstKey;
	private IntWritable secondKey;
	
	//無參建構函式
	public CombinationKey() {
		this.firstKey = new Text();
		this.secondKey = new IntWritable();
	}
	
	//有參建構函式
	public CombinationKey(Text firstKey, IntWritable secondKey) {
		this.firstKey = firstKey;
		this.secondKey = secondKey;
	}

	public Text getFirstKey() {
		return firstKey;
	}

	public void setFirstKey(Text firstKey) {
		this.firstKey = firstKey;
	}

	public IntWritable getSecondKey() {
		return secondKey;
	}

	public void setSecondKey(IntWritable secondKey) {
		this.secondKey = secondKey;
	}

	public void write(DataOutput out) throws IOException {
		this.firstKey.write(out);
		this.secondKey.write(out);
	}

	public void readFields(DataInput in) throws IOException {
		this.firstKey.readFields(in);
		this.secondKey.readFields(in);
	}

	
	/*public int compareTo(CombinationKey combinationKey) {
		int minus = this.getFirstKey().compareTo(combinationKey.getFirstKey());
		if (minus != 0){
			return minus;
		}
		return this.getSecondKey().get() - combinationKey.getSecondKey().get();
	}*/
	/**
	 * 自定義比較策略
	 * 注意:該比較策略用於MapReduce的第一次預設排序
	 * 也就是發生在Map端的sort階段
	 * 發生地點為環形緩衝區(可以通過io.sort.mb進行大小調整)
	 */
	public int compareTo(CombinationKey combinationKey) {
		System.out.println("------------------------CombineKey flag-------------------");
		return this.firstKey.compareTo(combinationKey.getFirstKey());
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((firstKey == null) ? 0 : firstKey.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		CombinationKey other = (CombinationKey) obj;
		if (firstKey == null) {
			if (other.firstKey != null)
				return false;
		} else if (!firstKey.equals(other.firstKey))
			return false;
		return true;
	}

	
}
說明:在自定義組合鍵的時候,我們需要特別注意,一定要實現WritableComparable介面,並且實現compareTo()方法的比較策略。這個用於MapReduce的第一次預設排序,也就是發生在Map階段的sort小階段,發生地點為環形緩衝區(可以通過io.sort.mb進行大小調整),但是其對我們最終的二次排序結果是沒有影響的,我們二次排序的最終結果是由我們的自定義比較器決定的。

2.自定義分割槽器

/**
 * 自定義分割槽
 * @author 廖鍾*民
 * time : 2015年1月19日下午12:13:54
 * @version
 */
public class DefinedPartition extends Partitioner<CombinationKey, IntWritable>{

	/**
	 * 資料輸入來源:map輸出 我們這裡根據組合鍵的第一個值作為分割槽
	 * 如果不自定義分割槽的話,MapReduce會根據預設的Hash分割槽方法
	 * 將整個組合鍵相等的分到一個分割槽中,這樣的話顯然不是我們要的效果
	 * @param key map輸出鍵值
	 * @param value map輸出value值
	 * @param numPartitions 分割槽總數,即reduce task個數
	 */
	public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {
		System.out.println("---------------------進入自定義分割槽---------------------");
		System.out.println("---------------------結束自定義分割槽---------------------");
		return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
	}

}

3.自定義比較器
public class DefinedComparator extends WritableComparator{

	protected DefinedComparator() {
		super(CombinationKey.class,true);
	}

	/**
	 * 第一列按升序排列,第二列也按升序排列
	 */
	public int compare(WritableComparable a, WritableComparable b) {
		System.out.println("------------------進入二次排序-------------------");
		CombinationKey c1 = (CombinationKey) a;
		CombinationKey c2 = (CombinationKey) b;
		int minus = c1.getFirstKey().compareTo(c2.getFirstKey());
		
		if (minus != 0){
			System.out.println("------------------結束二次排序-------------------");
			return minus;
		} else {
			System.out.println("------------------結束二次排序-------------------");
			return c1.getSecondKey().get() -c2.getSecondKey().get();
		}
	}
}

4.自定義分組
/**
 * 自定義分組有中方式,一種是繼承WritableComparator
 * 另外一種是實現RawComparator介面
 * @author 廖*民
 * time : 2015年1月19日下午3:30:11
 * @version
 */
public class DefinedGroupSort extends WritableComparator{


	protected DefinedGroupSort() {
		super(CombinationKey.class,true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		System.out.println("---------------------進入自定義分組---------------------");
		CombinationKey combinationKey1 = (CombinationKey) a;
		CombinationKey combinationKey2 = (CombinationKey) b;
		System.out.println("---------------------分組結果:" + combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey()));
		System.out.println("---------------------結束自定義分組---------------------");
		//自定義按原始資料中第一個key分組
		return combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey());
	}


}

5.主體程式實現
public class SecondSortMapReduce {

		// 定義輸入路徑
		private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/sort_data";
		// 定義輸出路徑
		private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

		public static void main(String[] args) {

			try {
				// 建立配置資訊
				Configuration conf = new Configuration();
				conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");

				// 建立檔案系統
				FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
				// 如果輸出目錄存在,我們就刪除
				if (fileSystem.exists(new Path(OUT_PATH))) {
					fileSystem.delete(new Path(OUT_PATH), true);
				}

				// 建立任務
				Job job = new Job(conf, SecondSortMapReduce.class.getName());

				//1.1	設定輸入目錄和設定輸入資料格式化的類
				FileInputFormat.setInputPaths(job, INPUT_PATH);
				job.setInputFormatClass(KeyValueTextInputFormat.class);

				//1.2	設定自定義Mapper類和設定map函式輸出資料的key和value的型別
				job.setMapperClass(SecondSortMapper.class);
				job.setMapOutputKeyClass(CombinationKey.class);
				job.setMapOutputValueClass(IntWritable.class);

				//1.3	設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應,因為分割槽為一個,所以reduce的數量也是一個)
				job.setPartitionerClass(DefinedPartition.class);
				job.setNumReduceTasks(1);
				
				//設定自定義分組策略
				job.setGroupingComparatorClass(DefinedGroupSort.class);
				//設定自定義比較策略(因為我的CombineKey重寫了compareTo方法,所以這個可以省略)
				job.setSortComparatorClass(DefinedComparator.class);
				
				//1.4	排序
				//1.5	歸約
				//2.1	Shuffle把資料從Map端拷貝到Reduce端。
				//2.2	指定Reducer類和輸出key和value的型別
				job.setReducerClass(SecondSortReducer.class);
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(Text.class);

				//2.3	指定輸出的路徑和設定輸出的格式化類
				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
				job.setOutputFormatClass(TextOutputFormat.class);


				// 提交作業 退出
				System.exit(job.waitForCompletion(true) ? 0 : 1);
			
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
	public static class SecondSortMapper extends Mapper<Text, Text, CombinationKey, IntWritable>{
		/**
		 * 這裡要特殊說明一下,為什麼要將這些變數寫在map函式外邊
		 * 對於分散式的程式,我們一定要注意到記憶體的使用情況,對於MapReduce框架
		 * 每一行的原始記錄的處理都要呼叫一次map()函式,假設,這個map()函式要處理1一億
		 * 條輸入記錄,如果將這些變數都定義在map函式裡面則會導致這4個變數的物件控制代碼
		 * 非常的多(極端情況下將產生4*1億個控制代碼,當然java也是有自動的GC機制的,一定不會達到這麼多)
		 * 導致棧記憶體被浪費掉,我們將其寫在map函式外面,頂多就只有4個物件控制代碼
		 */
		private CombinationKey combinationKey = new CombinationKey();
		Text sortName = new Text();
		IntWritable score = new IntWritable();
		String[] splits = null;
		protected void map(Text key, Text value, Mapper<Text, Text, CombinationKey, IntWritable>.Context context) throws IOException, InterruptedException {
			System.out.println("---------------------進入map()函式---------------------");
			//過濾非法記錄(這裡用計數器比較好)
			if (key == null || value == null || key.toString().equals("")){
				return;
			}
			//構造相關屬性
			sortName.set(key.toString());
			score.set(Integer.parseInt(value.toString()));
			//設定聯合key
			combinationKey.setFirstKey(sortName);
			combinationKey.setSecondKey(score);
			
			//通過context把map處理後的結果輸出
			context.write(combinationKey, score);
			System.out.println("---------------------結束map()函式---------------------");
		}
		
	}
	
	
	public static class SecondSortReducer extends Reducer<CombinationKey, IntWritable, Text, Text>{
		
		StringBuffer sb = new StringBuffer();
		Text score = new Text();
		/**
		 * 這裡要注意一下reduce的呼叫時機和次數:
		 * reduce每次處理一個分組的時候會呼叫一次reduce函式。
		 * 所謂的分組就是將相同的key對應的value放在一個集合中
		 * 例如:<sort1,1> <sort1,2>
		 * 分組後的結果就是
		 * <sort1,{1,2}>這個分組會呼叫一次reduce函式
		 */
		protected void reduce(CombinationKey key, Iterable<IntWritable> values, Reducer<CombinationKey, IntWritable, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
			
			//先清除上一個組的資料
			sb.delete(0, sb.length());
			
			for (IntWritable val : values){
				sb.append(val.get() + ",");
			}
			
			//取出最後一個逗號
			if (sb.length() > 0){
				sb.deleteCharAt(sb.length() - 1);
			}
			
			//設定寫出去的value
			score.set(sb.toString());
			
			//將聯合Key的第一個元素作為新的key,將score作為value寫出去
			context.write(key.getFirstKey(), score);
			
			System.out.println("---------------------進入reduce()函式---------------------");
			System.out.println("---------------------{[" + key.getFirstKey()+"," + key.getSecondKey() + "],[" +score +"]}");
			System.out.println("---------------------結束reduce()函式---------------------");
		}
	}
}

程式執行的結果:

五、處理流程

看到前面的程式碼,都知道我在各個元件上已經設定好了相應的標誌,用於追蹤整個MapReduce處理二次排序的處理流程。現在讓我們分別看看Map端和Reduce端的日誌情況。

(1)Map端日誌分析

15/01/19 15:32:29 INFO input.FileInputFormat: Total input paths to process : 1
15/01/19 15:32:29 WARN snappy.LoadSnappy: Snappy native library not loaded
15/01/19 15:32:30 INFO mapred.JobClient: Running job: job_local_0001
15/01/19 15:32:30 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
15/01/19 15:32:30 INFO mapred.MapTask: io.sort.mb = 100
15/01/19 15:32:30 INFO mapred.MapTask: data buffer = 79691776/99614720
15/01/19 15:32:30 INFO mapred.MapTask: record buffer = 262144/327680
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
---------------------進入map()函式---------------------
---------------------進入自定義分割槽---------------------
---------------------結束自定義分割槽---------------------
---------------------結束map()函式---------------------
15/01/19 15:32:30 INFO mapred.MapTask: Starting flush of map output
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
------------------進入二次排序-------------------
------------------結束二次排序-------------------
15/01/19 15:32:30 INFO mapred.MapTask: Finished spill 0
15/01/19 15:32:30 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
15/01/19 15:32:30 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
15/01/19 15:32:30 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
從Map端的日誌,我們可以很容易的看出來每一條記錄開始時進入到map()函式進行處理,處理完了之後立馬就自定義分割槽函式中對其進行分割槽,當所有輸入資料經過map()函式和分割槽函式處理之後,就呼叫自定義二次排序函式對其進行排序。

(2)Reduce端日誌分析

15/01/19 15:32:30 INFO mapred.Merger: Merging 1 sorted segments
15/01/19 15:32:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 130 bytes
15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:-1
---------------------結束自定義分組---------------------
---------------------進入reduce()函式---------------------
---------------------{[sort1,2],[1,2]}
---------------------結束reduce()函式---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:-4
---------------------結束自定義分組---------------------
---------------------進入reduce()函式---------------------
---------------------{[sort2,88],[3,54,88]}
---------------------結束reduce()函式---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入自定義分組---------------------
---------------------分組結果:0
---------------------結束自定義分組---------------------
---------------------進入reduce()函式---------------------
---------------------{[sort6,888],[22,58,888]}
---------------------結束reduce()函式---------------------
15/01/19 15:32:30 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
15/01/19 15:32:30 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/01/19 15:32:30 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://liaozhongmin:9000/out
15/01/19 15:32:30 INFO mapred.LocalJobRunner: reduce > reduce
15/01/19 15:32:30 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
15/01/19 15:32:31 INFO mapred.JobClient:  map 100% reduce 100%
15/01/19 15:32:31 INFO mapred.JobClient: Job complete: job_local_0001
15/01/19 15:32:31 INFO mapred.JobClient: Counters: 19
15/01/19 15:32:31 INFO mapred.JobClient:   File Output Format Counters 
15/01/19 15:32:31 INFO mapred.JobClient:     Bytes Written=40
15/01/19 15:32:31 INFO mapred.JobClient:   FileSystemCounters
15/01/19 15:32:31 INFO mapred.JobClient:     FILE_BYTES_READ=446
15/01/19 15:32:31 INFO mapred.JobClient:     HDFS_BYTES_READ=140
15/01/19 15:32:31 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=131394
15/01/19 15:32:31 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=40
15/01/19 15:32:31 INFO mapred.JobClient:   File Input Format Counters 
15/01/19 15:32:31 INFO mapred.JobClient:     Bytes Read=70
15/01/19 15:32:31 INFO mapred.JobClient:   Map-Reduce Framework
15/01/19 15:32:31 INFO mapred.JobClient:     Reduce input groups=3
15/01/19 15:32:31 INFO mapred.JobClient:     Map output materialized bytes=134
15/01/19 15:32:31 INFO mapred.JobClient:     Combine output records=0
15/01/19 15:32:31 INFO mapred.JobClient:     Map input records=8
15/01/19 15:32:31 INFO mapred.JobClient:     Reduce shuffle bytes=0
15/01/19 15:32:31 INFO mapred.JobClient:     Reduce output records=3
15/01/19 15:32:31 INFO mapred.JobClient:     Spilled Records=16
15/01/19 15:32:31 INFO mapred.JobClient:     Map output bytes=112
15/01/19 15:32:31 INFO mapred.JobClient:     Total committed heap usage (bytes)=391118848
15/01/19 15:32:31 INFO mapred.JobClient:     Combine input records=0
15/01/19 15:32:31 INFO mapred.JobClient:     Map output records=8
15/01/19 15:32:31 INFO mapred.JobClient:     SPLIT_RAW_BYTES=99
15/01/19 15:32:31 INFO mapred.JobClient:     Reduce input records=8
首先,我們看了Reduce端的日誌,第一個資訊我應該很容易能夠很容易看出來,就是分組和reduce()函式處理都是在Shuffle完成之後才進行的。另外一點我們也非常容易看出,就是每次處理完一個分組資料就會去呼叫一次的reduce()函式對這個分組進行處理和輸出。此外,說明一些分組函式的返回值問題,當返回0時才會被分到同一個組中。另外一點我們也可以看出來,一個分組中每合併n個值就會有n-1分組函式返回0值,也就是說進行了n-1次比較。

六、總結

本文主要從MapReduce框架執行的流程,去分析瞭如何去實現二次排序,通過程式碼進行了實現,並且對整個流程進行了驗證。另外,要吐槽一下,網路上有很多文章都記錄了MapReudce處理二次排序問題,但是對MapReduce框架整個處理流程的描述錯漏很多,而且他們最終的流程描述也沒有證據可以支撐。所以,對於網路上的學習資源不能夠完全依賴,要融入自己的思想,並且要重要的觀點進行程式碼或者實踐的驗證。另外,今天在一個hadoop交流群上聽到少部分人在討論,有了hive我們就不用學習些MapReduce程式?對這這個問題我是這麼認為:我不相信寫不好MapReduce程式的程式設計師會寫好hive語句,最起碼的他們對整個執行流程是一無所知的,更不用說效能問題了,有可能連最常見的資料傾斜問題的弄不清楚。


 如果文章寫的有問題,歡迎指出,共同學習!