shuffle和sort分析
阿新 • • 發佈:2017-07-14
理解 不同 http 寫入 mapr 中一 重復 進入 ons
MapReduce中的Shuffle和Sort分析
MapReduce 是現今一個非常流行的分布式計算框架,它被設計用於並行計算海量數據。第一個提出該技術框架的是Google 公司,而Google 的靈感則來自於函數式編程語言,如LISP,Scheme,ML 等。MapReduce 框架的核心步驟主要分兩部分:Map 和Reduce。當你向MapReduce 框架提交一個計算作業時,它會首先把計算作業拆分成若幹個Map 任務,然後分配到不同的節點上去執行,每一個Map 任務處理輸入數據中的一部分,當Map 任務完成後,它會生成一些中間文件,這些中間文件將會作為Reduce 任務的輸入數據。Reduce 任務的主要目標就是把前面若幹個Map 的輸出匯總到一起並輸出。從高層抽象來看,MapReduce的數據流圖如圖1 所示: 本文的重點是剖析MapReduce的核心過程----Shuffle和Sort。在本文中,Shuffle是指從Map產生輸出開始,包括系統執行排序以及傳送Map輸出到Reducer作為輸入的過程。在這裏我們將去探究Shuffle是如何工作的,因為對基礎的理解有助於對MapReduce程序進行調優。 首先從Map端開始分析,當Map開始產生輸出的時候,他並不是簡單的把數據寫到磁盤,因為頻繁的操作會導致性能嚴重下降,他的處理更加復雜,數據首先是寫到內存中的一個緩沖區,並作一些預排序,以提升效率,如圖: 每個Map任務都有一個用來寫入輸出數據的循環內存緩沖區,這個緩沖區默認大小是100M,可以通過io.sort.mb屬性來設置具體的大小,當緩沖區中的數據量達到一個特定的閥值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默認是0.80)時,系統將會啟動一個後臺線程把緩沖區中的內容spill 到磁盤。在spill過程中,Map的輸出將會繼續寫入到緩沖區,但如果緩沖區已經滿了,Map就會被阻塞直道spill完成。spill線程在把緩沖區的數據寫到磁盤前,會對他進行一個二次排序,首先根據數據所屬的partition排序,然後每個partition中再按Key排序。輸出包括一個索引文件和數據文件,如果設定了Combiner,將在排序輸出的基礎上進行。Combiner就是一個Mini Reducer,它在執行Map任務的節點本身運行,先對Map的輸出作一次簡單的Reduce,使得Map的輸出更緊湊,更少的數據會被寫入磁盤和傳送到Reducer。Spill文件保存在由mapred.local.dir指定的目錄中,Map任務結束後刪除。 每當內存中的數據達到spill閥值的時候,都會產生一個新的spill文件,所以在Map任務寫完他的最後一個輸出記錄的時候,可能會有多個spill文件,在Map任務完成前,所有的spill文件將會被歸並排序為一個索引文件和數據文件。如圖3 所示。這是一個多路歸並過程,最大歸並路數由io.sort.factor 控制(默認是10)。如果設定了Combiner,並且spill文件的數量至少是3(由min.num.spills.for.combine 屬性控制),那麽Combiner 將在輸出文件被寫入磁盤前運行以壓縮數據。 對寫入到磁盤的數據進行壓縮(這種壓縮同Combiner 的壓縮不一樣)通常是一個很好的方法,因為這樣做使得數據寫入磁盤的速度更快,節省磁盤空間,並減少需要傳送到Reducer 的數據量。默認輸出是不被壓縮的, 但可以很簡單的設置mapred.compress.map.output為true 啟用該功能。壓縮所使用的庫由mapred.map.output.compression.codec來設定 當spill 文件歸並完畢後,Map 將刪除所有的臨時spill 文件,並告知TaskTracker 任務已完成。Reducers 通過HTTP 來獲取對應的數據。用來傳輸partitions 數據的工作線程個數由tasktracker.http.threads 控制,這個設定是針對每一個TaskTracker 的,並不是單個Map,默認值為40,在運行大作業的大集群上可以增大以提升數據傳輸速率。 現在讓我們轉到Shuffle的Reduce部分。Map的輸出文件放置在運行Map任務的TaskTracker的本地磁盤上(註意:Map輸出總是寫到本地磁盤,但是Reduce輸出不是,一般是寫到HDFS),它是運行Reduce任務的TaskTracker所需要的輸入數據。Reduce任務的輸入數據分布在集群內的多個Map任務的輸出中,Map任務可能會在不同的時間內完成,只要有其中一個Map任務完成,Reduce任務就開始拷貝他的輸出。這個階段稱為拷貝階段,Reduce任務擁有多個拷貝線程,可以並行的獲取Map輸出。可以通過設定mapred.reduce.parallel.copies來改變線程數。 Reduce是怎麽知道從哪些TaskTrackers中獲取Map的輸出呢?當Map任務完成之後,會通知他們的父TaskTracker,告知狀態更新,然後TaskTracker再轉告JobTracker,這些通知信息是通過心跳通信機制傳輸的,因此針對以一個特定的作業,jobtracker知道Map輸出與tasktrackers的映射關系。Reducer中有一個線程會間歇的向JobTracker詢問Map輸出的地址,直到把所有的數據都取到。在Reducer取走了Map輸出之後,TaskTracker不會立即刪除這些數據,因為Reducer可能會失敗,他們會在整個作業完成之後,JobTracker告知他們要刪除的時候才去刪除。 如果Map輸出足夠小,他們會被拷貝到Reduce TaskTracker的內存中(緩沖區的大小由mapred.job.shuffle.input.buffer.percnet控制),或者達到了Map輸出的閥值的大小(由mapred.inmem.merge.threshold控制),緩沖區中的數據將會被歸並然後spill到磁盤。 拷貝來的數據疊加在磁盤上,有一個後臺線程會將它們歸並為更大的排序文件,這樣做節省了後期歸並的時間。對於經過壓縮的Map 輸出,系統會自動把它們解壓到內存方便對其執行歸並。 當所有的Map 輸出都被拷貝後,Reduce 任務進入排序階段(更恰當的說應該是歸並階段,因為排序在Map 端就已經完成),這個階段會對所有的Map 輸出進行歸並排序,這個工作會重復多次才能完成。 假設這裏有50 個Map 輸出(可能有保存在內存中的),並且歸並因子是10(由io.sort.factor控制,就像Map 端的merge 一樣),那最終需要5 次歸並。每次歸並會把10個文件歸並為一個,最終生成5 個中間文件。在這一步之後,系統不再把5 個中間文件歸並成一個,而是排序後直接“餵”給Reduce 函數,省去向磁盤寫數據這一步。最終歸並的數據可以是混合數據,既有內存上的也有磁盤上的。由於歸並的目的是歸並最少的文件數目,使得在最後一次歸並時總文件個數達到歸並因子的數目,所以每次操作所涉及的文件個數在實際中會更微妙些。譬如,如果有40 個文件,並不是每次都歸並10 個最終得到4 個文件,相反第一次只歸並4 個文件,然後再實現三次歸並,每次10 個,最終得到4 個歸並好的文件和6 個未歸並的文件。要註意,這種做法並沒有改變歸並的次數,只是最小化寫入磁盤的數據優化措施,因為最後一次歸並的數據總是直接送到Reduce 函數那裏。在Reduce 階段,Reduce 函數會作用在排序輸出的每一個key 上。這個階段的輸出被直接寫到輸出文件系統,一般是HDFS。在HDFS 中,因為TaskTracker 節點也運行著一個DataNode 進程,所以第一個塊備份會直接寫到本地磁盤。到此,MapReduce 的Shuffle 和Sort 分析完畢。shuffle和sort分析