1. 程式人生 > >Spark RDD之Dependency

Spark RDD之Dependency

概述

Partition是資料切分的邏輯,而Dependency是在Transformation過程中Partition的演化過程,根據Dependency的型別判斷資料的處理方式,Dependency可以分為NarrowDependency(窄依賴)和ShuffleDependency(寬依賴)。Dependency是一個抽象類,只有一個屬性RDD,該RDD為對應RDD的父RDD,因此Dependency是對父RDD的包裝,Dependency的基類如下:

abstract class Dependency[T] extends Serializable {
  def rdd:
RDD[T] }

窄依賴

窄依賴是父RDD的partition只被子RDD的一個partition引用,允許流水線執行。窄依賴的每個父RDD的分割槽只會傳入到一個子RDD分割槽中,通常可以在一個節點內完成轉換,即子RDD的分割槽放在該子RDD分割槽的父RDD分割槽的同個節點。NarrowDependency依然是一個抽象類,其中partitionId為子RDD的分割槽Id,並增加了getParents方法,定義如下:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents
(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd }

窄依賴主要有如下三種具體實現:

  1. OneToOneDependency
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
  1. RangeDependency
class
RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = { if (partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) } else { Nil } } }
  1. PruneDependency
private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
  extends NarrowDependency[T](rdd) {

  @transient
  val partitions: Array[Partition] = rdd.partitions
    .filter(s => partitionFilterFunc(s.index)).zipWithIndex
    .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

  override def getParents(partitionId: Int): List[Int] = {
    List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
  }
}

Dependency中的核心方法為getParents。OneToOneDependency和RangeDependency都是一對一的,較容易理解。PruningDependency子RDD的partition中,包含不止一個父RDD的partition,子RDD獲取依賴的方法getDependencies可以獲取多個dependency,而dependency中包含rdd,getParents方法是Dependency中重寫的方法,因此getParents是在已經確定RDD的情況下,根據子RDD的partitionId獲取父RDD中partitionId的行為,表徵partition的資料流向和處理方式,PartitionId是針對某個RDD中的分割槽序號。

  • OneToOneDependency:父RDD的partitionId和子RDD的partitionId是一致的,只是存在於不同的RDD中。
  • RangeDependency:inStart是父RDD開始的下標,outStart是子RDD開始的下標,length是partition的個數。這種依賴的典型是Union操作,將兩個RDD合併成一個RDD。父RDD的partitionId根據以下公式進行計算:子RDD的partitionId-子RDD的開始下標+父RDD的開始下標(inStart存在意義的為:同一個RDD中的partition可能存在不同的節點上,對於一個節點來說,partition是有偏移量的,而不是從0開始的)。
  • PruningDependency:較為複雜,子RDD中的partition會從不止一個父RDD的partition中獲取資料,這種依賴的典型操作是filterByRange操作。該Dependency定義了一個partitions陣列,該陣列是通過對父RDD進行了自定義的過濾操作後重新排序獲得的,同時根據序號和partition重新new了Partition的子類PartitionPruningRDDPartition並返回,因此partitions中包含著過濾後的父RDD中的partitions和子RDD中重組的partitionId,而partition中有index屬性,getParents方法便是返回了該屬性以獲得父RDD的partitionId,其中作為引數傳入的partitionId(子RDD中partition的Id號)的個數,恰好是過濾後剩餘partition的個數。

寬依賴

寬依賴是父RDD的partition被子RDD的多個partition引用,需要在執行過程中將同一個父RDD的分割槽傳入到不同的子RDD分割槽中,中間可能涉及多個節點之間的資料傳輸。

Shuffle設計到網路傳輸,所以要有序列化serializer,為了減少網路傳輸,可以加map端聚合,通過mapSideCombine和aggregator控制,還有key排序相關的keyOrdering,以及重輸出的資料如何分割槽的partitioner,其他資訊包括k,v和combiner的class資訊以及shuffleId。shuffle是個相對複雜且開銷大的過程,Partition之間的關係在shuffle處停止,因此shuffle是劃分stage的依據。

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

當RDD分割槽丟失時(某個節點故障),spark會對資料進行重算:

  1. 對於窄依賴,由於父RDD的一個分割槽只對應一個子RDD分割槽,這樣只需要重算和子RDD分割槽對應的父RDD分割槽即可,所以這個重算對資料的利用率是100%的;
  2. 對於寬依賴,重算的父RDD分割槽對應多個子RDD分割槽,這樣實際上父RDD 中只有一部分的資料是被用於恢復這個丟失的子RDD分割槽的,另一部分對應子RDD的其它未丟失分割槽,這就造成了多餘的計算;更一般的,寬依賴中子RDD分割槽通常來自多個父RDD分割槽,極端情況下,所有的父RDD分割槽都要進行重新計算。