Spark RDD基本概念、寬窄依賴、轉換行為操作
目錄
- RDD概述
- RDD的內部程式碼
- 案例
- 小總結
- 轉換、行動運算元
- 寬、窄依賴
- Reference
本文介紹一下rdd的基本屬性概念、rdd的轉換/行動操作、rdd的寬/窄依賴。
RDD:Resilient Distributed Dataset 彈性分散式資料集,是Spark中的基本抽象。
RDD表示可以並行操作的元素的不變分割槽集合。
RDD提供了許多基本的函式(map、filter、reduce等)供我們進行資料處理。
RDD概述
通常來說,每個RDD有5個主要的屬性組成:
分割槽列表
RDD是由多個分割槽組成的,分割槽是邏輯上的概念。RDD的計算是以分割槽為單位進行的。
用於計算每個分割槽的函式
作用於每個分割槽資料的計算函式。
對其他RDD的依賴關係列表
RDD中儲存了對於父RDD的依賴,根據依賴關係組成了Spark的DAG(有向無環圖),實現了spark巧妙、容錯的程式設計模型
針對鍵值型RDD的分割槽器
分割槽器針對鍵值型RDD而言的,將key傳入分割槽器獲取唯一的分割槽id。在shuffle中,分割槽器有很重要的體現。
對每個分割槽進行計算的首選位置列表
根據資料本地性的特性,獲取計算的首選位置列表,儘可能的把計算分配到靠近資料的位置,減少資料的網路傳輸。
RDD的內部程式碼
先看看基本概念的程式碼:
//建立此RDD的SparkContext def sparkContext: SparkContext = sc // 唯一的id val id: Int = sc.newRddId() // rdd友善的名字 @transient var name: String = _ // 分割槽器 val partitioner: Option[Partitioner] = None // 獲取依賴列表 // dependencies和partitions中都用到了checkpointRDD,如果進行了checkpoint,checkpointRDD表示進行checkpoint後的rdd final def dependencies: Seq[Dependency[_]] = { // 一對一的窄依賴 checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { dependencies_ = getDependencies } dependencies_ } } // 獲取分割槽列表 final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { if (partitions_ == null) { partitions_ = getPartitions partitions_.zipWithIndex.foreach { case (partition, index) => require(partition.index == index, s"partitions($index).partition == ${partition.index}, but it should equal $index") } } partitions_ } } // 獲取分割槽的首選位置 final def preferredLocations(split: Partition): Seq[String] = { checkpointRDD.map(_.getPreferredLocations(split)).getOrElse { getPreferredLocations(split) } } // 對應到每個分割槽的計算函式 def compute(split: Partition, context: TaskContext): Iterator[T]
主要就是圍繞上面5個重要屬性的一些操作
常用的函式/運算元
// 返回僅包含滿足過濾條件的元素的新RDD。
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
// 通過將函式應用於此RDD的所有元素來返回新的RDD。
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
// 首先向該RDD的所有元素應用函式,然後將結果展平,以返回新的RDD。
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
我們可以發現幾乎每個運算元都會以當前RDD和對應的計算函式建立新的RDD,每個子RDD都持有父RDD的引用。
這就印證了RDD的不變性,也表明了RDD的計算是通過對RDD進行轉換實現的。
案例
val words = Seq("hello spark", "hello scala", "hello java")
val rdd = sc.makeRDD(words)
rdd
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.foreach(println(_))
上面是一個簡單的RDD的操作,我們先呼叫makeRDD建立了一個RDD,之後對rdd進行一頓運算元呼叫。
首先呼叫flatMap,flatMap內部會以當前rdd
和我們傳入的_.split(" ")
構建新的MapPartitionsRDD;
之後map,map以上步生成的MapPartitionsRDD
和我們傳入的(_, 1)
構造新的MapPartitionsRDD;
之後reduceByKey,reduceByKey構造新的RDD;
走到foreach,foreach是行動操作,觸發計算,輸出。
小總結
- RDD內部的計算除action運算元以外,其他運算元都是懶執行,不會觸發計算,只是進行RDD的轉換。
- RDD的計算是基於分割槽為單位計算的,我們傳進去的函式,作用於分割槽進行計算
轉換、行動運算元
從上面知道RDD是懶執行的,只有遇到行動運算元才執行計算。
轉換操作:在內部對根據父RDD建立新的RDD,不執行計算
行動操作:內部會呼叫sc.runJob
,提交作業、劃分階段、執行作業。
一些常見的行動操作
foreach、foreachPartition、collect、reduce、count
除行動操作外,都是轉換操作
寬、窄依賴
寬窄依賴是shuffle和劃分排程的重要依據。
先看看spark中與依賴有關的幾個類(一層一層繼承關係):
Dependency依賴的頂級父類
NarrowDependency 窄依賴
OneToOneDependency 表示父RDD和子RDD分割槽之間的一對一依賴關係的窄依賴
RangeDependency 表示父RDD和子RDD中分割槽範圍之間的一對一依賴關係的窄依賴
ShuffleDependency 寬依賴
先說寬窄依賴的概念:
窄依賴:父RDD的每個分割槽只被一個子RDD分割槽使用
寬依賴:父RDD的每個分割槽都有可能被多個子RDD分割槽使用
其實就是父RDD的一個分割槽會被傳到幾個子RDD分割槽的區別。如果被傳到一個子RDD分割槽,就可以不需要移動資料(移動計算);如果被傳到多個子RDD分割槽,就需要進行資料的傳輸。
接下來看看Dependency內部的一些屬性及方法:
// 依賴對應的rdd,其實就是當前rdd的父rdd。寬依賴和窄依賴都有這個屬性
def rdd: RDD[T]
// 獲取子分割槽對應的父分割槽(窄依賴的方法)
def getParents(partitionId: Int): Seq[Int]
// 以下是寬依賴的屬性及方法
// 對應鍵值RDD的分割槽器
val partitioner: Partitioner
// 在資料傳輸時的序列化方法
val serializer: Serializer = SparkEnv.get.serializer
// 鍵的排序方式
val keyOrdering: Option[Ordering[K]] = None
// 一組用於聚合資料的功能
val aggregator: Option[Aggregator[K, V, C]] = None
// 是否需要map端預聚合
val mapSideCombine: Boolean = false
// 當前寬依賴的id
val shuffleId: Int = _rdd.context.newShuffleId()
// 向管理員註冊一個shuffle,並獲取一個控制代碼,以將其傳遞給任務
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
一些常見的寬窄依賴
窄依賴:map、filter、union、mapPartitions、join(當分割槽器是HashPartitioner)
寬依賴:sortByKey、join(分割槽器不是HashPartitioner時)
最後說一下reduceByKey,順便說一下為什麼當分割槽器HashPartitioner時就是窄依賴。
reduceByKey是用來將key分組後,執行我們傳入的函式。
它是窄依賴,它內部預設會使用HashPartitioner分割槽。
同一個key進去HashPartitioner得到的分割槽id是一樣的,這樣進行計算前後同一個key得到的分割槽都一樣,父RDD的分割槽就只被子RDD的一個分割槽依賴,就不需要移動資料。
所以join、reduceByKey在分割槽器是HashPartitioner時是窄依賴。
end. 個人理解,如有偏差,歡迎交流指正。
Reference
- 《圖解Spark核心技術與案例實戰》
- 寬窄依賴:https://www.jianshu.com/p/5c2301dfa360
扶我起來,我還能學。
個人公眾號:碼農峰,定時推送行業資訊,持續釋出原創技術文章,歡迎大家關