1. 程式人生 > >Spark 系列(三)—— 彈性式資料集RDDs

Spark 系列(三)—— 彈性式資料集RDDs

一、RDD簡介

RDD 全稱為 Resilient Distributed Datasets,是 Spark 最基本的資料抽象,它是隻讀的、分割槽記錄的集合,支援並行操作,可以由外部資料集或其他 RDD 轉換而來,它具有以下特性:

  • 一個 RDD 由一個或者多個分割槽(Partitions)組成。對於 RDD 來說,每個分割槽會被一個計算任務所處理,使用者可以在建立 RDD 時指定其分割槽個數,如果沒有指定,則預設採用程式所分配到的 CPU 的核心數;
  • RDD 擁有一個用於計算分割槽的函式 compute;
  • RDD 會儲存彼此間的依賴關係,RDD 的每次轉換都會生成一個新的依賴關係,這種 RDD 之間的依賴關係就像流水線一樣。在部分分割槽資料丟失後,可以通過這種依賴關係重新計算丟失的分割槽資料,而不是對 RDD 的所有分割槽進行重新計算;
  • Key-Value 型的 RDD 還擁有 Partitioner(分割槽器),用於決定資料被儲存在哪個分割槽中,目前 Spark 中支援 HashPartitioner(按照雜湊分割槽) 和 RangeParationer(按照範圍進行分割槽);
  • 一個優先位置列表 (可選),用於儲存每個分割槽的優先位置 (prefered location)。對於一個 HDFS 檔案來說,這個列表儲存的就是每個分割槽所在的塊的位置,按照“移動資料不如移動計算“的理念,Spark 在進行任務排程的時候,會盡可能的將計算任務分配到其所要處理資料塊的儲存位置。

RDD[T] 抽象類的部分相關程式碼如下:

// 由子類實現以計算給定分割槽
def compute(split: Partition, context: TaskContext): Iterator[T]

// 獲取所有分割槽
protected def getPartitions: Array[Partition]

// 獲取所有依賴關係
protected def getDependencies: Seq[Dependency[_]] = deps

// 獲取優先位置列表
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

// 分割槽器 由子類重寫以指定它們的分割槽方式
@transient val partitioner: Option[Partitioner] = None

二、建立RDD

RDD 有兩種建立方式,分別介紹如下:

2.1 由現有集合建立

這裡使用 spark-shell 進行測試,啟動命令如下:

spark-shell --master local[4]

啟動 spark-shell 後,程式會自動建立應用上下文,相當於執行了下面的 Scala 語句:

val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]")
val sc = new SparkContext(conf)

由現有集合建立 RDD,你可以在建立時指定其分割槽個數,如果沒有指定,則採用程式所分配到的 CPU 的核心數:

val data = Array(1, 2, 3, 4, 5)
// 由現有集合建立 RDD,預設分割槽數為程式所分配到的 CPU 的核心數
val dataRDD = sc.parallelize(data) 
// 檢視分割槽數
dataRDD.getNumPartitions
// 明確指定分割槽數
val dataRDD = sc.parallelize(data,2)

執行結果如下:

2.2 引用外部儲存系統中的資料集

引用外部儲存系統中的資料集,例如本地檔案系統,HDFS,HBase 或支援 Hadoop InputFormat 的任何資料來源。

val fileRDD = sc.textFile("/usr/file/emp.txt")
// 獲取第一行文字
fileRDD.take(1)

使用外部儲存系統時需要注意以下兩點:

  • 如果在叢集環境下從本地檔案系統讀取資料,則要求該檔案必須在叢集中所有機器上都存在,且路徑相同;
  • 支援目錄路徑,支援壓縮檔案,支援使用萬用字元。

2.3 textFile & wholeTextFiles

兩者都可以用來讀取外部檔案,但是返回格式是不同的:

  • textFile:其返回格式是 RDD[String] ,返回的是就是檔案內容,RDD 中每一個元素對應一行資料;
  • wholeTextFiles:其返回格式是 RDD[(String, String)],元組中第一個引數是檔案路徑,第二個引數是檔案內容;
  • 兩者都提供第二個引數來控制最小分割槽數;
  • 從 HDFS 上讀取檔案時,Spark 會為每個塊建立一個分割槽。
def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...}
def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}

三、操作RDD

RDD 支援兩種型別的操作:transformations(轉換,從現有資料集建立新資料集)和 actions(在資料集上執行計算後將值返回到驅動程式)。RDD 中的所有轉換操作都是惰性的,它們只是記住這些轉換操作,但不會立即執行,只有遇到 action 操作後才會真正的進行計算,這類似於函數語言程式設計中的惰性求值。

val list = List(1, 2, 3)
// map 是一個 transformations 操作,而 foreach 是一個 actions 操作
sc.parallelize(list).map(_ * 10).foreach(println)
// 輸出: 10 20 30

四、快取RDD

4.1 快取級別

Spark 速度非常快的一個原因是 RDD 支援快取。成功快取後,如果之後的操作使用到了該資料集,則直接從快取中獲取。雖然快取也有丟失的風險,但是由於 RDD 之間的依賴關係,如果某個分割槽的快取資料丟失,只需要重新計算該分割槽即可。

Spark 支援多種快取級別 :

Storage Level
(儲存級別)
Meaning(含義)
MEMORY_ONLY 預設的快取級別,將 RDD 以反序列化的 Java 物件的形式儲存在 JVM 中。如果記憶體空間不夠,則部分分割槽資料將不再快取。
MEMORY_AND_DISK 將 RDD 以反序列化的 Java 物件的形式儲存 JVM 中。如果記憶體空間不夠,將未快取的分割槽資料儲存到磁碟,在需要使用這些分割槽時從磁碟讀取。
MEMORY_ONLY_SER
將 RDD 以序列化的 Java 物件的形式進行儲存(每個分割槽為一個 byte 陣列)。這種方式比反序列化物件節省儲存空間,但在讀取時會增加 CPU 的計算負擔。僅支援 Java 和 Scala 。
MEMORY_AND_DISK_SER
類似於 MEMORY_ONLY_SER,但是溢位的分割槽資料會儲存到磁碟,而不是在用到它們時重新計算。僅支援 Java 和 Scala。
DISK_ONLY 只在磁碟上快取 RDD
MEMORY_ONLY_2,
MEMORY_AND_DISK_2, etc
與上面的對應級別功能相同,但是會為每個分割槽在叢集中的兩個節點上建立副本。
OFF_HEAP MEMORY_ONLY_SER 類似,但將資料儲存在堆外記憶體中。這需要啟用堆外記憶體。

啟動堆外記憶體需要配置兩個引數:

  • spark.memory.offHeap.enabled :是否開啟堆外記憶體,預設值為 false,需要設定為 true;
  • spark.memory.offHeap.size : 堆外記憶體空間的大小,預設值為 0,需要設定為正值。

4.2 使用快取

快取資料的方法有兩個:persistcachecache 內部呼叫的也是 persist,它是 persist 的特殊化形式,等價於 persist(StorageLevel.MEMORY_ONLY)。示例如下:

// 所有儲存級別均定義在 StorageLevel 物件中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()

4.3 移除快取

Spark 會自動監視每個節點上的快取使用情況,並按照最近最少使用(LRU)的規則刪除舊資料分割槽。當然,你也可以使用 RDD.unpersist() 方法進行手動刪除。

五、理解shuffle

5.1 shuffle介紹

在 Spark 中,一個任務對應一個分割槽,通常不會跨分割槽操作資料。但如果遇到 reduceByKey 等操作,Spark 必須從所有分割槽讀取資料,並查詢所有鍵的所有值,然後彙總在一起以計算每個鍵的最終結果 ,這稱為 Shuffle

5.2 Shuffle的影響

Shuffle 是一項昂貴的操作,因為它通常會跨節點操作資料,這會涉及磁碟 I/O,網路 I/O,和資料序列化。某些 Shuffle 操作還會消耗大量的堆記憶體,因為它們使用堆記憶體來臨時儲存需要網路傳輸的資料。Shuffle 還會在磁碟上生成大量中間檔案,從 Spark 1.3 開始,這些檔案將被保留,直到相應的 RDD 不再使用並進行垃圾回收,這樣做是為了避免在計算時重複建立 Shuffle 檔案。如果應用程式長期保留對這些 RDD 的引用,則垃圾回收可能在很長一段時間後才會發生,這意味著長時間執行的 Spark 作業可能會佔用大量磁碟空間,通常可以使用 spark.local.dir 引數來指定這些臨時檔案的儲存目錄。

5.3 導致Shuffle的操作

由於 Shuffle 操作對效能的影響比較大,所以需要特別注意使用,以下操作都會導致 Shuffle:

  • 涉及到重新分割槽操作: 如 repartitioncoalesce
  • 所有涉及到 ByKey 的操作:如 groupByKeyreduceByKey,但 countByKey 除外;
  • 聯結操作:如 cogroupjoin

五、寬依賴和窄依賴

RDD 和它的父 RDD(s) 之間的依賴關係分為兩種不同的型別:

  • 窄依賴 (narrow dependency):父 RDDs 的一個分割槽最多被子 RDDs 一個分割槽所依賴;
  • 寬依賴 (wide dependency):父 RDDs 的一個分割槽可以被子 RDDs 的多個子分割槽所依賴。

如下圖,每一個方框表示一個 RDD,帶有顏色的矩形表示分割槽:

區分這兩種依賴是非常有用的:

  • 首先,窄依賴允許在一個叢集節點上以流水線的方式(pipeline)對父分割槽資料進行計算,例如先執行 map 操作,然後執行 filter 操作。而寬依賴則需要計算好所有父分割槽的資料,然後再在節點之間進行 Shuffle,這與 MapReduce 類似。
  • 窄依賴能夠更有效地進行資料恢復,因為只需重新對丟失分割槽的父分割槽進行計算,且不同節點之間可以平行計算;而對於寬依賴而言,如果資料丟失,則需要對所有父分割槽資料進行計算並再次 Shuffle。

六、DAG的生成

RDD(s) 及其之間的依賴關係組成了 DAG(有向無環圖),DAG 定義了這些 RDD(s) 之間的 Lineage(血統) 關係,通過血統關係,如果一個 RDD 的部分或者全部計算結果丟失了,也可以重新進行計算。那麼 Spark 是如何根據 DAG 來生成計算任務呢?主要是根據依賴關係的不同將 DAG 劃分為不同的計算階段 (Stage):

  • 對於窄依賴,由於分割槽的依賴關係是確定的,其轉換操作可以在同一個執行緒執行,所以可以劃分到同一個執行階段;
  • 對於寬依賴,由於 Shuffle 的存在,只能在父 RDD(s) 被 Shuffle 處理完成後,才能開始接下來的計算,因此遇到寬依賴就需要重新劃分階段。

參考資料

  1. 張安站 . Spark 技術內幕:深入解析 Spark 核心架構設計與實現原理[M] . 機械工業出版社 . 2015-09-01
  2. RDD Programming Guide
  3. RDD:基於記憶體的叢集計算容錯抽象

更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南