1. 程式人生 > >【hadoop】MapReduce工作流程和MapTask、Shuffle、ReduceTask工作機制

【hadoop】MapReduce工作流程和MapTask、Shuffle、ReduceTask工作機制

MapReduce整個工作流程:



一、MapTask階段

1Read階段:MapTask通過使用者編寫的RecordReader,從輸入InputSplit中解析出

一個個key/value

2Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,並

產生一系列新的key/value

3Collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫

OutputCollector.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫

Partitioner),並寫入一個環形記憶體緩衝區中。

4Spill階段:即溢寫

,當環形緩衝區滿後,MapReduce 會將資料寫到本地磁碟上,

生成一個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排

序,並在必要時對資料進行合併、壓縮等操作。

溢寫階段詳情:

步驟1:利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號

partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在

一起,且同一分割槽內所有資料按照key有序。

步驟2:按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時文

output/spillN.outN表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之

前,對每個分割槽中的資料進行一次聚集操作。

步驟3:將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分割槽的元

資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大

小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。

5Combine 階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合併,

以確保最終只會生成一個數據檔案。

當所有資料處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案

output/file.out中,同時生成相應的索引檔案output/file.out.index

在進行檔案合併過程中,MapTask以分割槽為單位進行合併。對於某個分割槽,它將採用多

輪遞迴合併的方式。每輪合併io.sort.factor(預設100)個檔案,並將產生的檔案重新加入

待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。

讓每個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量

小檔案產生的隨機讀取帶來的開銷。

資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大

小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。

5Combine 階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合併,

以確保最終只會生成一個數據檔案。

當所有資料處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案

output/file.out中,同時生成相應的索引檔案output/file.out.index

在進行檔案合併過程中,MapTask以分割槽為單位進行合併。對於某個分割槽,它將採用多

輪遞迴合併的方式。每輪合併io.sort.factor(預設100)個檔案,並將產生的檔案重新加入

待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。

讓每個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量

小檔案產生的隨機讀取帶來的開銷。

二、Shuffle階段


1maptask收集我們的map()方法輸出的kv對,放到記憶體緩衝區中

2)從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案

3)多個溢位檔案會被合併成大的溢位檔案

4)在溢位過程中,及合併的過程中,都要呼叫partitioner進行分割槽和針對key進行排

5reducetask根據自己的分割槽號,去各個maptask機器上取相應的結果分割槽資料

6reducetask會取到同一個分割槽的來自不同maptask的結果檔案,reducetask會將這些

檔案再進行合併(歸併排序)

7)合併成大檔案後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程

(從檔案中取出一個一個的鍵值對group,呼叫使用者自定義的reduce()方法)

3)注意

Shuffle中的緩衝區大小會影響到mapreduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快。

緩衝區的大小可以通過引數調整,引數:io.sort.mb預設100M

三、ReduceTask階段


1Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,並針對某一片數據,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。

2Merge階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或磁碟上檔案過多。

3Sort階段:按照MapReduce語義,使用者編寫reduce()函式輸入資料是按key進行聚集的一組資料。為了將key相同的資料聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了區域性排序,因此,ReduceTask只需對所有資料進行一次歸併排序即可。

4Reduce階段:reduce()函式將計算結果寫到HDFS上。