1. 程式人生 > >MapReduce優化----Shuffle過程剖析及效能優化

MapReduce優化----Shuffle過程剖析及效能優化

MapReduce確保每個reducer的輸入都按鍵排序。

Shuffle:系統執行排序的過程—將map輸出作為輸入傳給reducer(如圖1、圖2)。

圖1

圖2

如圖1、圖2所示,從map輸出到reduce輸入就是shuffle階段。但實際執行過程遠比上圖所示覆雜。

Shuffle 是指從Map 產生輸出開始,包括系統執行排序以及傳送Map 輸出到Reducer 作為輸入的過程。

詳細執行如圖3所示:

圖3

1.    Map端

當Map 開始產生輸出時,它並不是簡單的把資料寫到磁碟,因為頻繁的磁碟操作會導致效能嚴重下降。它的處理過程更復雜,資料首先是寫到記憶體中的一個緩衝區,並做了一些預排序,以提升效率。

每個Map 任務都有一個用來寫入輸出資料的迴圈記憶體緩衝區。這個緩衝區預設大小是100MB,可以通過io.sort.mb 屬性來設定具體大小。當緩衝區中的資料量達到一個特定閥值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 預設是0.80)時,系統將會啟動一個後臺執行緒把緩衝區中的內容spill 到磁碟。在spill 過程中,Map 的輸出將會繼續寫入到緩衝區,但如果緩衝區已滿,Map 就會被阻塞直到spill 完成。spill 執行緒在把緩衝區的資料寫到磁碟前,會對它進行一個二次快速排序,首先根據資料所屬的partition 排序,然後每個partition 中再按Key 排序。輸出包括一個索引檔案和資料檔案。

如果設定了Combiner,將在排序輸出的基礎上執行。Combiner 就是一個Mini Reducer,它在執行Map 任務的節點本身執行,先對Map 的輸出做一次簡單Reduce,使得Map 的輸出更緊湊,更少的資料會被寫入磁碟和傳送到Reducer。

spill 檔案儲存在由mapred.local.dir指定的目錄中,Map 任務結束後刪除。

每當記憶體中的資料達到spill 閥值的時候,都會產生一個新的spill 檔案,所以在Map任務寫完它的最後一個輸出記錄時,可能會有多個spill 檔案。在Map 任務完成前,所有的spill 檔案將會被歸併排序為一個索引檔案和資料檔案。這是一個多路歸併過程,最大歸併路數由io.sort.factor

 控制(預設是10)。如果設定了Combiner,並且spill檔案的數量至少是3(由min.num.spills.for.combine 屬性控制),那麼Combiner 將在輸出檔案被寫入磁碟前執行以壓縮資料。

對寫入到磁碟的資料進行壓縮,通常是一個很好的方法,因為這樣做使得資料寫入磁碟的速度更快,節省磁碟空間,並減少需要傳送到Reducer 的資料量。預設輸出是不被壓縮的, 但可以很簡單的設定mapred.compress.map.output 為true 啟用該功能。壓縮所使用的庫由mapred.map.output.compression.codec 來設定。

當spill 檔案歸併完畢後,Map 將刪除所有的臨時spill 檔案,並告知TaskTracker 任務已完成。Reducers 通過HTTP來獲取對應的資料。用來傳輸partitions 資料的工作執行緒數由tasktracker.http.threads 控制,這個設定是針對每一個TaskTracker 的,並不是單個Map,預設值為40,在執行大作業的大叢集上可以增大以提升資料傳輸速率。

2.    Reduce端

2.1 copy階段

Map 的輸出檔案放置在執行Map 任務的TaskTracker 的本地磁碟上(注意:Map 輸出總是寫到本地磁碟,但Reduce 輸出不是,一般是寫到HDFS),它是執行Reduce 任務的TaskTracker 所需要的輸入資料。Reduce 任務的輸入資料分佈在叢集內的多個Map 任務的輸出中,Map 任務可能會在不同的時間內完成,只要完成的Map 任務數達到佔總Map任務數一定比例(mapred.reduce.slowstart.completed.maps 預設0.05),Reduce 任務就開始拷貝它的輸出。

         Reduce 任務擁有多個拷貝執行緒, 可以並行的獲取Map 輸出。可以通過設定mapred.reduce.parallel.copies 來改變執行緒數,預設是5。

如果Map 輸出足夠小,它們會被拷貝到Reduce TaskTracker 的記憶體中(緩衝區的大小

mapred.job.shuffle.input.buffer.percent 控制,指定了用於此目的的堆記憶體的百分比);如果緩衝區空間不足,會被拷貝到磁碟上。當記憶體中的緩衝區用量達到一定比例閥值(由mapred.job.shuffle.merge.percent 控制),或者達到了Map 輸出的閥值大小(由mapred.inmem.merge.threshold 控制),緩衝區中的資料將會被歸併然後spill到磁碟。

拷貝來的資料疊加在磁碟上,有一個後臺執行緒會將它們歸併為更大的排序檔案,這樣做節省了後期歸併的時間。對於經過壓縮的Map 輸出,系統會自動把它們解壓到記憶體方便對其執行歸併。

2.2 sort階段

當所有的Map 輸出都被拷貝後,Reduce 任務進入排序階段(更恰當的說應該是歸併階段,因為排序在Map 端就已經完成),這個階段會對所有的Map 輸出進行歸併排序,這個工作會重複多次才能完成。

假設這裡有50 個Map 輸出(可能有儲存在記憶體中的),並且歸併因子是10(由io.sort.factor 控制,就像Map 端的merge 一樣),那最終需要5 次歸併。每次歸併會把10個檔案歸併為一個,最終生成5 箇中間檔案。

注:每趟合併的檔案數實際上比示例中展示的更微妙。目標是合併最小數量的檔案以便滿足最後一趟的合併係數。因此如果是40個檔案,我們不會在四趟中,每趟合併10個檔案從而得到4個檔案。相反,第一趟只合並4個檔案,隨後三趟合併所有十個檔案。在最後一趟中,4個已合併的檔案和餘下的6個(未合併的)檔案合計10個檔案。這並沒有改變合併的次數,它只是一個優化措施,儘量減少寫到磁碟的資料量,因為最後一趟總是直接合併到reduce。

2.3 reduce階段

在Reduce 階段,Reduce 函式會作用在排序輸出的每一個key 上。這個階段的輸出被直接寫到輸出檔案系統,一般是HDFS。在HDFS 中,因為TaskTracker 節點也執行著一個DataNode 程序,所以第一個塊備份會直接寫到本地磁碟。

3.    配置調優

該配置調優方案主要是對以上Shuffle整個過程中涉及到的配置項按流程順序一一呈現並給以調優建議。

1. Map端

1) io.sort.mb

用於map輸出排序的記憶體緩衝區大小

型別:Int

預設:100mb

備註:如果能估算map輸出大小,就可以合理設定該值來儘可能減少溢位寫的次數,這對調優很有幫助。

2)io.sort.spill.percent

map輸出排序時的spill閥值(即使用比例達到該值時,將緩衝區中的內容spill 到磁碟)

型別:float

預設:0.80

3)io.sort.factor

歸併因子(歸併時的最多合併的流數),map、reduce階段都要用到

型別:Int

預設:10

備註:將此值增加到100是很常見的。

4)min.num.spills.for.combine

執行combiner所需的最少溢位寫檔案數(如果已指定combiner)

型別:Int

預設:3

5)mapred.compress.map.output

map輸出是否壓縮

型別:Boolean

預設:false

備註:如果map輸出的資料量非常大,那麼在寫入磁碟時壓縮資料往往是個很好的主意,因為這樣會讓寫磁碟的速度更快,節約磁碟空間,並且減少傳給reducer的資料量。

6)mapred.map.output.compression.codec

用於map輸出的壓縮編解碼器

型別:Classname

預設:org.apache.hadoop.io.compress.DefaultCodec

備註:推薦使用LZO壓縮。Intel內部測試表明,相比未壓縮,使用LZO壓縮的 TeraSort作業,執行時間減少60%,且明顯快於Zlib壓縮。

7) tasktracker.http.threads

每個tasktracker的工作執行緒數,用於將map輸出到reducer。

(注:這是叢集範圍的設定,不能由單個作業設定)

型別:Int

預設:40

備註:tasktracker開http服務的執行緒數。用於reduce拉取map輸出資料,大叢集可以將其設為40~50。

2. reduce端

1)mapred.reduce.slowstart.completed.maps

呼叫reduce之前,map必須完成的最少比例

型別:float

預設:0.05

2)mapred.reduce.parallel.copies

reducer在copy階段同時從mapper上拉取的檔案數

型別:int

預設:5

3)mapred.job.shuffle.input.buffer.percent

在shuffle的複製階段,分配給map輸出的緩衝區佔堆空間的百分比

型別:float

預設:0.70

4)mapred.job.shuffle.merge.percent

map輸出緩衝區(由mapred.job.shuffle.input.buffer.percent定義)使用比例閥值,當達到此閥值,緩衝區中的資料將會被歸併然後spill 到磁碟。

型別:float

預設:0.66

5)mapred.inmem.merge.threshold

map輸出緩衝區中檔案數

型別:int

預設:1000

備註:0或小於0的數意味著沒有閥值限制,溢位寫將有mapred.job.shuffle.merge.percent單獨控制。

6)mapred.job.reduce.input.buffer.percent

在reduce過程中,在記憶體中儲存map輸出的空間佔整個堆空間的比例。

型別:float

預設:0.0

備註:reduce階段開始時,記憶體中的map輸出大小不能大於該值。預設情況下,在reduce任務開始之前,所有的map輸出都合併到磁碟上,以便為reducer提供儘可能多的記憶體。然而,如果reducer需要的記憶體較少,則可以增加此值來最小化訪問磁碟的次數,以提高reduce效能。

3.效能調優補充

相對於大批量的小檔案,hadoop更合適處理少量的大檔案。一個原因是FileInputFormat生成的InputSplit是一個檔案或該檔案的一部分。如果檔案很小,並且檔案數量很多,那麼每次map任務只處理很少的輸入資料,每次map操作都會造成額外的開銷。

問題一:大批量的小檔案處理(如果可能,應該儘量避免該情況)

解決方案:用CombineFileInputFormat輸入檔案,它能把多個檔案打包到一個分片中以便每個mapper可以處理更多的資料。關鍵是,決定哪些塊放入同一個分片時,CombineFileInputFormat會考慮到節點和機架的因素,所以在典型MapReduce作業中處理輸入的速度並不會下降。

問題二:如何控制map數量以及設定合適的reduce數量