1. 程式人生 > >Shuffle調優之合併map端輸出檔案(三)

Shuffle調優之合併map端輸出檔案(三)

什麼樣的情況下,會發生shuffle? 在spark中,觸發Action運算元就會發生shuffle,主要是以下幾個運算元:groupByKey、reduceByKey、countByKey、join等等。 什麼是shuffle? groupByKey,要把分佈在叢集各個節點上的資料中的同一個key,對應的values,都給集中到一塊兒,集中到叢集中同一個節點上,更嚴密一點說,就是集中到一個節點的一個executor的一個task中。然後呢,集中一個key對應的values之後,才能交給我們來進行處理,<key, Iterable>;reduceByKey,運算元函式去對values集合進行reduce操作,最後變成一個value; countByKey,需要在一個task中,獲取到一個key對應的所有的value,然後進行計數,統計總共有多少個value; join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應的2個value,都能到一個節點的executor的task中,給我們進行處理。 shuffle具體怎麼工作的呢? 在某個action觸發job的時候,DAGScheduler,會負責劃分job為多個stage。劃分的依據就是從後往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分,如果發現有會觸發shuffle操作的運算元,比如reduceByKey,就將這個操作的前半部分,以及之前所有的RDD和transformation操作,劃分為一個stage;shuffle操作的後半部分,以及後面的,直到action為止的RDD和transformation操作,劃分為另外一個stage。 每一個shuffle的前半部分stage的task,每個task都會建立下一個stage的task數量相同的檔案,比如下一個stage會有100個task,那麼當前stage每個task都會建立100份檔案;會將同一個key對應的values,一定是寫入同一個檔案中的;不同節點上的task,也一定會將同一個key對應的values,寫入下一個stage,同一個task對應的檔案中。 shuffle的後半部分stage的task,每個task都會從各個節點上的task寫的屬於自己的那一份檔案中,拉取key, value對;然後task會有一個記憶體緩衝區,然後會用HashMap,進行key, values的匯聚;(key ,values);

下面附上spark任務排程流程圖: 在這裡插入圖片描述

task會用我們自己定義的聚合函式,比如reduceByKey(+),把所有values進行一對一的累加;聚合出來最終的值。就完成了shuffle。 shuffle前半部分的task在寫入資料到磁碟檔案之前,都會先寫入一個一個的記憶體緩衝,記憶體緩衝滿溢之後,再spill溢寫到磁碟檔案中。 問題來了:預設的這種shuffle行為,對效能有什麼樣的惡劣影響呢? 比如:100個節點(每個節點一個executor):100個executor 每個executor:2個cpu core 總共1000個task:每個executor平均10個task 每個節點,10個task,每個節點會輸出多少份map端檔案?10 * 1000=1萬個檔案 總共有多少份map端輸出檔案?100 * 10000 = 100萬。 第一個stage,每個task,都會給第二個stage的每個task建立一份map端的輸出檔案 第二個stage,每個task,會到各個節點上面去,拉取第一個stage每個task輸出的,屬於自己的那一份檔案。 shuffle中的寫磁碟的操作,基本上就是shuffle中效能消耗最為嚴重的部分。 通過上面的分析,一個普通的生產環境的spark job的一個shuffle環節,會寫入磁碟100萬個檔案。 磁碟IO對效能和spark作業執行速度的影響,是極其驚人和嚇人的。 基本上,spark作業的效能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出檔案這一個部分,但是這裡也是非常大的一個性能消耗點。

優化方法: new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”) 開啟shuffle map端輸出檔案合併的機制;預設情況下,是不開啟的,就是會發生如上所述的大量map端輸出檔案的操作,嚴重影響效能。

開啟了map端輸出檔案的合併機制之後: 第一個stage,同時就執行cpu core個task,比如cpu core是2個,並行執行2個task;每個task都建立下一個stage的task數量個檔案; 第一個stage,並行執行的2個task執行完以後;就會執行另外兩個task;另外2個task不會再重新建立輸出檔案;而是複用之前的task建立的map端輸出檔案,將資料寫入上一批task的輸出檔案中。 第二個stage,task在拉取資料的時候,就不會去拉取上一個stage每一個task為自己建立的那份輸出檔案了;而是拉取少量的輸出檔案,每個輸出檔案中,可能包含了多個task給自己的map端輸出。

另外需要注意的(map端輸出檔案合併):

只有並行執行的task會去建立新的輸出檔案;下一批並行執行的task,就會去複用之前已有的輸出檔案;但是有一個例外,比如2個task並行在執行,但是此時又啟動要執行2個task;那麼這個時候的話,就無法去複用剛才的2個task建立的輸出檔案了;而是還是隻能去建立新的輸出檔案。 要實現輸出檔案的合併的效果,必須是一批task先執行,然後下一批task再執行,才能複用之前的輸出檔案;否則 多批task同時起來執行,還是做不到複用的。 開啟了map端輸出檔案合併機制之後,生產環境上的例子,會有什麼樣的變化? 100個節點(每個節點一個executor):100個executor 每個executor:2個cpu core 總共1000個task:每個executor平均10個task 每個節點,2個cpu core,有多少份輸出檔案呢?2 * 1000 = 2000個 總共100個節點,總共建立多少份輸出檔案呢?100 * 2000 = 20萬個檔案

相比較開啟合併機制之前的情況,100萬個

map端輸出檔案,在生產環境中,立減5倍!

合併map端輸出檔案,對咱們的spark的效能有哪些方面的影響呢? 1、map task寫入磁碟檔案的IO,減少:100萬檔案 -> 20萬檔案 2、第二個stage,原本要拉取第一個stage的task數量份檔案,1000個task,第二個stage的每個task,都要拉取1000份檔案,走網路傳輸;合併以後,100個節點,每個節點2個cpu core,第二個stage的每個task,主要拉取100 * 2 = 200個檔案即可;網路傳輸的效能消耗是不是也大大減少

大家不要小看這個map端輸出檔案合併機制。實際上,在資料量比較大,你自己本身做了前面的效能調優,executor上去->cpu core上去->並行度(task數量)上去,shuffle沒調優,shuffle就很糟糕了;大量的map端輸出檔案的產生。對效能有比較惡劣的影響。 這個時候,去開啟這個機制,可以很有效的提升效能。