MapReduce:詳解Shuffle過程
* author : 冶秀剛
* mail : [email protected]
*/
Shuffle過程是MapReduce的核心,也被稱為奇蹟發生的地方。要想理解MapReduce, Shuffle是必須要了解的。我看過很多相關的資料,但每次看完都雲裡霧裡的繞著,很難理清大致的邏輯,反而越攪越混。前段時間在做MapReduce job 效能調優的工作,需要深入程式碼研究MapReduce的執行機制,這才對Shuffle探了個究竟。考慮到之前我在看相關資料而看不懂時很惱火,所以在這裡我盡最大的可能試著把Shuffle說清楚,讓每一位想了解它原理的朋友都能有所收穫。如果你對這篇文章有任何疑問或建議請留言到後面,謝謝!
Shuffle的正常意思是洗牌或弄亂,可能大家更熟悉的是Java API裡的Collections.shuffle(List)方法,它會隨機地打亂引數list裡的元素順序。如果你不知道MapReduce裡Shuffle是什麼,那麼請看這張圖:
這張是官方對Shuffle過程的描述。但我可以肯定的是,單從這張圖你基本不可能明白Shuffle的過程,因為它與事實相差挺多,細節也是錯亂的。後面我會具體描述Shuffle的事實情況,所以這裡你只要清楚Shuffle的大致範圍就成-怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, Shuffle描述著資料從map task輸出到reduce task輸入的這段過程。
在Hadoop這樣的叢集環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下Reduce執行時需要跨節點去拉取其它節點上的map task結果。如果叢集正在執行的job有很多,那麼task的正常執行對叢集內部的網路資源消耗會很嚴重。這種網路消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比於記憶體,磁碟IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的期望可以有:
- 完整地從map task端拉取資料到reduce 端。
- 在跨節點拉取資料時,儘可能地減少對頻寬的不必要消耗。
- 減少磁碟IO對task執行的影響。
OK,看到這裡時,大家可以先停下來想想,如果是自己來設計這段Shuffle過程,那麼你的設計目標是什麼。我想能優化的地方主要在於減少拉取資料的量及儘量使用記憶體而不是磁碟。
我的分析是基於Hadoop0.21.0的原始碼,如果與你所認識的Shuffle過程有差別,不吝指出。我會以WordCount為例,並假設它有8個map task和3個reduce task。從上圖看出,Shuffle過程橫跨map與reduce兩端,所以下面我也會分兩部分來展開。
先看看map端的情況,如下圖:
上圖可能是某個map task的執行情況。拿它與官方圖的左半邊比較,會發現很多不一致。官方圖沒有清楚地說明partition, sort與combiner到底作用在哪個階段。我畫了這張圖,希望讓大家清晰地瞭解從map資料輸入到map端所有資料準備好的全過程。
整個流程我分了四步。簡單些可以這樣說,每個map task都有一個記憶體緩衝區,儲存著map的輸出結果,當緩衝區快滿的時候需要將緩衝區的資料以一個臨時檔案的方式存放到磁碟,當整個map task結束後再對磁碟中這個map task產生的所有臨時檔案做合併,生成最終的正式輸出檔案,然後等待reduce task來拉資料。
當然這裡的每一步都可能包含著多個步驟與細節,下面我對細節來一一說明:
1. 在map task執行時,它的輸入資料來源於HDFS的block,當然在MapReduce概念中,map task只讀取split。Split與block的對應關係可能是多對一,預設是一對一。在WordCount例子裡,假設map的輸入資料都是像“aaa”這樣的字串。
2. 在經過mapper的執行後,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”, value是數值1。因為當前map端只做加1的操作,在reduce task裡才去合併結果集。前面我們知道這個job有3個reduce task,到底當前的“aaa”應該交由哪個reduce去做呢,是需要現在決定的。
MapReduce提供Partitioner介面,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出資料最終應該交由哪個reduce task處理。預設對key hash後再以reduce task數量取模。預設的取模方式只是為了平均reduce的處理能力,如果使用者自己對Partitioner有需求,可以訂製並設定到job上。
在我們的例子中,“aaa”經過Partitioner後返回0,也就是這對值應當交由第一個reducer來處理。接下來,需要將資料寫入記憶體緩衝區中,緩衝區的作用是批量收集map結果,減少磁碟IO的影響。我們的key/value對以及Partition的結果都會被寫入緩衝區。當然寫入之前,key與value值都會被序列化成位元組陣列。
整個記憶體緩衝區就是一個位元組陣列,它的位元組索引及key/value儲存結構我沒有研究過。如果有朋友對它有研究,那麼請大致描述下它的細節吧。
3. 這個記憶體緩衝區是有大小限制的,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,所以需要在一定條件下將緩衝區中的資料臨時寫入磁碟,然後重新利用這塊緩衝區。這個從記憶體往磁碟寫資料的過程被稱為Spill,中文可譯為溢寫,字面意思很直觀。這個溢寫是由單獨執行緒來完成,不影響往緩衝區寫map結果的執行緒。溢寫執行緒啟動時不應該阻止map的結果輸出,所以整個緩衝區有個溢寫的比例spill.percent。這個比例預設是0.8,也就是當緩衝區的資料已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫執行緒啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。
當溢寫執行緒啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型預設的行為,這裡的排序也是對序列化的位元組做的排序。
在這裡我們可以想想,因為map task的輸出是需要傳送到不同的reduce端去,而記憶體緩衝區沒有對將傳送到相同reduce端的資料做合併,那麼這種合併應該是體現是磁碟檔案中的。從官方圖上也可以看到寫到磁碟中的溢寫檔案是對不同的reduce端的數值做過合併。所以溢寫過程一個很重要的細節在於,如果有很多個key/value對需要傳送到某個reduce端去,那麼需要將這些key/value值拼接到一塊,減少與partition相關的索引記錄。
在針對每個reduce端而合併資料時,有些資料可能像這樣:“aaa”/1, “aaa”/1。對於WordCount例子,就是簡單地統計單詞出現的次數,如果在同一個map task的結果中有很多個像“aaa”一樣出現多次的key,我們就應該把它們的值合併到一塊,這個過程叫reduce也叫combine。但MapReduce的術語中,reduce只指reduce端執行從多個map task取資料做計算的過程。除reduce外,非正式地合併資料只能算做combine了。其實大家知道的,MapReduce中將Combiner等同於Reducer。
如果client設定過Combiner,那麼現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁碟的資料量。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。那哪些場景才能使用Combiner呢?從這裡分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。
4. 每次溢寫會在磁碟上生成一個溢寫檔案,如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個溢寫檔案存在。當map task真正完成時,記憶體緩衝區中的資料也全部溢寫到磁碟中形成一個溢寫檔案。最終磁碟中會至少有一個這樣的溢寫檔案存在(如果map的輸出結果很少,當map執行完成時,只會產生一個溢寫檔案),因為最終的檔案只有一個,所以需要將這些溢寫檔案歸併到一起,這個過程就叫做Merge。Merge是怎樣的?如前面的例子,“aaa”從某個map task讀取過來時值是5,從另外一個map 讀取時值是8,因為它們有相同的key,所以得merge成group。什麼是group。對於“aaa”就是像這樣的:{“aaa”, [5, 8, 2, …]},陣列中的值就是從不同溢寫檔案中讀取出來的,然後再把這些值加起來。請注意,因為merge是將多個溢寫檔案合併到一個檔案,所以可能也有相同的key存在,在這個過程中如果client設定過Combiner,也會使用Combiner來合併相同的key。
至此,map端的所有工作都已結束,最終生成的這個檔案也存放在TaskTracker夠得著的某個本地目錄內。每個reduce task不斷地通過RPC從JobTracker那裡獲取map task是否完成的資訊,如果reduce task得到通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程開始啟動。
簡單地說,reduce task在執行之前的工作就是不斷地拉取當前job裡每個map task的最終結果,然後對從不同地方拉取過來的資料不斷地做merge,也最終形成一個檔案作為reduce task的輸入檔案。見下圖:
如map 端的細節圖,Shuffle在reduce端的過程也能用圖上標明的三點來概括。當前reduce copy資料的前提是它要從JobTracker獲得有哪些map task已執行結束,這段過程不表,有興趣的朋友可以關注下。Reducer真正執行之前,所有的時間都是在拉取資料,做merge,且不斷重複地在做。如前面的方式一樣,下面我也分段地描述reduce 端的Shuffle細節:
1. Copy過程,簡單地拉取資料。Reduce程序啟動一些資料copy執行緒(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出檔案。因為map task早已結束,這些檔案就歸TaskTracker管理在本地磁碟中。
2. Merge階段。這裡的merge如map端的merge動作,只是陣列中存放的是不同map端copy來的數值。Copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比map端的更為靈活,它基於JVM的heap size設定,因為Shuffle階段Reducer不執行,所以應該把絕大部分的記憶體都給Shuffle用。這裡需要強調的是,merge有三種形式:1)記憶體到記憶體 2)記憶體到磁碟 3)磁碟到磁碟。預設情況下第一種形式不啟用,讓人比較困惑,是吧。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設定有Combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種merge方式一直在執行,直到沒有map端的資料時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的那個檔案。
3. Reducer的輸入檔案。不斷地merge後,最後會生成一個“最終檔案”。為什麼加引號?因為這個檔案可能存在於磁碟上,也可能存在於記憶體中。對我們來說,當然希望它存放於記憶體中,直接作為Reducer的輸入,但預設情況下,這個檔案是存放於磁碟中的。至於怎樣才能讓這個檔案出現在記憶體中,之後的效能優化篇我再說。當Reducer的輸入檔案已定,整個Shuffle才最終結束。然後就是Reducer執行,把結果放到HDFS上。
上面就是整個Shuffle的過程。細節很多,我很多都略過了,只試著把要點說明白。當然,我可能也有理解或表述上的很多問題,不吝指點。我希望不斷地完善和修改這篇文章,能讓它通俗、易懂,看完就能知道Shuffle的方方面面。至於具體的實現原理,各位有興趣就自己去探索,如果不方便的話,留言給我,我再來研究並反饋。
相關推薦
MapReduce:詳解Shuffle過程
/** * author : 冶秀剛 * mail : [email protected] */ Shuffle過程是MapReduce的核心,也被稱為奇蹟發生的地方。要想理解MapReduce, Shuffle是必須要了解的。我看
Hadoop1.x: 詳解Shuffle過程---map和reduce資料互動的關鍵
文章來源: Shuffle描述著資料從map task輸出到reduce task輸入的這段過程。 個人理解: map執行的結果會儲存為本地的一個檔案中: 只要map執行 完成,記憶體中的map資料就一定會儲存到本地檔案,儲存這個檔案有個過程 叫做spi
【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程
title: 【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程 date: 2018-12-03 21:12:42 tags: Hadoop categories: 大資料 toc: true 點選檢視我的部落格:Josonlee’
大數據入門第七天——MapReduce詳解
使用 sys distrib sent 作業 asi users tor war 一、概述 1.map-reduce是什麽 Hadoop MapReduce is a software framework for easily writing applica
大數據入門第七天——MapReduce詳解(下)
nbsp targe input pre 切片 入門 技術 log 過程 一、mapTask並行度的決定機制 1.概述 一個job的map階段並行度由客戶端在提交job時決定 而客戶端對map階段並行度的規劃的基本邏輯為: 將待處理數據執行邏輯
大數據入門第八天——MapReduce詳解(三)
大數 blog eve 分享圖片 shuf open src hid span 1/mr的combiner 2/mr的排序 3/mr的shuffle 4/mr與yarn 5/mr運行模式 6/mr實現join 7/mr全局圖
MapReduce 詳解
宣告: https://blog.csdn.net/shujuelin/article/details/79119214 上面這篇部落格真的寫得很好,可以點選進去看 MapReduce是一個分散式運算程式的程式設計框架,是使用者開發"基於hadoop的資料分析應用" MapReduce
hadoop2-MapReduce詳解
本文是對Hadoop2.2.0版本的MapReduce進行詳細講解。請大家要注意版本,因為Hadoop的不同版本,原始碼可能是不同的。 以下是本文的大綱: 1.獲取原始碼2.WordCount案例分析3.客戶端原始碼分析4.小結5.Mapper詳解 5.1.map輸入 5.2.map輸出 5
MapReduce詳解!詳解!詳解!
理解 MapReduce 執行過程 以統計檔案中 單詞出現的個數為例 一共三個檔案 1.以整個檔案的角度進行圖解 ( 每個方塊就是一個檔案) 2.根據程式碼進行圖解 放上程式碼,僅供參考 WCMapper.java public class WCM
Hadoop詳解 ----------- shuffle原理、partitioner分割槽原理、Combiner程式設計、常見的MR演算法
Partitioner程式設計Partition簡介shuffle是通過分割槽partitioner 分配給Reduce的 一個Reducer對應一個記錄檔案Partitioner是shuffle的一部
Cloudera Manager 5.12.0圖文詳解安裝過程
這裡介紹的是cdh5的離線安裝方式,需要的檔案提前準備好,安裝過程會快一些。 安裝前提:機器配置記憶體一定要高,我這裡安裝的虛擬機器均是redhat7:記憶體分別是6G,4G,4G。 準備的檔案: mysql-connector-java-5.1.38.jar clou
MapReduce詳解
MapReduce簡介 MapReduce是一種分散式計算模型,是Google提出的,主要用於搜尋領域,解決海量資料的計算問題。 MR有兩個階段組成:Map和Reduce,使用者只需實現map()和reduce()兩個函式,即可實現分散式計算。 MapReduce做什麼
詳解stacking過程
之前一直對stacking一知半解,找到的資料也介紹的很模糊。。所以有多看了幾篇文章,然後來此寫篇部落格,加深一下印象,順便給各位朋友分享一下。 stacking的過程有一張圖非常經典,如下: 雖然他很直觀,但是沒有語言描述確實很難搞懂。 上半部分是用一個基礎模型進
阿里雲系列——7.阿里雲IIS系列詳解(過程+通用+最新)
先講IIS系列,Linux部署以後再繼續講 先開啟主機管理平臺,確認域名繫結 2.net版本設定一下 3.資料庫建立(兩種方法:1,平臺,2,SQLServer遠端連線) 使用者名稱之類的看主機管理頁面的顯示 4.網站釋出後上傳ftp伺服器 5
Hadoop核心元件—MapReduce詳解
Hadoop 分散式計算框架(MapReduce)。 MapReduce設計理念: - 分散式計算 - 移動計算,而不是移動資料 MapReduce計算框架 步驟1:split split切分Blo
hadoop之mapreduce詳解(基礎篇)
本篇文章主要從mapreduce執行作業的過程,shuffle,以及mapreduce作業失敗的容錯幾個方面進行詳解。 一、mapreduce作業執行過程 1.1、mapreduce介紹 MapReduce是一種程式設計模型,用於大規模資料集(大於1TB)的並行運
hadoop之mapreduce詳解(進階篇)
上篇文章hadoop之mapreduce詳解(基礎篇)我們瞭解了mapreduce的執行過程和shuffle過程,本篇文章主要從mapreduce的元件和輸入輸出方面進行闡述。 一、mapreduce作業控制模組以及其他功能 mapreduce包括作業控制模組,程式設計模型,資料處理引擎。這裡我們重點闡述
hadoop之mapreduce詳解(優化篇)
一、概述 優化前我們需要知道hadoop適合幹什麼活,適合什麼場景,在工作中,我們要知道業務是怎樣的,能才結合平臺資源達到最有優化。除了這些我們當然還要知道mapreduce的執行過程,比如從檔案的讀取,map處理,shuffle過程,reduce處理,檔案的輸出或者
MapReduce-shuffle過程詳解
等待 通知 10個 線程數 硬盤 res .sh 現在 溢出 Shuffle map端 map函數開始產生輸出時,並不是簡單地將它寫到磁盤。這個過程很復雜,它利用緩沖的方式寫到內存並出於效率的考慮進行預排序。每個map任務都有一個環形內存緩沖區用於存儲任務輸出。在默認
MapReduce和spark的shuffle過程詳解
存在 位置 方式 傳遞 第一個 2個 過濾 之前 第三方 面試常見問題,必備答案。 參考:https://blog.csdn.net/u010697988/article/details/70173104 mapReducehe和Spark之間的最大區別是前者較偏向於離