SparkShuffle的分類和執行過程的一些總結
什麼是Spark Shuffle?
reduceByKey會將上一個RDD中的每一個key對應的所有value聚合成一個value,然後生成一個新的RDD,元素型別是<key,value>對的形式,這樣每一個key對應一個聚合起來的value。
在聚合之前,相同的key可能在不同的分割槽中,這些分割槽也可能子不同的節點上,RDD是彈性的分散式資料集,RDD的partitiion很可能在不同的節點上
如何聚合
–Shuffle Write:上一個stage的每個map task就必須保證將自己處理的當前分割槽的資料相同的key寫入一個分割槽檔案中,可能會寫入多個不同的分割槽檔案中。
– Shuffle Read:reduce task就會從上一個stage的所有task所在的機器上尋找屬於己的那些分割槽檔案,這樣就可以保證每一個key所對應的value都會匯聚到同一個節點上去處理和聚合。
Spark中有三種Shuffle型別,HashShuffle、SortShuffle、鎢絲SortShuffle,Spark1.2之前是HashShuffle預設的分割槽器是HashPartitioner,Spark1.2引入SortShuffle預設的分割槽器是RangePartitioner,鎢絲SortShuffle是後來引入的,也是預設排序的,是對SortShuffle的優化
HashShuffle的執行過程
1、注意: 這種是沒有加consolidation優化的HashShuffle
執行流程
a) 每一個map task將不同結果寫到不同的buffer中,每個buffer的大小為32K。buffer起到資料快取的作用。
b) 每個buffer檔案最後對應一個磁碟小檔案。
c) reduce task來拉取對應的磁碟小檔案。
總結
① .map task的計算結果會根據分割槽器(預設是hashPartitioner)來決定寫入到哪一個磁碟小檔案中去。ReduceTask會去Map端拉取相應的磁碟小檔案。
② .產生的磁碟小檔案的個數:
M(map task的個數)* R(reducetask的個數)
存在的問題
產生的磁碟小檔案過多,會導致以下問題:
a) 在Shuffle Write過程中會產生很多寫磁碟小檔案的物件。
b) 在Shuffle Read過程中會產生很多讀取磁碟小檔案的物件。
c) 在JVM堆記憶體中物件過多會造成頻繁的gc,gc還無法解決執行所需要的記憶體 的話,就會OOM。
d) 在資料傳輸過程中會有頻繁的網路通訊,頻繁的網路通訊出現通訊故障的可能性大大增加,一旦網路通訊出現了故障會導致shuffle file cannot find 由於這個錯誤導致的task失敗,TaskScheduler不負責重試,由DAGScheduler負責重試Stage。
2、注意:這種是開啟consolidation優化機制的HashShuffle
總結
產生磁碟小檔案的個數:C(core的個數)*R(reduce的個數)
SortShuffle的執行過程
- 普通機制執行流程
a) map task 的計算結果會寫入到一個記憶體資料結構裡面,記憶體資料結構預設是5M
b) 在shuffle的時候會有一個定時器,不定期的去估算這個記憶體結構的大小,當記憶體結構中的資料超過5M時,比如現在記憶體結構中的資料為5.01M,那麼他會申請5.01*2-5=5.02M記憶體給記憶體資料結構。
c) 如果申請成功不會進行溢寫,如果申請不成功,這時候會發生溢寫磁碟。
d) 在溢寫之前記憶體結構中的資料會進行排序分割槽
e) 然後開始溢寫磁碟,寫磁碟是以batch的形式去寫,一個batch是1萬條資料,
f) map task執行完成後,會將這些磁碟小檔案合併成一個大的磁碟檔案,同時生成一個索引檔案。
g) reduce task去map端拉取資料的時候,首先解析索引檔案,根據索引檔案再去拉取對應的資料。
總結
產生磁碟小檔案的個數: 2*M(map task的個數)
2) bypass機制
bypass機制示意圖
總結
① .bypass執行機制的觸發條件如下:
shuffle reduce task的數量小於spark.shuffle.sort.bypassMergeThreshold的引數值。這個值預設是200。
② .產生的磁碟小檔案為:2*M(map task的個數)
Shuffle檔案定址
- MapOutputTracker
MapOutputTracker是Spark架構中的一個模組,是一個主從架構。管理磁碟小檔案的地址。
Ø MapOutputTrackerMaster是主物件,存在於Driver中。
Ø MapOutputTrackerWorker是從物件,存在於Excutor中。
- BlockManager
BlockManager塊管理者,是Spark架構中的一個模組,也是一個主從架構。
Ø BlockManagerMaster,主物件,存在於Driver中。
BlockManagerMaster會在叢集中有用到廣播變數和快取資料或者刪除快取資料的時候,通知BlockManagerSlave傳輸或者刪除資料。
Ø BlockManagerWorker,從物件,存在於Excutor中。
BlockManagerWorker會與BlockManagerWorker之間通訊。
¬ 無論在Driver端的BlockManager還是在Excutor端的BlockManager都含有四個物件:
① DiskStore:負責磁碟的管理。
② MemoryStore:負責記憶體的管理。
③ ConnectionManager:負責連線其他的BlockManagerWorker。
④ BlockTransferService:負責資料的傳輸。
Shuffle檔案定址圖
Shuffle檔案定址流程
a) 當map task執行完成後,會將task的執行情況和磁碟小檔案的地址封裝到MpStatus物件中,通過MapOutputTrackerWorker物件向Driver中的MapOutputTrackerMaster彙報。
b) 在所有的map task執行完畢後,Driver中就掌握了所有的磁碟小檔案的地址。
c) 在reduce task執行之前,會通過Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster獲取磁碟小檔案的地址。
d) 獲取到磁碟小檔案的地址後,會通過BlockManager中的ConnectionManager連線資料所在節點上的ConnectionManager,然後通過BlockTransferService進行資料的傳輸。
e) BlockTransferService預設啟動5個task去節點拉取資料。預設情況下,5個task拉取資料量不能超過48M。