MapReduce框架原理-MapTask和ReduceTask工作機制
MapTask工作機制
並行度決定機制
1)問題引出
maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,mapTask並行任務是否越多越好呢?
2)MapTask並行度決定機制
一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。
切片(邏輯上的切分)大小預設等於128M,和block大小相等,原因是如果不按照block大小進行切分,可能會涉及到一些不同節點之間資料的傳輸。
MapTask工作機制
總結
- read階段:讀取數並行度決定機制據成key-value
- map階段:將讀取的key-value進行處理,生成新的key-value
- collect階段:將map的資料寫到環形緩衝區(分割槽)中
- spill溢寫階段:環形緩衝區資料滿80%後溢寫磁碟,只不過溢寫之前需要進行排序
- combine階段:合併小檔案(而不是執行Combiner業務邏輯):歸併排序,將一些多次產生的小檔案進行合併,形成一個大檔案
【注意】MapTask的數量是由切片數決定的,雖然Maptask不能直接設定,但是我們可以通過設定切片個數去完成MapTask數量的指定
詳細步驟
(1)Read階段:Map Task通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,併產生一系列新的key/value。
(3)Collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫OutputCollector.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫Partitioner),並寫入一個環形記憶體緩衝區中。
(4)Spill階段:即"溢寫",當環形緩衝區滿後,MapReduce會將資料寫到本地磁碟上,生成一個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排序,並在必要時對資料進行合併、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在一起,且同一分割槽內所有資料按照key有序。
步驟2:按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spillN.out(N表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分割槽中的資料進行一次聚集操作。
步驟3:將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。
(5)Combine階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合併,以確保最終只會生成一個數據檔案。
當所有資料處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。
在進行檔案合併過程中,MapTask以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式。每輪合併io.sort.factor(預設100)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。
讓每個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。
ReduceTask工作機制
ReduceTask工作機制
總結
- copy階段:將不同的MapTask計算出來的資料拷貝到指定的ReduceTask,預設是放在記憶體中的。如果記憶體容量不足,那麼寫在磁碟中
- merge階段:主要將從不同MapTask拷貝過來的資料檔案進行一次合併,形成一個整體的大檔案
- sort階段:分組排序階段。主要是用來保證key相同的邏輯判斷。sort階段可以不存在
- reduce階段:將一組相同的key執行一次業務邏輯即可
【注意】ReduceTask的數量和MapTask的數量不一樣,ReduceTask數量可以手動指定,但是數量指定是有一定要求的。一般預設情況下,ReduceTask的數量應該和map,collect階段寫出的分割槽數目對應。
詳細步驟
(1)Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。
(2)Merge階段:在遠端拷資料的同時,ReduceTask啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或磁碟上檔案過多。
(3)Sort階段:按照MapReduce語義,使用者編寫reduce()函式輸入資料是按key進行聚集的一組資料。為了將key相同的資料聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了區域性排序,因此,ReduceTask只需對所有資料進行一次歸併排序即可。
(4)Reduce階段:reduce()函式將計算結果寫到HDFS上。
注意
從Map的collect階段到reduce的sort階段統稱為shuffle階段。
其中Map中的collect、spill、combine階段稱之為Map階段的shuffle
Reduce中的copy、merge、sort階段稱之為Reduce階段的shuffle
設定ReduceTask
reducetask的並行度同樣影響整個job的執行併發度和執行效率,但與maptask的併發數由切片數決定不同,Reducetask數量的決定是可以直接手動設定:
//預設值是1,手動設定為4 job.setNumReduceTasks(4); |
實驗:測試reducetask多少合適。
實驗環境:1個master節點,16個slave節點:CPU:8GHZ,記憶體:2G
實驗結論:
表1 改變reduce task (資料量為1GB)
Map task =16 |
||||||||||
Reduce task |
1 |
5 |
10 |
15 |
16 |
20 |
25 |
30 |
45 |
60 |
總時間 |
892 |
146 |
110 |
92 |
88 |
100 |
128 |
101 |
145 |
104 |
注意事項
-
一旦setNumReduceTask設為0的話,就代表沒有reduce階段。也就意味著map階段處理完成資料後,直接把資料放到最終結果檔案中,不經過reduce處理了。在這種情況下,我們指定map階段輸出的key-value值的型別時就不能寫如下這個方式了:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
應該直接寫:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
此時結果資料輸出檔案就不再是part-r-00000了,而是part-m-00000。
- ReduceTask預設值就是1,所以輸出檔案個數為一個。
- 如果資料分佈不均勻,就有可能在Reduce階段產生資料傾斜
- ReduceTask數量並不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全域性彙總結果,就只能有1個ReduceTask。
- 具體多少個ReduceTask,需要根據叢集效能而定。
- 如果分割槽數不是1,但是ReduceTask為1,是否執行分割槽過程。答案是:不執行分割槽過程。因為在MapTask的原始碼中,執行分割槽的前提是先判斷ReduceNum個數是否大於1。不大於1肯定不執行。