spark 中劃分stage的思路
窄依賴指父RDD的每一個分區最多被一個子RDD的分區所用,表現為
- 一個父RDD的分區對應於一個子RDD的分區
- 兩個父RDD的分區對應於一個子RDD 的分區。
寬依賴指子RDD的每個分區都要依賴於父RDD的所有分區,這是shuffle類操作
Stage:
一個Job會被拆分為多組Task,每組任務被稱為一個Stage就像Map Stage, Reduce Stage。Stage的劃分,簡單的說是以shuffle和result這兩種類型來劃分。在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,第一類task的輸出是shuffle所需數據,第二類task的輸出是result,stage的劃分也以此為依據,shuffle之前的所有變換是一個stage,shuffle之後的操作是另一個stage。
比如 rdd.parallize(1 to 10).foreach(println) 這個操作沒有shuffle,直接就輸出了,那麽只有它的task是resultTask,stage也只有一個;
如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 這個job因為有reduce,所以有一個shuffle過程,那麽reduceByKey之前的是一個stage,執行shuffleMapTask,輸出shuffle所需的數據,reduceByKey到最後是一個stage,直接就輸出結果了。如果job中有多次shuffle,那麽每個shuffle之前都是一個stage.
會根據RDD之間的依賴關系將DAG圖劃分為不同的階段,對於窄依賴,由於partition依賴關系的確定性,partition的轉換處理就可以在同一個線程裏完成,窄依賴就被spark劃分到同一個stage中,而對於寬依賴,只能等父RDD shuffle處理完成後,下一個stage才能開始接下來的計算。之所以稱之為ShuffleMapTask是因為它需要將自己的計算結果通過shuffle到下一個stage中
Stage劃分思路
因此spark劃分stage的整體思路是:從後往前推,遇到寬依賴就斷開,劃分為一個stage;遇到窄依賴就將這個RDD加入該stage中。
在spark中,Task的類型分為2種:ShuffleMapTask和ResultTask;簡單來說,DAG的最後一個階段會為每個結果的partition生成一個ResultTask,即每個Stage裏面的Task的數量是由該Stage中最後一個RDD的Partition的數量所決定的!
而其余所有階段都會生成ShuffleMapTask;之所以稱之為ShuffleMapTask是因為它需要將自己的計算結果通過shuffle到下一個stage中。
總結
map,filter為窄依賴,
groupbykey為款依賴
遇到一個寬依賴就分一個stage
spark 中劃分stage的思路