1. 程式人生 > >理解Spark的核心RDD

理解Spark的核心RDD

與許多專有的大資料處理平臺不同,Spark建立在統一抽象的RDD之上,使得它可以以基本一致的方式應對不同的大資料處理場景,包括MapReduce,Streaming,SQL,Machine Learning以及Graph等。這即Matei Zaharia所謂的“設計一個通用的程式設計抽象(Unified Programming Abstraction)。這正是Spark這朵小火花讓人著迷的地方。

要理解Spark,就需得理解RDD。

RDD是什麼?

RDD,全稱為Resilient Distributed Datasets,是一個容錯的、並行的資料結構,可以讓使用者顯式地將資料儲存到磁碟和記憶體中,並能控制資料的分割槽。同時,RDD還提供了一組豐富的操作來操作這些資料。在這些操作中,諸如map、flatMap、filter等轉換操作實現了monad模式,很好地契合了Scala的集合操作。除此之外,RDD還提供了諸如join、groupBy、reduceByKey等更為方便的操作(注意,reduceByKey是action,而非transformation),以支援常見的資料運算。

通常來講,針對資料處理有幾種常見模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Processing。例如Hadoop MapReduce採用了MapReduces模型,Storm則採用了Stream Processing模型。RDD混合了這四種模型,使得Spark可以應用於各種大資料處理場景。

RDD作為資料結構,本質上是一個只讀的分割槽記錄集合。一個RDD可以包含多個分割槽,每個分割槽就是一個dataset片段。RDD可以相互依賴。如果RDD的每個分割槽最多隻能被一個Child RDD的一個分割槽使用,則稱之為narrow dependency;若多個Child RDD分割槽都可以依賴,則稱之為wide dependency。不同的操作依據其特性,可能會產生不同的依賴。例如map操作會產生narrow dependency,而join操作則產生wide dependency。

Spark之所以將依賴分為narrow與wide,基於兩點原因。

首先,narrow dependencies可以支援在同一個cluster node上以管道形式執行多條命令,例如在執行了map後,緊接著執行filter。相反,wide dependencies需要所有的父分割槽都是可用的,可能還需要呼叫類似MapReduce之類的操作進行跨節點傳遞。

其次,則是從失敗恢復的角度考慮。narrow dependencies的失敗恢復更有效,因為它只需要重新計算丟失的parent partition即可,而且可以並行地在不同節點進行重計算。而wide dependencies牽涉到RDD各級的多個Parent Partitions。下圖說明了narrow dependencies與wide dependencies之間的區別:

本圖來自Matei Zaharia撰寫的論文An Architecture for Fast and General Data Processing on Large Clusters。圖中,一個box代表一個RDD,一個帶陰影的矩形框代表一個partition。

RDD如何保障資料處理效率?

RDD提供了兩方面的特性persistence和patitioning,使用者可以通過persist與patitionBy函式來控制RDD的這兩個方面。RDD的分割槽特性與平行計算能力(RDD定義了parallerize函式),使得Spark可以更好地利用可伸縮的硬體資源。若將分割槽與持久化二者結合起來,就能更加高效地處理海量資料。例如:

input.map(parseArticle _).partitionBy(partitioner).cache()

partitionBy函式需要接受一個Partitioner物件,如:

val partitioner = new HashPartitioner(sc.defaultParallelism)

RDD本質上是一個記憶體資料集,在訪問RDD時,指標只會指向與操作相關的部分。例如存在一個面向列的資料結構,其中一個實現為Int的陣列,另一個實現為Float的陣列。如果只需要訪問Int欄位,RDD的指標可以只訪問Int陣列,避免了對整個資料結構的掃描。

RDD將操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,只有當action操作被執行時,運算才會觸發。而在RDD的內部實現機制中,底層介面則是基於迭代器的,從而使得資料訪問變得更高效,也避免了大量中間結果對記憶體的消耗。

在實現時,RDD針對transformation操作,都提供了對應的繼承自RDD的型別,例如map操作會返回MappedRDD,而flatMap則返回FlatMappedRDD。當我們執行map或flatMap操作時,不過是將當前RDD物件傳遞給對應的RDD物件而已。例如:

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

這些繼承自RDD的類都定義了compute函式。該函式會在action操作被呼叫時觸發,在函式內部是通過迭代器進行對應的轉換操作:

private[spark]
class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f)
}

RDD對容錯的支援

支援容錯通常採用兩種方式:資料複製或日誌記錄。對於以資料為中心的系統而言,這兩種方式都非常昂貴,因為它需要跨叢集網路拷貝大量資料,畢竟頻寬的資料遠遠低於記憶體。

RDD天生是支援容錯的。首先,它自身是一個不變的(immutable)資料集,其次,它能夠記住構建它的操作圖(Graph of Operation),因此當執行任務的Worker失敗時,完全可以通過操作圖獲得之前執行的操作,進行重新計算。由於無需採用replication方式支援容錯,很好地降低了跨網路的資料傳輸成本。

不過,在某些場景下,Spark也需要利用記錄日誌的方式來支援容錯。例如,在Spark Streaming中,針對資料進行update操作,或者呼叫Streaming提供的window操作時,就需要恢復執行過程的中間狀態。此時,需要通過Spark提供的checkpoint機制,以支援操作能夠從checkpoint得到恢復。

針對RDD的wide dependency,最有效的容錯方式同樣還是採用checkpoint機制。不過,似乎Spark的最新版本仍然沒有引入auto checkpointing機制。

總結

RDD是Spark的核心,也是整個Spark的架構基礎。它的特性可以總結如下:

  • 它是不變的資料結構儲存
  • 它是支援跨叢集的分散式資料結構
  • 可以根據資料記錄的key對結構進行分割槽
  • 提供了粗粒度的操作,且這些操作都支援分割槽
  • 它將資料儲存在記憶體中,從而提供了低延遲性