1. 程式人生 > >Spark RDD基本概念、寬窄依賴、轉換行為操作

Spark RDD基本概念、寬窄依賴、轉換行為操作

目錄

  • RDD概述
    • RDD的內部程式碼
    • 案例
    • 小總結
  • 轉換、行動運算元
  • 寬、窄依賴
  • Reference

本文介紹一下rdd的基本屬性概念、rdd的轉換/行動操作、rdd的寬/窄依賴。

RDD:Resilient Distributed Dataset 彈性分散式資料集,是Spark中的基本抽象。

RDD表示可以並行操作的元素的不變分割槽集合。

RDD提供了許多基本的函式(map、filter、reduce等)供我們進行資料處理。

RDD概述

通常來說,每個RDD有5個主要的屬性組成:

  • 分割槽列表

    RDD是由多個分割槽組成的,分割槽是邏輯上的概念。RDD的計算是以分割槽為單位進行的。

  • 用於計算每個分割槽的函式

    作用於每個分割槽資料的計算函式。

  • 對其他RDD的依賴關係列表

    RDD中儲存了對於父RDD的依賴,根據依賴關係組成了Spark的DAG(有向無環圖),實現了spark巧妙、容錯的程式設計模型

  • 針對鍵值型RDD的分割槽器

    分割槽器針對鍵值型RDD而言的,將key傳入分割槽器獲取唯一的分割槽id。在shuffle中,分割槽器有很重要的體現。

  • 對每個分割槽進行計算的首選位置列表

    根據資料本地性的特性,獲取計算的首選位置列表,儘可能的把計算分配到靠近資料的位置,減少資料的網路傳輸。

RDD的內部程式碼

先看看基本概念的程式碼:
//建立此RDD的SparkContext
def sparkContext: SparkContext = sc
// 唯一的id
val id: Int = sc.newRddId()
// rdd友善的名字
@transient var name: String = _
// 分割槽器
val partitioner: Option[Partitioner] = None
// 獲取依賴列表
// dependencies和partitions中都用到了checkpointRDD,如果進行了checkpoint,checkpointRDD表示進行checkpoint後的rdd
final def dependencies: Seq[Dependency[_]] = {
    // 一對一的窄依賴
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
        if (dependencies_ == null) {
            dependencies_ = getDependencies
        }
        dependencies_
    }
}
// 獲取分割槽列表
final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
        if (partitions_ == null) {
            partitions_ = getPartitions
            partitions_.zipWithIndex.foreach { case (partition, index) =>
                require(partition.index == index,
                        s"partitions($index).partition == ${partition.index}, but it should equal $index")
            }
        }
        partitions_
    }
}
// 獲取分割槽的首選位置
final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
        getPreferredLocations(split)
    }
}
// 對應到每個分割槽的計算函式
def compute(split: Partition, context: TaskContext): Iterator[T]

主要就是圍繞上面5個重要屬性的一些操作

常用的函式/運算元
// 返回僅包含滿足過濾條件的元素的新RDD。
def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
}
// 通過將函式應用於此RDD的所有元素來返回新的RDD。
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
// 首先向該RDD的所有元素應用函式,然後將結果展平,以返回新的RDD。
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

我們可以發現幾乎每個運算元都會以當前RDD和對應的計算函式建立新的RDD,每個子RDD都持有父RDD的引用。

這就印證了RDD的不變性,也表明了RDD的計算是通過對RDD進行轉換實現的。

案例

val words = Seq("hello spark", "hello scala", "hello java")
val rdd = sc.makeRDD(words)
rdd
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .foreach(println(_))

上面是一個簡單的RDD的操作,我們先呼叫makeRDD建立了一個RDD,之後對rdd進行一頓運算元呼叫。

首先呼叫flatMap,flatMap內部會以當前rdd和我們傳入的_.split(" ")構建新的MapPartitionsRDD;

之後map,map以上步生成的MapPartitionsRDD和我們傳入的(_, 1)構造新的MapPartitionsRDD;

之後reduceByKey,reduceByKey構造新的RDD;

走到foreach,foreach是行動操作,觸發計算,輸出。

小總結

  • RDD內部的計算除action運算元以外,其他運算元都是懶執行,不會觸發計算,只是進行RDD的轉換。
  • RDD的計算是基於分割槽為單位計算的,我們傳進去的函式,作用於分割槽進行計算

轉換、行動運算元

從上面知道RDD是懶執行的,只有遇到行動運算元才執行計算。

轉換操作:在內部對根據父RDD建立新的RDD,不執行計算

行動操作:內部會呼叫sc.runJob,提交作業、劃分階段、執行作業。

一些常見的行動操作

foreach、foreachPartition、collect、reduce、count

除行動操作外,都是轉換操作

寬、窄依賴

寬窄依賴是shuffle和劃分排程的重要依據。

先看看spark中與依賴有關的幾個類(一層一層繼承關係):

Dependency依賴的頂級父類
    NarrowDependency 窄依賴
        OneToOneDependency 表示父RDD和子RDD分割槽之間的一對一依賴關係的窄依賴
        RangeDependency 表示父RDD和子RDD中分割槽範圍之間的一對一依賴關係的窄依賴
    ShuffleDependency 寬依賴

先說寬窄依賴的概念:

窄依賴:父RDD的每個分割槽只被一個子RDD分割槽使用

寬依賴:父RDD的每個分割槽都有可能被多個子RDD分割槽使用

其實就是父RDD的一個分割槽會被傳到幾個子RDD分割槽的區別。如果被傳到一個子RDD分割槽,就可以不需要移動資料(移動計算);如果被傳到多個子RDD分割槽,就需要進行資料的傳輸。

接下來看看Dependency內部的一些屬性及方法:

// 依賴對應的rdd,其實就是當前rdd的父rdd。寬依賴和窄依賴都有這個屬性
def rdd: RDD[T]
// 獲取子分割槽對應的父分割槽(窄依賴的方法)
def getParents(partitionId: Int): Seq[Int]

// 以下是寬依賴的屬性及方法
// 對應鍵值RDD的分割槽器
val partitioner: Partitioner
// 在資料傳輸時的序列化方法
val serializer: Serializer = SparkEnv.get.serializer
// 鍵的排序方式
val keyOrdering: Option[Ordering[K]] = None
// 一組用於聚合資料的功能
val aggregator: Option[Aggregator[K, V, C]] = None
// 是否需要map端預聚合
val mapSideCombine: Boolean = false
// 當前寬依賴的id
val shuffleId: Int = _rdd.context.newShuffleId()
// 向管理員註冊一個shuffle,並獲取一個控制代碼,以將其傳遞給任務
val shuffleHandle: ShuffleHandle =  _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
一些常見的寬窄依賴

窄依賴:map、filter、union、mapPartitions、join(當分割槽器是HashPartitioner)

寬依賴:sortByKey、join(分割槽器不是HashPartitioner時)


最後說一下reduceByKey,順便說一下為什麼當分割槽器HashPartitioner時就是窄依賴。

reduceByKey是用來將key分組後,執行我們傳入的函式。

它是窄依賴,它內部預設會使用HashPartitioner分割槽。

同一個key進去HashPartitioner得到的分割槽id是一樣的,這樣進行計算前後同一個key得到的分割槽都一樣,父RDD的分割槽就只被子RDD的一個分割槽依賴,就不需要移動資料。

所以join、reduceByKey在分割槽器是HashPartitioner時是窄依賴。


end. 個人理解,如有偏差,歡迎交流指正。

Reference

  • 《圖解Spark核心技術與案例實戰》
  • 寬窄依賴:https://www.jianshu.com/p/5c2301dfa360

扶我起來,我還能學。



個人公眾號:碼農峰,定時推送行業資訊,持續釋出原創技術文章,歡迎大家關