1. 程式人生 > >spark RDD概念及其運算元舉例講解

spark RDD概念及其運算元舉例講解

 

作為之前對spark RDD可以說是完全不懂的小白,在閱讀部分網友的部落格的基礎上,我從自己理解的角度和方式來記錄一下自己學習spark RDD的過程。

目錄

一、RDD介紹

1.1 RDD是什麼

1.2 Spark與RDD的關係

1.3 為什麼會產生RDD

1.4 RDD底層實現原理

1.5 RDD的操作

二、通過例子學習spark RDD的操作

2.1 Action函式

foreach、foreachPartition、collect、subtract、reduce、treeReduce、fold、aggregate、aggregateByKey、count、countByValue、take、first、takeOrdered、top、max、isEmpty、saveAsTextFile、keyBy、keys

2.2 Transformations函式

map、floatMap、mapPartitions、mapPartitionsWithIndex、filter、distinct、sample、union,++、intersection、glom、zip、zipParititions、zipWithIndex、sortBy、sortByKey 

三、Spark RDD的容錯機制

3.1 checkpoint機制

3.2 Lineage機制

參考文獻


一、RDD介紹

1.1 RDD是什麼

RDD:Spark的核心概念是RDD (resilient distributed dataset),指的是一個只讀的,可分割槽的彈性分散式資料集,這個資料集的全部或部分可以快取在記憶體中,在多次計算間重用。

我們看下官方對rdd特性的介紹:

  1. A list of partitions
  2. A function for computing each split
  3. A list of dependencies on other RDDs
  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

下面對RDD的五個特性進行解釋:

  1. 有一個分片列表。就是能被切分,和hadoop一樣的,能夠切分的資料才能平行計算。
  2. 每一個切片(分割槽)使用同一個函式計算。 
  3. 對其他的RDD的依賴列表, 即RDD具有血統。依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。 
  4. 可選:key-value型的RDD是根據雜湊來分割槽的,類似於mapreduce當中的Paritioner介面,控制key分到哪個reduce。 
  5. 可選:每一個分片的優先計算位置(preferred locations),對於每一個切片(分割槽)都會偏向於選擇就近的進行計算。

看一下這5個特性的函式原型: 

  //特性1 
  protected def getPartitions: Array[Partition]  
  //特性2 對一個分片進行計算,得出一個可遍歷的結果
  def compute(split: Partition, context: TaskContext): Iterator[T]
  //特性3 只計算一次,計算RDD對父RDD的依賴
  protected def getDependencies: Seq[Dependency[_]] = deps
  //特性4 可選的,分割槽的方法
  @transient val partitioner: Option[Partitioner] = None
  //特性5 可選的,指定優先位置,輸入引數是split分片,輸出結果是一組優先的節點位置
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

1.2 Spark與RDD的關係

(1)為什麼會有Spark?因為傳統的平行計算模型無法有效的解決迭代計算(iterative)和互動式計算(interactive);而Spark的使命便是解決這兩個問題,這也是他存在的價值和理由。

(2)Spark如何解決迭代計算?其主要實現思想就是RDD,把所有計算的資料儲存在分散式的記憶體中。迭代計算通常情況下都是對同一個資料集做反覆的迭代計算,資料在記憶體中將大大提升IO操作。這也是Spark涉及的核心:記憶體計算。

(3)Spark如何實現互動式計算?因為Spark是用scala語言實現的,Spark和scala能夠緊密的整合,所以Spark可以完美的運用scala的直譯器,使得其中的scala可以向操作本地集合物件一樣輕鬆操作分散式資料集。

(4)Spark和RDD的關係?可以理解為:RDD是一種具有容錯性基於記憶體的叢集計算抽象方法,Spark則是這個抽象方法的實現

1.3 為什麼會產生RDD

(1)傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是採用非迴圈式的資料流模型,使得在迭代計算式要進行大量的磁碟IO操作。RDD正是解決這一缺點的抽象方法

(2)RDD是Spark提供的最重要的抽象的概念,它是一種有容錯機制的特殊集合,可以分佈在叢集的節點上,以函數語言程式設計操作集合的方式,進行各種並行操作。它提供了一種只讀、只能由已存在的RDD變換而來的共享記憶體,然後將所有資料都載入到記憶體中,方便進行多次重用。

a.它是分散式的,可以分佈在多臺機器上,進行計算。 
b.它是彈性的,計算過程中記憶體不夠時它會和磁碟進行資料交換(快取管理)。 
c.這些限制可以極大的降低自動容錯開銷 
d.實質是一種更為通用的迭代平行計算框架,使用者可以顯示地控制計算的中間結果,然後將其自由運用於之後的計算。

1.4 RDD底層實現原理

RDD是一個分散式資料集,顧名思義,其資料應該分部儲存於多臺機器上。事實上,每個RDD的資料都以Block的形式儲存於多臺機器上,下圖是Spark的RDD儲存架構圖,其中每個Executor會啟動一個BlockManagerSlave,並管理一部分Block;而Block的元資料由Driver節點的BlockManagerMaster儲存。BlockManagerSlave生成Block後向BlockManagerMaster註冊該Block,BlockManagerMaster管理RDD與Block的關係,當RDD不再需要儲存的時候,將向BlockManagerSlave傳送指令刪除相應的Block。
 

1.5 RDD的操作

a.Actions:對資料集計算後返回一個數值value給驅動程式;例如:Reduce將資料集的所有元素用某個函式聚合後,將最終結果返回給程式。

b.Transformation:根據資料集建立一個新的資料集,計算後返回一個新RDD;例如:Map將資料的每個元素經過某個函式計算後,返回一個新的分散式資料集。
 

RDD支援兩種操作:轉換(transformation)動作(actions)。

轉換(transformation):根據資料集建立一個新的資料集,計算後返回一個新RDD;而動作(actions)在資料集上執行計算後,返回一個值給驅動程式。 例如,map就是一種轉換,它將資料集每一個元素都傳遞給函式,並返回一個新的分佈資料集表示結果。另一方面,reduce是一種動作,通過一些函式將所有的元素疊加起來,並將最終結果返回給Driver程式。

Spark中的所有轉換都是惰性的,也就是說,它們並不會直接計算結果。相反的,它們只是記住應用到基礎資料集(上的這些轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這個設計讓Spark更加有效率的執行。例如,我們可以實現:通過map建立的一個新資料集,並在reduce中使用,最終只返回reduce的結果給driver,而不是整個大的新資料集。

預設情況下,每一個轉換過的RDD都會在執行一個動作時被重新計算。不過,可以使用persist(或者cache)方法,持久化一個RDD在記憶體中。在這種情況下,Spark將會在叢集中,儲存相關元素,下次查詢這個RDD時,它將能更快速訪問。在磁碟上持久化資料集,或在叢集間複製資料集也是支援的。

二、通過例子學習spark RDD的操作

在這裡我介紹下常見的Action和Transformations函式的操作。

在spark新版中,也許會有更多的action和transformation,可以參照spark的主頁 。詳見參考文獻【1】。

2.1 Action函式

Action函式會產生任務,並會把任務提交到spark叢集中。 
注意:這些函式一般的返回值都是Unit。

主要包括以下Action函式:

foreach、foreachPartition、collect、subtract、reduce、treeReduce、fold、aggregate、aggregateByKey、count、countByValue、take、first、takeOrdered、top、max、isEmpty、saveAsTextFile、keyBy、keys


foreach

功能說明 

在RDD的所有元素使用函式。

函式原型

def foreach(f: T => Unit): Unit 

使用例子

注意:若spark是叢集模型,會看不到foreach的輸出。

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d"))
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> r1.foreach(item => println("hello " + item))

若是在單機的spark上,可以看到以下效果:

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d"))
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> r1.foreach(item => println("hello " + item))
[Stage 0:>                                                          (0 + 0) / 2]hello a
hello b
hello c
hello d

foreachPartition

功能說明 

在RDD的每個分割槽上使用同一個處理函式。

函式原型

def foreachPartition(f: Iterator[T] => Unit): Unit

使用例子

檢視每個分割槽的資料量大小。

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d", "e", "f", "g", "h", "i"), 3)
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> r1.foreachPartition(c=>println(c.length))
3
3
3

collect

功能說明 

把RDD的所有元素都返回到本地的陣列中。 
該函式可以用於列印RDD的內容,在除錯的時候非常有用。但注意:若資料量太大,可能導致OOM。

函式原型

def collect(): Array[T] 
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

使用例子

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"), 3)
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> r1.foreachPartition(c=>println(c.length))
3
4
4

scala> r1.collect()
res23: Array[String] = Array(a, b, c, d, e, f, g, h, i, j, k)

subtract

功能說明 

執行標準的集合相減操作:A-B。 不去重
注意:在使用該函式時,要保證兩個RDD的元素型別是相同的。

函式原型

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]

使用例子

scala> val r1 = sc.parallelize(1 to 10, 3)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val r2 = sc.parallelize(5 to 15, 3)
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val r3 = r1.subtract(r2)
res21: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at subtract at <console>:29

scala> r2.collect()
res26: Array[Int] = Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)

scala> r1.collect()
res27: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> r3.collect()
res28: Array[Int] = Array(3, 1, 4, 2)

reduce

功能說明 

reduce將RDD中元素前兩個傳給輸入函式,產生一個新的return值,新產生的return值與RDD中下一個元素(第三個元素)組成兩個元素,再被傳給輸入函式,直到最後只有一個值為止。

函式原型

def reduce(f: (T, T) => T): T 

使用例子

val c = sc.parallelize(1 to 10)
c.reduce((x, y) => x + y)//結果55

具體過程,RDD有1 2 3 4 5 6 7 8 9 10個元素, 
1+2=3 
3+3=6 
6+4=10 
10+5=15 
15+6=21 
21+7=28 
28+8=36 
36+9=45 
45+10=55

另外的一個例子

scala> val a = sc.parallelize(1 to 100, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> a.reduce(_+_)
res29: Int = 5050

// 和map結合使用
scala> a.map(_*2).reduce(_+_)
res30: Int = 10100

treeReduce

功能說明 

和reduce函式的功能相似,不同的是該函式通過tree的方式對結果進行聚合。

函式原型

def  treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T

使用例子

val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.treeReduce(_+_)
res49: Int = 21

fold

功能說明 

聚合每個分割槽的值。每個分割槽內的聚合變數用zeroValue初始化,然後聚合所有分割槽的結果。 

函式原型

def fold(zeroValue: T)(op: (T, T) => T): T 

使用例子

說到fold()函式,就不得不提一下之前介紹過的reduce()函式,它倆的區別就在於一個初始值。
我們知道,reduce()函式是這樣寫的:

rdd.reduce(func)

引數是一個函式,這個函式的對rdd中的所有資料進行某種操作,比如:

val l = List(1,2,3,4)
l.reduce((x, y) => x + y)

對於這個x,它代指的是返回值,而y是對rdd各元素的遍歷。意思是對 l中的資料進行累加。
flod()函式相比reduce()加了一個初始值引數:

rdd.fold(value)(func)

如下:

val l = List(1,2,3,4)
l.fold(0)((x, y) => x + y)

這個計算其實 0 + 1 + 2 + 3 + 4,而reduce()的計算是:1 + 2 + 3 + 4,沒有初始值,或者說rdd的第一個元素值是它的初始值。

如果,我們設定多個分割槽呢?看下面的例子

scala> val a = sc.parallelize(1 to 100, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> a.fold(0)(_+_)
res38: Int = 5050

scala> a.fold(1)(_+_)
res38: Int = 5054

scala> a.fold(2)(_+_)
res38: Int = 5058

可以看到分割槽為3個時,1+(1+2+....+100)+1+1+1=5054


aggregate

功能說明 

aggregate函式允許使用者將兩種不同的reduce functions應用於RDD。 在每個分割槽中應用第一個reduce函式,以將每個分割槽內的資料減少為單個結果。第二個reduce函式用於將所有分割槽的不同縮減結果組合在一起,以得出最終結果。內部分割槽與跨分割槽約減具有兩個單獨的約減功能的能力增加了很大的靈活性。 例如,第一個reduce函式可以是max函式,第二個可以是sum函式。 使用者還指定一個初始值。 但這裡要注意:

  • 不要為分割槽計算或組合分割槽假定任何執行順序。
  • 初始值適用於所有的reduce函式。

函式原型

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 

使用例子

剛才說到reduce()fold(),這兩個函式有一個問題,那就是它們的返回值必須與rdd的資料型別相同。比如上面的例子,l的資料是Int,那麼reduce()flod()返回的也必須是Int
aggregate()函式就打破了這個限制。比如返回(Int, Int)。這很有用,比如要計算平均值的時候。
要算平均值,有兩個值是要求的,一個是rdd的各元素的累加和,另一個是元素計數,我初始化為(0, 0)
那麼就是:

val l = List(1,2,3,4)
l.aggregate(0, 0)(seqOp, combOp)

那麼seqOpcombOp怎麼寫呢?
我們將seqOp寫為:

(x, y) => (x._1 + y, x._2 + 1)

如何理解呢?在講到reduce()函式的時候有:

val l = List(1,2,3,4)
l.reduce((x, y) => x + y)

對於這個x,它代指的是返回值,而y是對rdd各元素的遍歷。
aggregate()這也一樣,x代表返回值,且這裡的返回值是(Int, Int),它有兩個元素,可以用x._1x._2來代指這兩個元素的,y代表對rdd的元素遍歷,可以用x._1 + y表示各個元素的累加和,x._2 + 1就是元素計數。遍歷完成後返回的(Int, Int)就是累加和和元素計數。
按理說有這麼一個函式就應該結束了,那麼後邊那個combOp有什麼作用呢
因為rdd的計算是分散式計算,這個函式是將累加器進行合併的。
例如第一個節點遍歷1和2, 返回的是(3, 2),第二個節點遍歷3和4, 返回的是(7, 2),那麼將它們合併的話就是3 + 7, 2 + 2,用程式寫就是

(x, y) => (x._1 + y._1, x._2 + y._2)

最後程式是這樣的:

val l = List(1,2,3,4)
r = l.aggregate(0, 0)((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
m = r._1 / r._2.toFload

aggregateByKey

功能說明 

aggregate 是聚合意思,直觀理解就是按照Key進行聚合。

轉化: RDD[(K,V)] ==> RDD[(K,U)]

可以看出是返回值的型別不需要和原來的RDD的Value型別一致。

在聚合過程中提供一箇中立的初始值zeroValue。

函式原型

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) 

使用例子

先看一個有多個分割槽的例子:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Edward on 2016/10/27.
  */
object AggregateByKey {
  def main(args: Array[String]) {
    val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey")
      .setMaster("local")
    val sc: SparkContext = new SparkContext(sparkConf)

    val data = List((1, 3), (1, 2), (1, 4), (2, 3))
    var rdd = sc.parallelize(data,2)//資料拆分成兩個分割槽

    //合併在不同partition中的值,a,b的資料型別為zeroValue的資料型別
    def comb(a: String, b: String): String = {
      println("comb: " + a + "\t " + b)
      a + b
    }
    //合併在同一個partition中的值, a的資料型別為zeroValue的資料型別,b的資料型別為原value的資料型別
    def seq(a: String, b: Int): String = {
      println("seq: " + a + "\t " + b)
      a + b
    }

    rdd.foreach(println)
    
    //zeroValue 中立值,定義返回value的型別,並參與運算
    //seqOp 用來在一個partition中合併值的
    //comb 用來在不同partition中合併值的
    val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq,comb)

    //列印輸出
    aggregateByKeyRDD.foreach(println)
  

    sc.stop()
  }
}

結果:


//將資料拆分成兩個分割槽

//分割槽一資料
(1,3)
(1,2)
//分割槽二資料
(1,4)
(2,3)

//分割槽一相同key的資料進行合併
seq: 100     3   //(1,3)開始和中立值進行合併  合併結果為 1003
seq: 1003     2   //(1,2)再次合併 結果為 10032

//分割槽二相同key的資料進行合併
seq: 100     4  //(1,4) 開始和中立值進行合併 1004
seq: 100     3  //(2,3) 開始和中立值進行合併 1003

將兩個分割槽的結果進行合併
//key為2的,只在一個分割槽存在,不需要合併 (2,1003)
(2,1003)

//key為1的, 在兩個分割槽存在,並且資料型別一致,合併
comb: 10032     1004
(1,100321004)

再看一個例子:

scala> val data = sc.parallelize(List((1,2),(1,4),(2,3)))

scala> data.aggregateByKey(3)((x,y)=>math.max(x,y) ,(z,m)=>z+m)

scala> val result = data.aggregateByKey(3)((x,y)=>math.max(x,y) ,(z,m)=>z+m)

scala> result.collectres2: Array[(Int, Int)] = Array((1,7), (2,3))


count

功能說明 

返回RDD中所有元素的個數。

函式原型

def count(): Long 

使用例子

scala> val a = sc.parallelize(1 to 100, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> a.count()
res30: Long = 100

countByValue

功能說明 

返回RDD中每個元素的個數,並按java的Map資料結構的方式返回。

函式原型

def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]

使用例子

scala> val a = sc.parallelize(1 to 10, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> a.countByValue
res35: scala.collection.Map[Int,Long] = Map(5 -> 1, 10 -> 1, 1 -> 1, 6 -> 1, 9 -> 1, 2 -> 1, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 1)

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d", "e", "f", "g", "c", "b", "a", "a"), 3)
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> r1.countByValue
res0: scala.collection.Map[String,Long] = Map(e -> 1, f -> 1, a -> 3, b -> 2, g -> 1, c -> 2, d -> 1)

take

功能說明 

從RDD中返回n個元素。 
該函式先會先掃描一個分割槽,若分割槽的元素個數不夠,則會掃描另一個分割槽,直到元素個數足夠。 
注意:該函式會把所有的資料儲存到driver端,所以若take的資料量太大,可能會導致OOM。

函式原型

def take(num: Int): Array[T] 

使用例子

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d", "e", "f", "g", "c", "b", "a", "a"), 3)
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> r1.take(3)
res4: Array[String] = Array(a, b, c)

first

功能說明 

查詢RDD的第一個資料項並返回它。

函式原型

def first()

使用例子

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d", "e", "f", "g", "c", "b", "a", "a"), 3)
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> r1.first
res1: String = a

takeOrdered

功能說明 

返回RDD的n個元素,並對這n個元素進行排序。

函式原型

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

使用例子

scala> val r1 = sc.parallelize(Array("a", "b", "c", "d", "e", "f", "g", "c", "b", "a", "a"), 3)
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> r1.takeOrdered(3)
res6: Array[String] = Array(a, a, a)

scala> val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> b.takeOrdered(2)
res7: Array[String] = Array(ape, cat)

scala> b.takeOrdered(3)
res8: Array[String] = Array(ape, cat, dog)

top

功能說明 

返回RDD的前n個元素。 
返回元素的順序是按takeOrdered相反的順序排列的。

函式原型

def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    takeOrdered(num)(ord.reverse)
  }

使用例子

scala> val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> b.top(3)
res12: Array[String] = Array(salmon, gnu, dog)

max

功能說明 

返回RDD中最大的元素。

函式原型

def max()(implicit ord: Ordering[T]): T 

使用例子

scala> val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
scala> b.max()
res13: String = salmon

isEmpty

功能說明 

判斷一個RDD是否為空。

函式原型

def isEmpty(): Boolean

使用例子

scala> val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> b.isEmpty()
res16: Boolean = false

saveAsTextFile

功能說明 

把RDD的內容儲存到text檔案中。

函式原型

def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

使用例子

scala> val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 3)
scala> b.saveAsTextFile("/user/zxh/testdata/rddresult1/")
$ hadoop fs -cat /user/zxh/testdata/rddresult1/*
dog
cat
ape
salmon
gnu

keyBy

功能說明 

通過在每個資料項上應用一個函式來構造兩部分元組(鍵 - 值對)。函式的結果成為鍵,原始資料項成為新建立的元組的值。

函式原型

def keyBy[K](f: T => K): RDD[(K, T)]

使用例子

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
b.collect
res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))

keys

功能說明 

從所有包含的元組中提取key,並將它們返回到新的RDD中。

函式原型

def keys: RDD[K]

使用例子

scala> val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 3)
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> b.collect()
res2: Array[String] = Array(dog, cat, ape, salmon, gnu)

scala> val c = b.keyBy(_.length)
c: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[4] at keyBy at <console>:26

scala> c.collect()
res4: Array[(Int, String)] = Array((3,dog), (3,cat), (3,ape), (6,salmon), (3,gnu))

scala> c.keys
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at keys at <console>:29

scala> c.keys.collect()
res6: Array[Int] = Array(3, 3, 3, 6, 3)


2.2 Transformations函式

所有的Transformations函式完成後會返回一個新的RDD

會介紹以下的Transformations函式

map、floatMap、mapPartitions、mapPartitionsWithIndex、filter、distinct、sample、union,++、intersection、glom、zip、zipParititions、zipWithIndex、sortBy、sortByKey 

在講解部分例子的時候測試的資料如下:

$ hadoop fs -cat /user/zxh/pdata/pdata
3350,province_name,上海,5.0
3349,province_name,四川,4.0
3348,province_name,湖南,11.0
3348,province_name,河北,11.0

map

功能說明 

在RDD的每個item上使用transformation函式,結果返回一個新的RDD。

函式原型

def map[U: ClassTag](f: T => U): RDD[U]

使用例子

// 構建一個rdd
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
// 通過map計算rdd每個成員的長度
scala> val b = a.map(_.length)
// 列印rdd
scala> b.collect().foreach(println)

在spark-shell中執行以上程式,得到的結果如下:

3
6
6
3
8

檢視一下得到的rdd的型別

scala> b
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:26

可以看出該rdd的型別是MapPartitionsRDD,該RDD是通過在父RDD上通過map運算而得到的。


floatMap

功能說明 

類似於map,與map相比flatMap有兩個很大的區別: 
(1) flatMap允許在map函式的基礎上擴充套件成多個成員。 
(2) flatMap會將一個長度為N的RDD轉換成一個N個元素的集合,然後再把這N個元素合成到一個單個RDD的結果集。

函式原型

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 

使用例子

scala> val a = sc.parallelize(1 to 10)
// 擴充套件a的元素,把a的每個元素擴充套件成flatMap中的元素
scala> val b = a.flatMap(1 to _)
// 得到最後的c的結果,可以看到把a的每個元素都擴充套件成1 到 該元素 的多個值。
scala> val c = b.collect()
c: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

flatMap 和 map

通過例子來檢視兩者的區別:

flatMap的例子

val lines = sc.textFile("/user/zxh/pdata/pdata")
val rs1 = lines.flatMap(line=>line.split(","))
rs1.collect()

flatMap的輸出如下:

res5: Array[String] = Array(3350, province_name, 上海, 5.0, 3349, province_name, 四川, 4.0, 3348, province_name, 湖南, 11.0, 3348, province_name, 河北, 11.0)

map的例子

val lines = sc.textFile("/user/zxh/pdata/pdata")
val rs2 = lines.map(line => line.split(","))
rs2.collect()
res6: Array[Array[String]] = Array(Array(3350, province_name, 上海, 5.0), Array(3349, province_name, 四川, 4.0), Array(3348, province_name, 湖南, 11.0), Array(3348, province_name, 河北, 11.0))

小結 :可以看到flatMap把最後的結果都合併到一個RDD的集合中了,而map是在每個item上輸出是什麼就保留什麼元素,不合合併到一個集合中。


mapPartitions

功能說明 

該函式和map函式類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的多。

比如,將RDD中的所有資料通過JDBC連線寫入資料庫,如果使用map函式,可能要為每一個元素都建立一個connection,這樣開銷很大,如果使用mapPartitions,那麼只需要針對每一個分割槽建立一個connection。

引數preservesPartitioning表示是否保留父RDD的partitioner分割槽資訊。

函式原型

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

使用例子

var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分割槽
scala> var rdd3 = rdd1.mapPartitions{ x => {
     | var result = List[Int]()
     |     var i = 0
     |     while(x.hasNext){
     |       i += x.next()
     |     }
     |     result.::(i).iterator
     | }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23
 
//rdd3將rdd1中每個分割槽中的數值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2

mapPartitionsWithIndex

功能說明 

函式作用同mapPartitions,不過提供了兩個引數,第一個引數為分割槽的索引。

函式原型

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

使用例子

var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分割槽
var rdd2 = rdd1.mapPartitionsWithIndex{
        (x,iter) => {
          var result = List[String]()
            var i = 0
            while(iter.hasNext){
              i += iter.next()
            }
            result.::(x + "|" + i).iterator
           
        }
      }
//rdd2將rdd1中每個分割槽的數字累加,並在每個分割槽的累加結果前面加了分割槽索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)

filter

功能說明 

在RDD物件物件上使用filter函式,並返回滿足條件的新的RDD。

函式原型

def filter(f: T => Boolean): RDD[T]

使用例子

例子1

scala> val a = sc.parallelize(1 to 10)
scala> val b = a.filter(_>2)
scala> b.collect()
    res3: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9, 10)

例子2

val lines = sc.textFile("/user/zxh/pdata/pdata")
val r = lines.flatMap(line=>line.split(",")).filter(_.length>5)
scala> r.collect()
res9: Array[String] = Array(province_name, province_name, province_name, province_name)

// 按元素長度進行過濾
scala> val r = lines.flatMap(line=>line.split(",")).filter(_.length>3)
r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[33] at filter at <console>:26
scala> r.collect()
res11: Array[String] = Array(3350, province_name, 3349, province_name, 3348, province_name, 11.0, 3348, province_name, 11.0)

// 在每個元素中使用函式contains進行過濾
scala> val r = lines.flatMap(line=>line.split(",")).filter(_.contains("33"))
r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[39] at filter at <console>:26
scala> r.collect()
res14: Array[String] = Array(3350, 3349, 3348, 3348)

distinct

功能說明 

對RDD的元素去重。 返回一個包含每個唯一值一次的新RDD。

函式原型

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

使用例子

scala> val r = lines.flatMap(line=>line.split(",")).filter(_.length>3)
r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[59] at filter at <console>:26

// 這裡輸出的是有重複的元素
scala> r.collect()
res20: Array[String] = Array(3350, province_name, 3349, province_name, 3348, province_name, 11.0, 3348, province_name, 11.0)

// 呼叫distinct()後這輸出的是去重後的元素
scala> r.distinct().collect()
res21: Array[String] = Array(province_name, 3348, 11.0, 3350, 3349)

sample

功能說明 

隨機選擇RDD專案的一部分資料,並將其返回到新的RDD中。

函式原型

 def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]

withReplacement:是否要替換

fraction:原來的RDD元素大小的百分比

seed:隨機數產生器的seed

使用例子

val a = sc.parallelize(1 to 10000, 3)
scala> a.sample(false,0.1,0).count()
res4: Long = 1032

scala> a.sample(false,0.3,0).count()
res5: Long = 2997

scala> a.sample(false,0.2,0).count()
res6: Long = 2018

union,++

功能說明 

執行標準集合操作:A聯合B。 
若元素有重複,會保留重複的元素。

函式原型

def union(other: RDD[T]): RDD[T] 

使用例子

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val b = sc.parallelize(6 to 9)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> a.union(b).collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> (a ++ b).collect()
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

intersection

功能說明 

該函式返回兩個RDD的交集,並且去重

函式原型

def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

使用例子

scala> var rdd1 = sc.makeRDD(1 to 2,1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at makeRDD at :21
 
scala> rdd1.collect
res42: Array[Int] = Array(1, 2)
 
scala> var rdd2 = sc.makeRDD(2 to 3,1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at makeRDD at :21
 
scala> rdd2.collect
res43: Array[Int] = Array(2, 3)
 
scala> rdd1.intersection(rdd2).collect
res45: Array[Int] = Array(2)
 
scala> var rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[59] at intersection at :25
 
scala> rdd3.partitions.size
res46: Int = 1
 
scala> var rdd3 = rdd1.intersection(rdd2,2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[65] at intersection at :25
 
scala> rdd3.partitions.size
res47: Int = 2

glom

功能說明 

建立一個新的RDD,該RDD把會將各個分割槽的所有元素合併到同一個陣列中,若有多個分割槽,就會得到一個有多個數組的集合。

函式原型

def glom(): RDD[Array[T]]

使用例子

scala> val a = sc.parallelize(1 to 10, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> a.collect()
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// 檢視一下分割槽的個數
scala> a.partitions.length
res2: Int = 3

// 呼叫了glom合併分割槽的資料
scala> val b = a.glom()
b: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[3] at glom at <console>:26

// 每個分割槽的資料組成了一個array
scala> b.collect()
res4: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

zip

功能說明 

將兩個分割槽中的第n個分割槽相互組合,從而連線兩個RDD。 生成的RDD將由兩部分元組組成,這些元組被解釋為鍵-值(key-value)對。 
注意:使用該函式時,兩個RDD的分割槽和元素個數必須一樣,否則將會報錯。見例子2。

函式原型

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

使用例子

scala> val a = sc.parallelize(1 to 10, 3)
scala> val b = sc.parallelize(11 to 20, 3)
scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[9] at zip at <console>:28
scala> c.collect()
res8: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))

zipParititions

功能說明 

和zip的功能相似,但可以提供更多的控制。

函式原型

def zipPartitions[B: ClassTag, V: ClassTag]
      (rdd2: RDD[B], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, V: ClassTag]
      (rdd2: RDD[B])
      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] 

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C])
      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] 

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]

使用例子

val a = sc.parallelize(0 to 9, 3)
val b = sc.parallelize(10 to 19, 3)
val c = sc.parallelize(100 to 109, 3)
def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] =
{
  var res = List[String]()
  while (aiter.hasNext && biter.hasNext && citer.hasNext)
  {
    val x = aiter.next + " " + biter.next + " " + citer.next
    res ::= x
  }
  res.iterator
}
a.zipPartitions(b, c)(myfunc).collect
res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)

zipWithIndex

功能說明 

使用元素索引來壓縮RDD的元素。索引從0開始。如果RDD分佈在多個分割槽上,則啟動一個Spark作業來執行此操作。

函式原型

def zipWithIndex(): RDD[(T, Long)]

使用例子

// 字串的例子
scala> val r1 = sc.parallelize(Array("a", "b", "c", "d"))
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val r2 = r1.zipWithIndex
r2: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[14] at zipWithIndex at <console>:26

scala> r2.collect()
res10: Array[(String, Long)] = Array((a,0), (b,1), (c,2), (d,3))


// 整數型別的例子
scala> val z = sc.parallelize(1 to 10, 5)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> val r2 = z.zipWithIndex
r2: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[18] at zipWithIndex at <console>:26

scala> r2.collect()
res11: Array[(Int, Long)] = Array((1,0), (2,1), (3,2), (4,3), (5,4), (6,5), (7,6), (8,7), (9,8), (10,9))

scala> r2.partitions.length
res12: Int = 5

sortBy

sortBy函式是在org.apache.spark.rdd.RDD類中實現的,它的實現如下:


/**

 * Return this RDD sorted by the given key function.

 */

def sortBy[K](

    f: (T) => K,

    ascending: Boolean = true,

    numPartitions: Int = this.partitions.size)

    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =

  this.keyBy[K](f)

      .sortByKey(ascending, numPartitions)

      .values

該函式最多可以傳三個引數:

第一個引數是一個函式,該函式的也有一個帶T泛型的引數,返回型別和RDD中元素的型別是一致的;
第二個引數是ascending,從字面的意思大家應該可以猜到,是的,這引數決定排序後RDD中的元素是升序還是降序,預設是true,也就是升序;
第三個引數是numPartitions,該引數決定排序後的RDD的分割槽個數,預設排序後的分割槽個數和排序之前的個數相等,即為this.partitions.size


從sortBy函式的實現可以看出,第一個引數是必須傳入的,而後面的兩個引數可以不傳入。而且sortBy函式函式的實現依賴於sortByKey函式,關於sortByKey函式後面會進行說明。keyBy函式也是RDD類中進行實現的,它的主要作用就是將將傳進來的每個元素作用於f(x)中,並返回tuples型別的元素,也就變成了Key-Value型別的RDD了,它的實現如下:

/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: T => K): RDD[(K, T)] = {
    map(x => (f(x), x))
}

那麼,如何使用sortBy函式呢?

scala> val data = List(3,1,90,3,5,12)
data: List[Int] = List(3, 1, 90, 3, 5, 12)
 
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
 
scala> rdd.collect
res0: Array[Int] = Array(3, 1, 90, 3, 5, 12)
 
scala> rdd.sortBy(x => x).collect
res1: Array[Int] = Array(1, 3, 3, 5, 12, 90)
 
scala> rdd.sortBy(x => x, false).collect
res3: Array[Int] = Array(90, 12, 5, 3, 3, 1)
 
scala> val result = rdd.sortBy(x => x, false)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at <console>:16
 
scala> result.partitions.size
res9: Int = 2
 
scala> val result = rdd.sortBy(x => x, false, 1)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at <console>:16
 
scala> result.partitions.size
res10: Int = 1

上面的例項對rdd中的元素進行升序排序。並對排序後的RDD的分割槽個數進行了修改,上面的result就是排序後的RDD,預設的分割槽個數是2,而我們對它進行了修改,所以最後變成了1。


sortByKey

sortByKey函式作用於Key-Value形式的RDD,並對Key進行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中實現的,實現如下

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
    : RDD[(K, V)] =
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

從函式的實現可以看出,它主要接受兩個函式,含義和sortBy一樣,這裡就不進行解釋了。該函式返回的RDD一定是ShuffledRDD型別的,因為對源RDD進行排序,必須進行Shuffle操作,而Shuffle操作的結果RDD就是ShuffledRDD。其實這個函式的實現很優雅,裡面用到了RangePartitioner,它可以使得相應的範圍Key資料分到同一個partition中,然後內部用到了mapPartitions對每個partition中的資料進行排序,而每個partition中資料的排序用到了標準的sort機制,避免了大量資料的shuffle。下面對sortByKey的使用進行說明:

scala> val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"), 2)
a: org.apache.spark.rdd.RDD[String] =
ParallelCollectionRDD[30] at parallelize at <console>:12
 
scala> val b = sc. parallelize (1 to a.count.toInt , 2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:14
 
scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[32] at zip at <console>:16
 
scala> c.sortByKey().collect
res11: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))

上面對Key進行了排序。細心的讀者可能會問,sortBy函式中的第一個引數可以對排序方式進行重寫。為什麼sortByKey沒有呢?難道只能用預設的排序規則。不是,是有的。其實在OrderedRDDFunctions類中有個變數ordering它是隱形的:private val ordering = implicitly[Ordering[K]]。他就是預設的排序規則,我們可以對它進行重寫,如下:

scala> val b = sc.parallelize(List(3,1,9,12,4))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:12
 
scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[39] at zip at <console>:16
 
scala> c.sortByKey().collect
res15: Array[(Int, String)] = Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,397090770))
 
scala> implicit val sortIntegersByString = new Ordering[Int]{
     | override def compare(a: Int, b: Int) =
     | a.toString.compare(b.toString)}
sortIntegersByString: Ordering[Int] = [email protected]
 
scala>  c.sortByKey().collect
res17: Array[(Int, String)] = Array((1,iteblog), (12,397090770), (3,wyp), (4,test), (9,com))

例子中的sortIntegersByString就是修改了預設的排序規則。這樣將預設按照Int大小排序改成了對字串的排序,所以12會排序在3之前。

三、Spark RDD的容錯機制

一般而言,RDD容錯性具備兩種方式:資料檢查點記錄資料的更新

  • checkpoint機制——資料檢查點
  • 記錄更新機制(即Lineage機制)

3.1 checkpoint機制

checkpoint的意思是建立檢查點,類似於快照,傳統的Spark任務計算過程中,DAG特別長,叢集需要將整個DAG計算完成得到結果,但是如果在這個漫長的計算過程中出現數據丟失,Spark又會根據依賴關係重新從頭開始計算一遍結果,這樣很浪費效能,當然我們可以考慮將中間計算結果cache或者persist到記憶體或者磁碟中,但是這樣不能保證資料完全不丟失,儲存的記憶體出現了問題或者磁碟壞掉,也會導致Spark再次根據RDD重新再次計算一遍,所以就出現了checkpoint機制,其中checkpoint的作用就是將DAG中比較重要的中間計算結果儲存到一個高可用的地方,通常這個地方是HDFS裡面。 

另外需要注意cache和checkpoint機制之間的區別:

  1. 檢查點是新建一個job來完成的,是執行的完一個job之後,新建一個來完成的;而cache,是job執行過程中進行。
  2. 檢查點對RDD的checkpoint是將資料的血統截斷,只儲存了想要儲存的RDD在HDFS中,而cache的是計算血統的資料在記憶體中。
  3. 快取的清除方式也不一樣,checkpoint到HDFS中的RDD需要手動清除,如果不手動清除,會一直存在,可以被下一個驅動程式所使用;而cache到記憶體和persist到磁碟的partition, 由 blockManager 管理。一旦 driver program 執行結束,也就是 executor 所在程序 CoarseGrainedExecutorBackend stop,blockManager 也會 stop,被 cache 到磁碟上的 RDD 也會被清空(整個 blockManager 使用的 local 資料夾被刪除)。

checkpoint機制不足

操作成本高,需要通過資料中心的網路連線在機器之間複製龐大的資料集,而網路頻寬往往比記憶體頻寬低得多,同時還需要消耗更多的儲存資源。因此Spark側重於記錄更新的方式,即血統機制。

3.2 Lineage機制

RDD只支援粗粒度轉換,即只記錄單個塊上執行的單個操作,然後將建立RDD的一系列變換序列(每個RDD都包含了它是如何由其他RDD變換過來的以及如何重建某一塊資料的資訊。因此RDD的容錯機制又稱“血統(Lineage)”容錯)記錄下來,以便恢復丟失的分割槽。 Lineage本質上很類似於資料庫中的重做日誌(Redo Log),只不過這個重做日誌粒度很大,是對全域性資料做同樣的重做進而恢復資料。

相比其他系統的細顆粒度的記憶體資料更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定資料Transformation操作(如filter、map、join等)行為。當這個RDD的部分分割槽資料丟失時,它可以通過Lineage獲取足夠的資訊來重新運算和恢復丟失的資料分割槽。因為這種粗顆粒的資料模型,