1. 程式人生 > 實用技巧 >Hadoop——MapReduce過程詳解

Hadoop——MapReduce過程詳解

在這裡插入圖片描述
1、MapReduce程式讀取檔案的輸入目錄上存放的相應檔案

2、客戶端在submit()方法執行之前獲取要處理的資料資訊,根據叢集中的配置形成一個任務分配規劃

3、客戶端提交切片資訊給Yarn,Yarn中的resourcemanager啟動MRAppmaster

----------------------maptask開始

4、MRAPPmaster啟動後根據本次job的描述資訊計算出需要maptask的例項物件。首先,讀取資料元件InputFormat(預設TextInputFormat)getSplits方法對輸入目錄中檔案進行邏輯切片規劃得到splits,有多少個split就對應啟動多少個MapTask。split與block的對應關係預設是一對一。

5、將輸入檔案切分為splits之後,由RecordReader物件(預設LineRecordReader)進行讀取,以\n作為分隔符,讀取一行資料,返回<key,value>。Key表示每行首字元偏移值,value表示這一行文字內容。

6、將剛才獲取的<k,v>鍵值對當作輸入引數傳入使用者自己繼承的Mapper類中,執行使用者重寫的map函式。RecordReader讀取一行,這裡呼叫一次。

---------------------shuffle開始

7、map邏輯完之後,生成新的<k,v>鍵值對,把它們通過context.write進行collect資料收集。首先對資料進行分割槽處理,預設使用HashPartitioner。

(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
預設分割槽是根據key的hashCode對ReduceTasks個數取模得到的

8、然後把資料傳送到環形緩衝區,緩衝區預設是100MB,環形緩衝區達到80%時,觸發spill溢寫,在spill之前會資料進行排序,排序按照key的索引進行字典排序,排序的手段是快速排序

9、如果job設定過Combiner,那麼現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁碟的資料量。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。

那哪些場景才能使用Combiner呢?從這裡分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。注意求平均值不能使用Combiner,會影響最終結果

9、合併溢寫檔案:每次溢寫會在磁碟上生成一個臨時檔案(寫之前判斷是否有combiner),如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個臨時檔案存在。當整個資料處理結束之後開始對磁碟中的臨時檔案進行merge合併,因為最終的檔案只有一個,寫入磁碟,並且為這個檔案提供了一個索引檔案,以記錄每個reduce對應資料的偏移量。

---------------------maptask結束

---------------------reducetask開始

10、Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。
11、Merge階段。這裡的merge如map端的merge動作,只是陣列中存放的是不同map端copy來的數值。Copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比map端的更為靈活。merge有三種形式:記憶體到記憶體;記憶體到磁碟;磁碟到磁碟。預設情況下第一種形式不啟用。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設定有Combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種merge方式一直在執行,直到沒有map端的資料時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的檔案。

12、合併排序。把分散的資料合併成一個大的資料後,還會再對合並後的資料排序。

-----------------------shuffle結束

13、對排序後的鍵值對呼叫reduce方法,鍵相等的鍵值對呼叫一次reduce方法,每次呼叫會產生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到HDFS檔案中。

----------------------reducetask結束