1. 程式人生 > >Spark一些基礎原理——Shuffle

Spark一些基礎原理——Shuffle

自學背景知識:Spark基本工作原理Job、Task、Stage、MapReduce

lv0

Shuffle(洗牌)是介於MapReduce框架的中間階段,Map負責實現其寫入,Reduce實現其讀取。大致過程就是將Map在記憶體中的快取進行分割槽、排序、溢位到磁碟的資料進行抓取合併以重新持久化到磁碟與記憶體中,便於執行Reduce任務。(後接lv1-1)
其任務執行過程是在Stage階段判斷是否是寬依賴,存在寬依賴才會進行Shuffle。(後接lv1-2)
而對於帶有寬依賴的RDD執行compute時會讀取上一個Stage輸出的Shuffle資料。(後接lv1-3)

lv1-1

過去Spark用的是基於Hash的,所帶來的問題就是中間檔案過多比較低效,中間檔案數量與MapTask,ReduceTask的Core數正相關。目前Spark是基於sort。即將Mapper階段的Task全部寫到一個Data檔案中,同時生成一個索引檔案。Reduce階段的每一個Task根據索引讀取資料。從而降低隨機磁碟I/O與記憶體開銷。後續又引入了外部排序機制,即將資料放入記憶體改為記錄級寫入,以及動態資源分配等等。
Spark存在過HashBasedShuffle(已不用)、SortedBasedShuffle(預設)、TungstenSortedBasedShuffle(已併入前者由系統自動識別是否需要啟用)三種方式。(後接lv2-1)

lv1-2

判斷寬依賴是否存在,是基於存在ShuffleDependency寬依賴。屆時會將Job劃分成多個Stage,通過劃分Stage關鍵點-構建ShuffleDependency時進行Shuffle註冊,註冊到ShuffleManager-得到ShuffleHandle-ShuffleManager根據ShuffleHandle得到Writer和Reader以實現後續資料讀寫。
而每個Job最終會生成一個ResultStage與若干個ShuffleMapStage。前者是最終結果,後者對應的各ShuffleMapTask需要根據最終資料所對應的分割槽器(Partitioner)對資料進行分組(通過Range進行分割槽而非Hash,通過Key值對分割槽內的資料進行排序),並將資料持久化。

lv1-3

這個過程是需要BlockManger與ShufferManger進行互動,並通過MapOutputTracker進行任務追蹤。通過ShufferManger的getReader方法通過追蹤器獲取上一條資料在BlockManger中的位置以讀取資料(預設的讀取資料塊大小限制為48m,讀取一般優先考慮本地性),通過運算元計算(即先聚合、再排序(如果定義了的話))後再將MapStatus資訊註冊到追蹤器中,資料寫入部分則是通過getWriter獲取資料與buckets的對映關係,將RDD元素拆分到多個buckets中。

lv2-1

HashBasedShuffle是為了解決hadoop中基於Sort的MapReduce中不必要排序所帶來的開銷問題而設計的,但卻引發了更多中間檔案的問題,在2.X版本已無此方式。它將每一個MapTask中的資料在各個Reducer端分切成與Reducer數量相同的檔案(若開啟檔案合併,則會對生成檔案進行合併)
SortedBasedShuffle是根據寬依賴把一系列運算元Pipeline劃分成不同的Stage,每一個Stage內部會進行Pipeline的Stage與Stage之間、以及Stage與Stage之間進行Shuffle操作。(簡單理解為各Map內部洗牌(排序),Map與Reduce之間也要洗牌(彙總排序),這便涉及到了兩次讀寫。而這個Map在這裡是一個記憶體的緩衝區Buffer,超出記憶體範圍會自動落到硬碟,其效能瓶頸往往在於其內部洗牌是否發生在本地,是否有大量的MaperTask)排序方式分為序列化排序與反序列化排序兩種,其中前者是針對Tungsten,後者是針對其他,執行方式是根據分割槽數是否小於配置屬性,若小於則不執行聚合而執行類似於Hash方式的洗牌,若不小於則優先執行前者,執行不了再執行後者。而執行前者,也就是TungstenSortedBasedShuffle的條件是Shuffle依賴中無聚合操作或沒有對輸出進行排序、序列化器支援序列化值的重定位、輸出分割槽數小於16777216個(2的24次方)、單條記錄長度不得超過128MB等。