1. 程式人生 > >Spark分析之Dependency

Spark分析之Dependency

在Spark中,每一個RDD是對於資料集在某一狀態下的表現形式,比如說:map、filter、group by等都算一次操作,這個狀態有可能是從前一狀態轉換而來的;

因此換句話說一個RDD可能與之前的RDD(s)有依賴關係;RDD之間存在依賴關係;


根據依賴關係的不同,可以將RDD分成兩種不同的型別:寬依賴和窄依賴

窄依賴:一個父RDD的partition至多被子RDD的某個partition使用一次

寬依賴:一個父RDD的partition會被子RDD的partition使用多次,需要shuffle操作

圖中方框描述:外面的大方框是一個RDD,裡面的小方塊是RDD中的partition,多個partition組成一個RDD

窄依賴

定義:一個父RDD的partition至多被子RDD的某個partition使用一次; 不需要shuffle,partition範圍不會改變,一個partition經過transform後還是一個partition,雖然內容發生了變化;可以進行pipeline計算,快速完成;

在某個節點上可以一次性全部計算完所有的父partition(pipeline流水式的計算方式):

a.map().filter().reduceByKey() 這樣多步操作一次性計算完畢,而不需要第一步執行完後儲存起來,第二步再去讀取再計算再儲存。。。。。。

窄依賴可以在單節點上完成運算,非常高效。

容錯:某個partition掛了,快速將丟失的partition平行計算出來

容錯和計算速度都比寬依賴強。

窄依賴又分為兩種:

OneToOneDependency:一對一的依賴,一父一子,最典型的是map/filter。

RangeDependency:一定範圍的RDD直接對應,最典型的是Union。

  parent RDD的某個分割槽的partitions對應到child RDD中某個區間的partitions;
  union:多個parent RDD合併到一個chind RDD,故每個parent RDD都對應到child RDD中的一個區間;
  注意:union不會把多個partition合併成一個partition,而是簡單的把多個RDD的partitions放到一個RDD中,partition不會發生變化。

寬依賴

定義:一個父RDD的partition會被子RDD的partition使用多次;只能前面的算好後才能進行後續的計算;只有等到父partition的所有資料都傳輸到各個節點後才能計算(經典的mapreduce場景)

容錯:某個partition掛了,要計算前面所有的父partition,代價很大

spark是把map部分的資料計算完成後物化到map端的磁碟上,掛了之後直接從磁碟中讀取即可。

複製程式碼
class ShuffleDependency[K, V](
    @transient rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializerClass: String = null)
  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
  val shuffleId: Int = rdd.context.newShuffleId()
}
複製程式碼

首先:需要基於PairRDD,因為一般需要依據key進行shuffle,所以資料結構往往是key-value;
其次:由於需要shuffle,所以就需要給出partitioner;
然後:shuffle不像map可以在local執行,往往需要網路傳輸或儲存,所以需要serializerClass;

最後:每個shuffle需要分配一個全域性的id,context.newShuffleId()的實現就是把全域性id累加;