1. 程式人生 > >Spark RDD 特徵及其依賴

Spark RDD 特徵及其依賴

1 RDD特徵

分割槽(partition)

有一個數據分片列表,能夠將資料進行切分,切分後的資料能夠進行平行計算,是資料集的原子組成部分

函式 compute

計算每個分片,得出一個可遍歷的結果,用於說明在父RDD上執行何種計算

依賴 dependency

計算每個RDD對父RDD的依賴列表,源RDD沒有依賴,通過依賴關係描述血統(lineage)

優先位置(可選)

每一個分片的優先計算位置(preferred location),HDFS的Block所在的位置就應該是優先計算為止(移動計算而不移動資料)

分割槽策略(可選)

描述分割槽位置和資料存放位置,鍵值對的RDD根據雜湊值進行分割槽,類似於MR中的Partitioner介面,根據KEY來分配位置

2 RDD依賴

RDD 的容錯機制是通過記錄更新來實現的,且記錄的是粗粒度的轉換操作。在外部,我們將記錄的資訊稱為血統(Lineage)關係,而到了原始碼級別,Apache Spark 記錄的則是 RDD 之間的依賴(Dependency)關係。在一次轉換操作中,建立得到的新 RDD 稱為子 RDD,提供資料的 RDD 稱為父 RDD,父 RDD 可能會存在多個,我們把子 RDD 與父 RDD 之間的關係稱為依賴關係,或者可以說是子 RDD 依賴於父 RDD。
依賴只儲存父 RDD 資訊,轉換操作的其他資訊,如資料處理函式,會在建立 RDD 時候,儲存在新的 RDD 內。依賴在 Apache Spark 原始碼中的對應實現是 Dependency 抽象類

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

每個 Dependency 子類內部都會儲存一個 RDD 物件,對應一個父 RDD,如果一次轉換轉換操作有多個父 RDD,就會對應產生多個 Dependency 物件,所有的 Dependency 物件儲存在子 RDD 內部,通過遍歷 RDD 內部的 Dependency 物件,就能獲取該 RDD 所有依賴的父 RDD。

窄依賴 (narrow dependency)

指一個父RDD的分割槽最多被一個子RDD所使用,表現為一個父RDD的分割槽對應於 一個子RDD的分割槽(第一類),或多個父RDD的分割槽對應於一個子RDD的分割槽(第二類)。
換句話說,父RDD中的每個分割槽都是不可分割,它必須被整個得交付給子RDD中的一個分割槽。下圖展示了幾類常見的窄依賴及其對應的轉換操作
在這裡插入圖片描述
窄依賴的實現在 NarrowDependency 抽象類中。

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

NarrowDependency 要求子類實現 getParent 方法,用於獲取一個分割槽資料來源於父 RDD 中的哪些分割槽(雖然要求返回 Seq[Int],實際上卻只有一個元素)。
窄依賴可進一步分類成一對一依賴和範圍依賴,對應實現分別是 OneToOneDependency 類和***RangeDependency*** 類。

寬依賴(wide dependency)(也成為shuffle依賴,Shuffle Dependency)

Shuffle 依賴中,父 RDD 中的分割槽可能會被多個子 RDD 分割槽使用。因為父 RDD 中一個分割槽內的資料會被分割,傳送給子 RDD 的所有分割槽,因此 Shuffle 依賴也意味著父 RDD 與子 RDD 之間存在著 Shuffle 過程。下圖展示了幾類常見的Shuffle依賴及其對應的轉換操作。
在這裡插入圖片描述
依賴關係是兩個 RDD 之間的依賴,因此若一次轉換操作中父 RDD 有多個,則可能會同時包含窄依賴和 Shuffle 依賴,下圖所示的 Join 操作,RDD a 和 RDD c 採用了相同的分割槽器,兩個 RDD 之間是窄依賴,Rdd b 的分割槽器與 RDD c 不同,因此它們之間是 Shuffle 依賴,具體實現可參見 CoGroupedRDD 類的 getDependencies 方法。這裡能夠再次發現:一個依賴對應的是兩個 RDD,而不是一次轉換操作
在這裡插入圖片描述

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
      /* I: Partitioner 相同,則是 OneToOneDepdencency */
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        /* I: Partitioner 不同,則是 ShuffleDependency */
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
      }
    }
  }
一對一依賴

一對一依賴表示子 RDD 分割槽的編號與父 RDD 分割槽的編號完全一致的情況,若兩個 RDD 之間存在著一對一依賴,則子 RDD 的分割槽個數、分割槽內記錄的個數都將繼承自父 RDD。
一對一依賴的實現很簡單,如下所示。

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int) = List(partitionId)
}
範圍依賴

範圍依賴是依賴關係中的一個特例,只被用於表示 UnionRDD 與父 RDD 之間的依賴關係。相比一對一依賴,除了第一個父 RDD,其他父 RDD 和子 RDD 的分割槽編號不再一致,Apache Spark 統一將unionRDD 與父 RDD 之間(包含第一個 RDD)的關係都叫做範圍依賴。範圍依賴的實現如下。

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
  class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int) = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

RangeDepdencency 類中 getParents 的一個示例如下圖所示,對於 UnionRDD 中編號為 3 的分割槽,可以計算得到其資料來源於父 RDD 中編號為 2 的分割槽。
在這裡插入圖片描述

Shuffle 依賴

Shuffle 依賴的對應實現為 ShuffleDependency 類,其原始碼如下。

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 */
@DeveloperApi
class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
      shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

ShuffleDependency 類中幾個成員的作用如下:
● rdd:用於表示 Shuffle 依賴中,子 RDD 所依賴的父 RDD。
● shuffleId:Shuffle 的 ID 編號,在一個 Spark 應用程式中,每個 Shuffle 的編號都是唯一的。
● shuffleHandle:Shuffle 控制代碼,ShuffleHandle 內部一般包含 Shuffle ID、Mapper 的個數以及對應的 Shuffle 依賴,在執行 ShuffleMapTask 時候,任務可以通過 ShuffleManager 獲取得到該控制代碼,並進一步得到 Shuffle 相關資訊。
● partitioner:分割槽器,用於決定 Shuffle 過程中 Reducer 的個數(實際上是子 RDD 的分割槽個數)以及 Map 端的一條資料記錄應該分配給哪一個 Reducer,也可以被用在 CoGroupedRDD 中,確定父 RDD 與子 RDD 之間的依賴關係型別。
● serializer:序列化器。用於 Shuffle 過程中 Map 端資料的序列化和 Reduce 端資料的反序列化。
● KeyOrdering:鍵值排序策略,用於決定子 RDD 的一個分割槽內,如何根據鍵值對 型別資料記錄進行排序。
● Aggregator:聚合器,內部包含了多個聚合函式,比較重要的函式有 createCombiner:V => C,mergeValue: (C, V) => C以及 mergeCombiners: (C, C) => C。例如,對於 groupByKey 操作,createCombiner 表示把第一個元素放入到集合中,mergeValue 表示一個元素新增到集合中,mergeCombiners 表示把兩個集合進行合併。這些函式被用於 Shuffle 過程中資料的聚合。
● mapSideCombine:用於指定 Shuffle 過程中是否需要在 map 端進行 combine 操作。如果指定該值為 true,由於 combine 操作需要用到聚合器中的相關聚合函式,因此 Aggregator 不能為空,否則 Apache Spark 會丟擲異常。例如:groupByKey 轉換操作對應的ShuffleDependency 中,mapSideCombine = false,而 reduceByKey 轉換操作中,mapSideCombine = true。

依賴與容錯機制

介紹完依賴的類別和實現之後,回過頭來,從分割槽的角度繼續探究 Apache Spark 是如何通過依賴關係來實現容錯機制的。下圖給出了一張依賴關係圖,fileRDD 經歷了 map、reduce 以及filter 三次轉換操作,得到了最終的 RDD,其中,map、filter 操作對應的依賴為窄依賴,reduce 操作對應的是 Shuffle 依賴。
在這裡插入圖片描述

假設最終 RDD 第一塊分割槽內的資料因為某些原因丟失了,由於 RDD 內的每一個分割槽都會記錄其對應的父 RDD 分割槽的資訊,因此沿著下圖所示的依賴關係往回走,我們就能找到該分割槽資料最終來源於 fileRDD 的所有分割槽,再沿著依賴關係往後計算路徑中的每一個分割槽資料,即可得到丟失的分割槽資料。
在這裡插入圖片描述
這個例子並不是特別嚴謹,按照我們的思維,只有執行了持久化,儲存在儲存介質中的 RDD 分割槽才會出現數據丟失的情況,但是上例中最終的 RDD 並沒有執行持久化操作。事實上,Apache Spark 將沒有被持久化資料重新被計算,以及持久化的資料第一次被計算,也等價視為資料“丟失”,在 1.7 節中我們會看到這一點。
依賴與平行計算
在上一節中我們看到,在 RDD 中,可以通過計算鏈(Computing Chain)來計算某個 RDD 分割槽內的資料,我們也知道分割槽是平行計算的基本單位,這時候可能會有一種想法:能否把 RDD 每個分割槽內資料的計算當成一個並行任務,每個並行任務包含一個計算鏈,將一個計算鏈交付給一個 CPU 核心去執行,叢集中的 CPU 核心一起把 RDD 內的所有分割槽計算出來。
答案是可以,這得益於 RDD 內部分割槽的資料依賴相互之間並不會干擾,而 Apache Spark 也是這麼做的,但在實現過程中,仍有很多實際問題需要去考慮。進一步觀察窄依賴、Shuffle 依賴在做平行計算時候的異同點。
先來看下方左側的依賴圖,依賴圖中所有的依賴關係都是窄依賴(包括一對一依賴和範圍依賴),可以看到,不僅計算鏈是獨立不干擾的(所以可以平行計算),所有計算鏈內的每個分割槽單元的計算工作也不會發生重複,如右側的圖所示。這意味著除非執行了持久化操作,否則計算過程中產生的中間資料我們沒有必要保留 —— 因為當前分割槽的資料只會給計算鏈中的下一個分割槽使用,而不用專門保留給其他計算鏈使用。
在這裡插入圖片描述

再來觀察 Shuffle 依賴的計算鏈,如圖下方左側的圖中,既有窄依賴,又有 Shuffle 依賴,由於 Shuffle 依賴中,子 RDD 一個分割槽的資料依賴於父 RDD 內所有分割槽的資料,當我們想計算末 RDD 中一個分割槽的資料時,Shuffle 依賴處需要把父 RDD 所有分割槽的資料計算出來,如右側的圖所示(紫色表示最後兩個分割槽計算鏈共同經過的地方) —— 而這些資料,在計算末 RDD 另外一個分割槽的資料時候,同樣會被用到。
在這裡插入圖片描述
如果我們做到計算鏈的平行計算的話,這就意味著,要麼 Shuffle 依賴處父 RDD 的資料在每次需要使用的時候都重複計算一遍,要麼想辦法把父 RDD 資料儲存起來,提供給其餘分割槽的資料計算使用。
Apache Spark 採用的是第二種辦法,但儲存資料的方法可能與想象中的會有所不同,Spark 把計算鏈從 Shuffle 依賴處斷開,劃分成不同的階段(Stage),階段之間存在依賴關係(其實就是 Shuffle 依賴),從而可以構建一張不同階段之間的有向無環圖(DAG)。