1. 程式人生 > >MapReduce中的Shuffle和Sort分析

MapReduce中的Shuffle和Sort分析

MapReduce 是現今一個非常流行的分散式計算框架,它被設計用於平行計算海量資料。第一個提出該技術框架的是Google 公司,而Google 的靈感則來自於函數語言程式設計語言,如LISP,Scheme,ML 等。

MapReduce 框架的核心步驟主要分兩部分:Map 和Reduce。當你向MapReduce 框架提交一個計算作業時,它會首先把計算作業拆分成若干個Map 任務,然後分配到不同的節點上去執行,每一個Map 任務處理輸入資料中的一部分,當Map 任務完成後,它會生成一些中間檔案,這些中間檔案將會作為Reduce 任務的輸入資料。Reduce 任務的主要目標就是把前面若干個Map 的輸出彙總到一起並輸出。

本文的重點是剖析MapReduce 的核心過程——Shuffle和Sort。在本文中,Shuffle是指從Map 產生輸出開始,包括系統執行排序以及傳送Map 輸出到Reducer 作為輸入的過程。在這裡我們將去探究Shuffle是如何工作的,因為對基礎的理解有助於對MapReduce 程式進行調優。

首先從Map 端開始分析。當Map 開始產生輸出時,它並不是簡單的把資料寫到磁碟,因為頻繁的磁碟操作會導致效能嚴重下降。它的處理過程更復雜,資料首先是寫到記憶體中的一個緩衝區,並做了一些預排序,以提升效率。

每個Map 任務都有一個用來寫入輸出資料的迴圈記憶體緩衝區。這個緩衝區預設大小是100MB,可以通過io.sort.mb 屬性來設定具體大小。當緩衝區中的資料量達到一個特定閥值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 預設是0.80)時,系統將會啟動一個後臺執行緒把緩衝區中的內容spill 到磁碟。在spill 過程中,Map 的輸出將會繼續寫入到緩衝區,但如果緩衝區已滿,Map 就會被阻塞直到spill 完成。spill 執行緒在把緩衝區的資料寫到磁碟前,會對它進行一個二次快速排序,首先根據資料所屬的partition 排序,然後每個partition 中再按Key 排序。輸出包括一個索引檔案和資料檔案。如果設定了Combiner,將在排序輸出的基礎上執行。Combiner 就是一個Mini Reducer,它在執行Map 任務的節點本身執行,先對Map 的輸出做一次簡單Reduce,使得Map 的輸出更緊湊,更少的資料會被寫入磁碟和傳送到Reducer。spill 檔案儲存在由mapred.local.dir指定的目錄中,Map 任務結束後刪除。

每當記憶體中的資料達到spill 閥值的時候,都會產生一個新的spill 檔案,所以在Map任務寫完它的最後一個輸出記錄時,可能會有多個spill 檔案。在Map 任務完成前,所有的spill 檔案將會被歸併排序為一個索引檔案和資料檔案,如圖3 所示。這是一個多路歸併過程,最大歸併路數由io.sort.factor 控制(預設是10)。如果設定了Combiner,並且spill檔案的數量至少是3(由min.num.spills.for.combine 屬性控制),那麼Combiner 將在輸出檔案被寫入磁碟前執行以壓縮資料。

對寫入到磁碟的資料進行壓縮(這種壓縮同Combiner 的壓縮不一樣)通常是一個很好的方法,因為這樣做使得資料寫入磁碟的速度更快,節省磁碟空間,並減少需要傳送到Reducer 的資料量。預設輸出是不被壓縮的, 但可以很簡單的設定mapred.compress.map.output 為true 啟用該功能。壓縮所使用的庫由mapred.map.output.compression.codec 來設定,

目前主要有以下幾個壓縮格式:

DEFLATE 無DEFLATE .deflate 不支援不可以

gzip gzip DEFLATE .gz 不支援不可以

ZIP zip DEFLATE .zip 支援可以

bzip2 bzip2 bzip2 .bz2 不支援可以

LZO lzop LZO .lzo 不支援不可以

bbs.hadoopor.com ——–hadoop 技術論壇

當spill 檔案歸併完畢後,Map 將刪除所有的臨時spill 檔案,並告知TaskTracker 任務已完成。Reducers 通過HTTP 來獲取對應的資料。用來傳輸partitions 資料的工作執行緒數由tasktracker.http.threads 控制,這個設定是針對每一個TaskTracker 的,並不是單個Map,預設值為40,在執行大作業的大叢集上可以增大以提升資料傳輸速率。

現在讓我們轉到Shuffle的Reduce 部分。Map 的輸出檔案放置在執行Map 任務的TaskTracker 的本地磁碟上(注意:Map 輸出總是寫到本地磁碟,但Reduce 輸出不是,一般是寫到HDFS),它是執行Reduce 任務的TaskTracker 所需要的輸入資料。Reduce 任務的輸入資料分佈在叢集內的多個Map 任務的輸出中,Map 任務可能會在不同的時間內完成,只要有其中的一個Map 任務完成,Reduce 任務就開始拷貝它的輸出。這個階段稱之為拷貝階段。Reduce 任務擁有多個拷貝執行緒, 可以並行的獲取Map 輸出。可以通過設定mapred.reduce.parallel.copies 來改變執行緒數,預設是5。

Reducer 是怎麼知道從哪些TaskTrackers 中獲取Map 的輸出呢?當Map 任務完成之後,會通知它們的父TaskTracker,告知狀態更新,然後TaskTracker 再轉告JobTracker。這些通知資訊是通過心跳通訊機制傳輸的。因此針對一個特定的作業,JobTracker 知道Map 輸出與TaskTrackers 的對映關係。Reducer 中有一個執行緒會間歇的向JobTracker 詢問Map 輸出的地址,直到把所有的資料都取到。在Reducer 取走了Map 輸出之後,TaskTrackers 不會立即刪除這些資料,因為Reducer 可能會失敗。它們會在整個作業完成後,JobTracker告知它們要刪除的時候才去刪除。

如果Map 輸出足夠小,它們會被拷貝到Reduce TaskTracker 的記憶體中(緩衝區的大小

由mapred.job.shuffle.input.buffer.percent 控制,制定了用於此目的的堆記憶體的百分比);如果緩衝區空間不足,會被拷貝到磁碟上。當記憶體中的緩衝區用量達到一定比例閥值(由mapred.job.shuffle.merge.threshold 控制),或者達到了Map 輸出的閥值大小(由mapred.inmem.merge.threshold 控制),緩衝區中的資料將會被歸併然後spill 到磁碟。

拷貝來的資料疊加在磁碟上,有一個後臺執行緒會將它們歸併為更大的排序檔案,這樣做節省了後期歸併的時間。對於經過壓縮的Map 輸出,系統會自動把它們解壓到記憶體方便對其執行歸併。

當所有的Map 輸出都被拷貝後,Reduce 任務進入排序階段(更恰當的說應該是歸併階段,因為排序在Map 端就已經完成),這個階段會對所有的Map 輸出進行歸併排序,這個工作會重複多次才能完成。

假設這裡有50 個Map 輸出(可能有儲存在記憶體中的),並且歸併因子是10(由io.sort.factor 控制,就像Map 端的merge 一樣),那最終需要5 次歸併。每次歸併會把10個檔案歸併為一個,最終生成5 箇中間檔案。在這一步之後,系統不再把5 箇中間檔案歸併壓縮格式工具演算法副檔名支援分卷是否可分割成一個,而是排序後直接“喂”給Reduce 函式,省去向磁碟寫資料這一步。最終歸併的資料可以是混合資料,既有記憶體上的也有磁碟上的。由於歸併的目的是歸併最少的檔案數目,使得在最後一次歸併時總檔案個數達到歸併因子的數目,所以每次操作所涉及的檔案個數在實際中會更微妙些。譬如,如果有40 個檔案,並不是每次都歸併10 個最終得到4 個檔案,相反第一次只歸併4 個檔案,然後再實現三次歸併,每次10 個,最終得到4 個歸併好的檔案和6 個未歸併的檔案。要注意,這種做法並沒有改變歸併的次數,只是最小化寫入磁碟的資料優化措施,因為最後一次歸併的資料總是直接送到Reduce 函式那裡。

在Reduce 階段,Reduce 函式會作用在排序輸出的每一個key 上。這個階段的輸出被直接寫到輸出檔案系統,一般是HDFS。在HDFS 中,因為TaskTracker 節點也執行著一個DataNode 程序,所以第一個塊備份會直接寫到本地磁碟。

到此,MapReduce 的Shuffle和Sort分析完畢。