1. 程式人生 > >MapReduce Shuffle 和 Spark Shuffle 原理概述

MapReduce Shuffle 和 Spark Shuffle 原理概述

Shuffle簡介

Shuffle的本意是洗牌、混洗的意思,把一組有規則的資料儘量打亂成無規則的資料。而在MapReduce中,Shuffle更像是洗牌的逆過程,指的是將map端的無規則輸出按指定的規則“打亂”成具有一定規則的資料,以便reduce端接收處理。其在MapReduce中所處的工作階段是map輸出後到reduce接收前,具體可以分為map端和reduce端前後兩個部分。

在shuffle之前,也就是在map階段,MapReduce會對要處理的資料進行分片(split)操作,為每一個分片分配一個MapTask任務。接下來map會對每一個分片中的每一行資料進行處理得到鍵值對(key,value)此時得到的鍵值對又叫做“中間結果”。此後便進入reduce階段,由此可以看出Shuffle階段的作用是處理“中間結果”。

由於Shuffle涉及到了磁碟的讀寫和網路的傳輸,因此Shuffle效能的高低直接影響到了整個程式的執行效率。

MapReduce Shuffle

Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是從Map結束到Reduce開始之間的過程。shuffle階段又可以分為Map端的shuffle和Reduce端的shuffle。

Map端的shuffle

下圖是MapReduce Shuffle的官方流程:

因為頻繁的磁碟I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁碟,而是優先儲存到map節點的“環形記憶體緩衝區”,在寫入的過程中進行分割槽(partition),也就是對於每個鍵值對來說,都增加了一個partition屬性值,然後連同鍵值對一起序列化成位元組陣列寫入到緩衝區(緩衝區採用的就是位元組陣列,預設大小為100M)。

當寫入的資料量達到預先設定的闕值後便會啟動溢寫出執行緒將緩衝區中的那部分資料溢位寫(spill)到磁碟的臨時檔案中,並在寫入前根據key進行排序(sort)和合並(combine,可選操作)。

溢位寫過程按輪詢方式將緩衝區中的內容寫到mapreduce.cluster.local.dir屬性指定的本地目錄中。當整個map任務完成溢位寫後,會對磁碟中這個map任務產生的所有臨時檔案(spill檔案)進行歸併(merge)操作生成最終的正式輸出檔案,此時的歸併是將所有spill檔案中的相同partition合併到一起,並對各個partition中的資料再進行一次排序(sort),生成key和對應的value-list,檔案歸併時,如果溢寫檔案數量超過引數min.num.spills.for.combine的值(預設為3)時,可以再次進行合併。

至此map端的工作已經全部結束,最終生成的檔案也會儲存在TaskTracker能夠訪問的位置。每個reduce task不間斷的通過RPC從JobTracker那裡獲取map task是否完成的資訊,如果得到的資訊是map task已經完成,那麼Shuffle的後半段開始啟動。

Reduce端的shuffle

當mapreduce任務提交後,reduce task就不斷通過RPC從JobTracker那裡獲取map task是否完成的資訊,如果獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程就開始啟動。Reduce端的shuffle主要包括三個階段,copy、merge和reduce。

每個reduce task負責處理一個分割槽的檔案,以下是reduce task的處理流程:

  1. reduce task從每個map task的結果檔案中拉取對應分割槽的資料。因為資料在map階段已經是分好區了,並且會有一個額外的索引檔案記錄每個分割槽的起始偏移量。所以reduce task取數的時候直接根據偏移量去拉取資料就ok。

  2. reduce task從每個map task拉取分割槽資料的時候會進行再次合併,排序,按照自定義的reducer的邏輯程式碼去處理。

  3. 最後就是Reduce過程了,在這個過程中產生了最終的輸出結果,並將其寫到HDFS上。

為什麼要排序

  1. key存在combine操作,排序之後相同的key放到一塊顯然方便做合併操作。

  2. reduce task是按key去處理資料的。 如果沒有排序那必須從所有資料中把當前相同key的所有value資料拿出來,然後進行reduce邏輯處理。顯然每個key到這個邏輯都需要做一次全量資料掃描,影響效能,有了排序很方便的得到一個key對於的value集合。

  3. reduce task按key去處理資料時,如果key按順序排序,那麼reduce task就按key順序去讀取,顯然當讀到的key是檔案末尾的key那麼就標誌資料處理完畢。如果沒有排序那還得有其他邏輯來記錄哪些key處理完了,哪些key沒有處理完。

雖有千萬種理由需要這麼做,但是很耗資源,並且像排序其實我們有些業務並不需要排序。

為什麼要檔案合併

  1. 因為記憶體放不下就會溢寫檔案,就會發生多次溢寫,形成很多小檔案,如果不合並,顯然會小檔案氾濫,叢集需要資源開銷去管理這些小檔案資料。

  2. 任務去讀取檔案的數增多,開啟的檔案控制代碼數也會增多。

  3. mapreduce是全域性有序。單個檔案有序,不代表全域性有序,只有把小檔案合併一起排序才會全域性有序。

    Spark的Shuffle

    Spark的Shuffle是在MapReduce Shuffle基礎上進行的調優。其實就是對排序、合併邏輯做了一些優化。在Spark中Shuffle write相當於MapReduce 的map,Shuffle read相當於MapReduce 的reduce。

Spark豐富了任務型別,有些任務之間資料流轉不需要通過Shuffle,但是有些任務之間還是需要通過Shuffle來傳遞資料,比如寬依賴的group by key以及各種by key運算元。寬依賴之間會劃分stage,而Stage之間就是Shuffle,如下圖中的stage0,stage1和stage3之間就會產生Shuffle。

在Spark的中,負責shuffle過程的執行、計算和處理的元件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發展有兩種實現的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。

Spark Shuffle發展史
Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但預設仍為Hash Based Shuffle
Spark 1.2 預設的Shuffle方式改為Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort併入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出歷史舞臺

在Spark的版本的發展,ShuffleManager在不斷迭代,變得越來越先進。
在Spark 1.2以前,預設的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個非常嚴重的弊端,就是會產生大量的中間磁碟檔案,進而由大量的磁碟IO操作影響了效能。因此在Spark 1.2以後的版本中,預設的ShuffleManager改成了SortShuffleManager。

SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁碟檔案,但是最後會將所有的臨時檔案合併(merge)成一個磁碟檔案,因此每個Task就只有一個磁碟檔案。在下一個stage的shuffle read task拉取自己的資料時,只要根據索引讀取每個磁碟檔案中的部分資料即可。

Hash Shuffle

HashShuffleManager的執行機制主要分成兩種,一種是普通執行機制,另一種是合併的執行機制。合併機制主要是通過複用buffer來優化Shuffle過程中產生的小檔案的數量。Hash shuffle是不具有排序的Shuffle。

普通機制的Hash Shuffle

最開始使用的Hash Based Shuffle,每個Mapper會根據Reducer的數量建立對應的bucket,bucket的數量是M * R,M是map的數量,R是Reduce的數量。
如下圖所示:2個core 4個map task 3 個reduce task,會產生4*3=12個小檔案。

優化後的Hash Shuffle

普通機制Hash Shuffle會產生大量的小檔案(M * R),對檔案系統的壓力也很大,也不利於IO的吞吐量,後來做了優化(設定spark.shuffle.consolidateFiles=true開啟,預設false),把在同一個core上的多個Mapper輸出到同一個檔案,這樣檔案數就變成core * R 個了。
如下圖所示:2個core 4個map task 3 個reduce task,會產生2*3=6個小檔案。

Hash shuffle合併機制的問題:
如果 Reducer 端的並行任務或者是資料分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小檔案。進而引出了更優化的sort shuffle。
在Spark 1.2以後的版本中,預設的ShuffleManager改成了SortShuffleManager。

Sort Shuffle

SortShuffleManager的執行機制主要分成兩種,一種是普通執行機制,另一種是bypass執行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold引數的值時(預設為200),就會啟用bypass機制。

普通機制的Sort Shuffle

這種機制和mapreduce差不多,在該模式下,資料會先寫入一個記憶體資料結構中,此時根據不同的shuffle運算元,可能選用不同的資料結構。如果是reduceByKey這種聚合類的shuffle運算元,那麼會選用Map資料結構,一邊通過Map進行聚合,一邊寫入記憶體;如果是join這種普通的shuffle運算元,那麼會選用Array資料結構,直接寫入記憶體。接著,每寫一條資料進入記憶體資料結構之後,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那麼就會嘗試將記憶體資料結構中的資料溢寫到磁碟,然後清空記憶體資料結構。

在溢寫到磁碟檔案之前,會先根據key對記憶體資料結構中已有的資料進行排序。排序過後,會分批將資料寫入磁碟檔案。預設的batch數量是10000條,也就是說,排序好的資料,會以每批1萬條資料的形式分批寫入磁碟檔案。
一個task將所有資料寫入記憶體資料結構的過程中,會發生多次磁碟溢寫操作,也會產生多個臨時檔案。最後會將之前所有的臨時磁碟檔案都進行合併,由於一個task就只對應一個磁碟檔案因此還會單獨寫一份索引檔案,其中標識了下游各個task的資料在檔案中的start offset與end offset。
SortShuffleManager由於有一個磁碟檔案merge的過程,因此大大減少了檔案數量,由於每個task最終只有一個磁碟檔案所以檔案個數等於上游shuffle write個數。

bypass機制的Sort Shuffle

bypass執行機制的觸發條件如下:
1)shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold引數的值,預設值200。
2)不是聚合類的shuffle運算元(比如reduceByKey)。

此時task會為每個reduce端的task都建立一個臨時磁碟檔案,並將資料按key進行hash然後根據key的hash值,將key寫入對應的磁碟檔案之中。當然,寫入磁碟檔案時也是先寫入記憶體緩衝,緩衝寫滿之後再溢寫到磁碟檔案的。最後,同樣會將所有臨時磁碟檔案都合併成一個磁碟檔案,並建立一個單獨的索引檔案。

該過程的磁碟寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要建立數量驚人的磁碟檔案,只是在最後會做一個磁碟檔案的合併而已。因此少量的最終磁碟檔案,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的效能會更好。

而該機制與普通SortShuffleManager執行機制的不同在於:
第一,磁碟寫機制不同;
第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的效能開銷。

Spark Shuffle總結

Shuffle 過程本質上都是將 Map 端獲得的資料使用分割槽器進行劃分,並將資料傳送給對應的 Reducer 的過程。

Shuffle作為處理連線map端和reduce端的樞紐,其shuffle的效能高低直接影響了整個程式的效能和吞吐量。map端的shuffle一般為shuffle的Write階段,reduce端的shuffle一般為shuffle的read階段。Hadoop和spark的shuffle在實現上面存在很大的不同,spark的shuffle分為兩種實現,分別為HashShuffle和SortShuffle。

HashShuffle又分為普通機制和合並機制,普通機制因為其會產生MR個數的巨量磁碟小檔案而產生大量效能低下的Io操作,從而效能較低,因為其巨量的磁碟小檔案還可能導致OOM,HashShuffle的合併機制通過重複利用buffer從而將磁碟小檔案的數量降低到CoreR個,但是當Reducer 端的並行任務或者是資料分片過多的時候,依然會產生大量的磁碟小檔案。

SortShuffle也分為普通機制和bypass機制,普通機制在記憶體資料結構(預設為5M)完成排序,會產生2M個磁碟小檔案。而當shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold引數的值。或者運算元不是聚合類的shuffle運算元(比如reduceByKey)的時候會觸發SortShuffle的bypass機制,SortShuffle的bypass機制不會進行排序,極大的提高了其效能。

在Spark 1.2以前,預設的shuffle計算引擎是HashShuffleManager,因為HashShuffleManager會產生大量的磁碟小檔案而效能低下,在Spark 1.2以後的版本中,預設的ShuffleManager改成了SortShuffleManager。

SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁碟檔案,但是最後會將所有的臨時檔案合併(merge)成一個磁碟檔案,因此每個Task就只有一個磁碟檔案。在下一個stage的shuffle read task拉取自己的資料時,只要根據索引讀取每個磁碟檔案中的部分資料即可。

Spark與MapReduce Shuffle的異同

  1. 從整體功能上看,兩者並沒有大的差別。 都是將 mapper(Spark 裡是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 裡 reducer 可能是下一個 stage 裡的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以記憶體作緩衝區,邊 shuffle 邊 aggregate 資料,等到資料 aggregate 好以後進行 reduce(Spark 裡可能是後續的一系列操作)。

  2. 從流程的上看,兩者差別不小。 Hadoop MapReduce 是 sort-based,進入 combine和 reduce的 records 必須先 sort。這樣的好處在於 combine/reduce可以處理大規模的資料,因為其輸入資料可以通過外排得到(mapper 對每段資料先做排序,reducer 的 shuffle 對排好序的每段資料做歸併)。以前 Spark 預設選擇的是 hash-based,通常使用 HashMap 來對 shuffle 來的資料進行合併,不會對資料進行提前排序。如果使用者需要經過排序的資料,那麼需要自己呼叫類似 sortByKey的操作。在Spark 1.2之後,sort-based變為預設的Shuffle實現。

  3. 從流程實現角度來看,兩者也有不少差別。 Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map, spill, merge, shuffle, sort, reduce等。每個階段各司其職,可以按照過程式的程式設計思想來逐一實現每個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不同的 stage 和一系列的 transformation,所以 spill, merge, aggregate 等操作需要蘊含在 transformation中。