Spark分析之Dependency
在Spark中,每一個RDD是對於資料集在某一狀態下的表現形式,比如說:map、filter、group by等都算一次操作,這個狀態有可能是從前一狀態轉換而來的;
因此換句話說一個RDD可能與之前的RDD(s)有依賴關係;RDD之間存在依賴關係;
根據依賴關係的不同,可以將RDD分成兩種不同的型別:寬依賴和窄依賴。
窄依賴:一個父RDD的partition至多被子RDD的某個partition使用一次;
寬依賴:一個父RDD的partition會被子RDD的partition使用多次,需要shuffle操作;
圖中方框描述:外面的大方框是一個RDD,裡面的小方塊是RDD中的partition,多個partition組成一個RDD
窄依賴
定義:一個父RDD的partition至多被子RDD的某個partition使用一次; 不需要shuffle,partition範圍不會改變,一個partition經過transform後還是一個partition,雖然內容發生了變化;可以進行pipeline計算,快速完成;
在某個節點上可以一次性全部計算完所有的父partition(pipeline流水式的計算方式):
a.map().filter().reduceByKey() 這樣多步操作一次性計算完畢,而不需要第一步執行完後儲存起來,第二步再去讀取再計算再儲存。。。。。。
窄依賴可以在單節點上完成運算,非常高效。
容錯:某個partition掛了,快速將丟失的partition平行計算出來。
容錯和計算速度都比寬依賴強。
窄依賴又分為兩種:OneToOneDependency:一對一的依賴,一父一子,最典型的是map/filter。
RangeDependency:一定範圍的RDD直接對應,最典型的是Union。
parent RDD的某個分割槽的partitions對應到child RDD中某個區間的partitions;
union:多個parent RDD合併到一個chind RDD,故每個parent RDD都對應到child RDD中的一個區間;
注意:union不會把多個partition合併成一個partition,而是簡單的把多個RDD的partitions放到一個RDD中,partition不會發生變化。
寬依賴
定義:一個父RDD的partition會被子RDD的partition使用多次;只能前面的算好後才能進行後續的計算;只有等到父partition的所有資料都傳輸到各個節點後才能計算(經典的mapreduce場景)
容錯:某個partition掛了,要計算前面所有的父partition,代價很大。
spark是把map部分的資料計算完成後物化到map端的磁碟上,掛了之後直接從磁碟中讀取即可。
class ShuffleDependency[K, V]( @transient rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializerClass: String = null) extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) { val shuffleId: Int = rdd.context.newShuffleId() }
首先:需要基於PairRDD,因為一般需要依據key進行shuffle,所以資料結構往往是key-value;
其次:由於需要shuffle,所以就需要給出partitioner;
然後:shuffle不像map可以在local執行,往往需要網路傳輸或儲存,所以需要serializerClass;
最後:每個shuffle需要分配一個全域性的id,context.newShuffleId()的實現就是把全域性id累加;