1. 程式人生 > >Spark的RDD原理以及2.0特性的介紹

Spark的RDD原理以及2.0特性的介紹

Spark 是什麼 

Spark 是 Apache 頂級專案裡面最火的大資料處理的計算引擎,它目前是負責大資料計算的工作。包括離線計算或互動式查詢、資料探勘演算法、流式計算以及圖計算等。全世界有許多公司和組織使用或給社群貢獻程式碼,社群的活躍度見 www.github.com/apache/spark。

2013 年開始 Spark開發團隊成立 Databricks,來對 Spark 進行運作和管理,並提供 Cloud 服務。Spark 社群基本保持一個季度一個版本,不出意外的話 Spark 2.0 將在五月底釋出。

與 Mapreduce 相比,Spark 具備 DAG 執行引擎以及基於記憶體的多輪迭代計算等優勢,在SQL 層面上,比 Hive/Pig 相比,引入關係資料庫的許多特性,以及記憶體管理技術。另外在 Spark 上所有的計算模型最終都統一基於 RDD 之上執行執行,包括流式和離線計算。Spark 基於磁碟的效能是 MR 的 10 倍,基於記憶體的效能是 MR 的 100 倍 ❶ (見文後參考閱讀❶ ,下同) 。

Spark 提供 SQL、機器學習庫 MLlib、流計算 Streaming 和圖計算 Graphx,同時也支援 Scala、Java、Python 和 R 語言開發的基於 API 的應用程式。

Spark

RDD 的原理

RDD,英文全稱叫 Resilient Distributed Datasets。

an RDD is a read-only, partitioned collection of records❸. 字面意思是隻讀的分散式資料集。

但其實個人覺得可以把 RDD 理解為關係資料庫 裡的一個個操作,比如 map,filter,Join 等。在 Spark 裡面實現了許多這樣的 RDD 類,即可以看成是操作類。當我們呼叫一個 map 介面,底層實現是會生成一個 MapPartitionsRDD 物件,當 RDD 真正執行時,會呼叫 MapPartitionsRDD 物件裡面的 compute 方法來執行這個操作的計算邏輯。但是不同的是,RDD 是 lazy 模式,只有像 count,saveasText 這種 action 動作被呼叫後再會去觸發 runJob 動作。

RDD 分為二類:transformation 和 action。

  • transformation 是從一個 RDD 轉換為一個新的 RDD 或者從資料來源生成一個新的 RDD;
  • action 是觸發 job 的執行。所有的 transformation 都是 lazy 執行,只有在 action 被提交的時候才觸發前面整個 RDD 的執行圖。如下

val file = sc.textFile(args(0))

val words = file.flatMap(line => line.split(” “))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _, 2) wordCounts.saveAsTextFile(args(1))

這段程式碼生成的 RDD 的執行樹是如下圖所示:

Spark

最終在 saveAsTextFile 方法時才會將整個 RDD 的執行圖提交給 DAG 執行引擎,根據相關資訊切分成一個一個 Stage,每個 Stage 去執行多個 task,最終完成整個 Job 的執行。

還有一個區別就是,RDD 計算後的中間結果是可以被持久化,當下一次需要使用時,可以直接使用之前持久化好的結果,而不是重新計算,並且這些結果被儲存在各個結點的 executor 上。下一次使用時,排程器可以直接把 task 分發到儲存持久化資料的結點上,減少資料的網路傳輸開稍。這種場景在資料探勘迭代計算是經常出現。如下程式碼

val links = spark.textFile(…).map(…).persist() var ranks = // RDD of (URL, rank) pairs

for (i <- 1 to ITERATIONS) {

// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {

(url, (links, rank)) =>

links.map(dest => (dest, rank/links.size)) }

// Sum contributions by URL and get new ranks

ranks = contribs.reduceByKey((x,y) => x+y)

.mapValues(sum => a/N + (1-a)*sum) }

以上程式碼生成的 RDD 執行樹如下圖所示:

Spark

計算 contribs-0 時需要使用 links 的計算邏輯,當 links 每個分片計算完後,會將這個結果儲存到本地記憶體或磁碟上,下一次 contribs-1 計算要使用 links 的資料時,直接從上一次儲存的記憶體和磁碟上讀取就可以了。這個持久化系統叫做 blockManager,類似於在內部再構建了一個 KV 系統,K 表示每個分割槽 ID 號,V 表示這個分割槽計算後的結果。

另外在 streaming 計算時,每個 batch 會去訊息佇列上拉取這個時間段的資料,每個 Recevier 接收過來資料形成 block 塊並存放到 blockManager 上,為了可靠性,這個 block 塊可以遠端備份,後續的 batch 計算就直接在之前已讀取的 block 塊上進行計算,這樣不斷迴圈迭代來完成流處理。

一個 RDD 一般會有以下四個函式組成。

1. 操作運算元的物理執行邏輯

定義為:

def compute(split: Partition, context: TaskContext): Iterator[T]

如在 MapPartitionsRDD 裡的實現是如下:

override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))

函式定義

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

2. 獲取分片資訊

protected def getPartitions: Array[Partition] 

即這個操作的資料劃分為多少個分 區。跟 mapreduce 裡的 map 上的 split 類似的。

3. 獲取父 RDD 的依賴關係

protected def getDependencies: Seq[Dependency[_]] 

依賴分二種:如果 RDD 的每個分割槽最多隻能被一個 Child RDD 的一個分割槽使用,則稱之為 narrow dependency;若依賴於多個 Child RDD 分割槽,則稱之為 wide dependency。不同的操作根據其特性,可能會產生不同的依賴 ❹。如下圖所示

Spark

map 操作前後二個 RDD 操作之間的分割槽是一對一的關係,故產生 narrow dependency,而 join 操作的分割槽分別對應於它的二個子操作相對應的分割槽,故產生 wide dependency。當最後要生成具體的 task 執行時,就需要利用這個依賴關係也生成 Stage 的 DAG 圖。

4. 獲取該操作對應資料的存放位置資訊,主要是針對 HDFS 這類有資料來源的 RDD。

protected def getPreferredLocations(split: Partition): Seq[String]

Spark 的執行模式

Spark 的執行模式有 local、Yarn、Standalone、Mesos 四類。後面三個分別有 cluster 和 client 二種。client 和 cluster 的區別就是指 Driver 是在程式提交客戶端還是在叢集的 AM 上。 比如常見的 Yarn-cluster 模式如下圖所示:

Spark

一般來說,執行簡單測試或 UT 用的是 local 模式執行,其實就是用多執行緒模似分散式執行。 如果業務部門較少且不需要對部門或組之間的資源做劃分和優先順序排程的話,可以使用 Standalone 模式來部署。

當如果有多個部門或組,且希望每個組織可以限制固定執行的最大資源,另外組或者任務需要有優先順序執行的話,可以選擇 Yarn 或 Mesos。

Spark 2.0 的特性

Unifying DataFrames and Datasets in Scala/Java

DataFrame❺ 和 Dataset❽ 的功能是什麼?

它們都是提供給使用者使用,包括各類操作介面的 API。1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是將二者統一,即保留 Dataset,而把 DataFrame 定義為 Dataset[Row],即是 Dataset 裡的元素物件為 Row 的一種(SPARK-13485)。

在參考資料 中有介紹 DataFrame,它就是提供了一系列操作 API,與 RDD API 相比較,DataFrame 裡操作的資料都是帶有 Schema 資訊,所以 DataFrame 裡的所有操作是可以享受 Spark SQL Catalyst optimizer 帶來的效能提升,比如 code generation 以及 Tungsten❼ 等。執行過程如下圖所示

Spark

但是 DataFrame 出來後發現有些情況下 RDD 可以表達的邏輯用 DataFrame 無法表達。比如 要對 group by 或 join 後的結果用自定義的函式,可能用 SQL 是無法表達的。如下程式碼:

case class ClassData(a: String, b: Int)

case class ClassNullableData(a: String, b: Integer)

val ds = Seq(ClassData(“a”, 1), ClassData(“a”, 2)).toDS()

val agged = ds.groupByKey(d => ClassNullableData(d.a, null))

.mapGroups {

case (key, values) => key.a + values.map(_.b).sum

}

中間處理過程的資料是自定義的型別,並且 groupby 後的聚合邏輯也是自定義的,故用 SQL 比較難以表達,所以提出了 Dataset API。Dataset API 擴充套件 DataFrame API 支援靜態型別和執行已經存在的 Scala 或 Java 語言的使用者自定義函式。同時 Dataset 也能享受 Spark SQL 裡所有效能 帶來的提升。

那麼後面發現 Dataset 是包含了 DataFrame 的功能,這樣二者就出現了很大的冗餘,故在 2.0 時將二者統一,保留 Dataset API,把 DataFrame 表示為 Dataset[Row],即 Dataset 的子集。

因此我們在使用 API 時,優先選擇 DataFrame & Dataset,因為它的效能很好,而且以後的優化它都可以享受到,但是為了相容早期版本的程式,RDD API 也會一直保留著。後續 Spark 上層的庫將全部會用 DataFrame,比如 MLlib、Streaming、Graphx 等。

Whole-stage code generation

在參考資料 9 中有幾個例子的程式碼比較,我們看其中一個例子:

elect count(*) from store_sales where ss_item_sk = 1000

那麼在翻譯成計算引擎的執行計劃如下圖:

Spark

而通常物理計劃的程式碼是這樣實現的:

class Filter {

def next(): Boolean = {

var found = false

while (!found && child.next()) {

found = predicate(child.fetch())

}

return found

}

def fetch(): InternalRow = {

child.fetch()

}…

}

但是真正如果我們用 hard code 寫的話,程式碼是這樣的:

var count = 0

for (ss_item_sk in store_sales) {

if (ss_item_sk == 1000) {

count += 1

}

}

發現二者相關如下圖所示:

Spark

那麼如何使得計算引擎的物理執行速度能達到 hard code 的效能呢?這就提出了 whole-stage code generation,即對物理執行的多次呼叫轉換為程式碼 for 迴圈,類似 hard code 方式,減少中間執行的函式呼叫次數,當資料記錄多時,這個呼叫次數是很大。 最後這個優化帶來的效能提升如下圖所示:

Spark

從 benchmark 的結果可以看出,使用了該特性後各操作的效能都有很大的提升。

Structured Streaming

Spark Streaming 是把流式計算看成一個一個的離線計算來完成流式計算,提供了一套 Dstream 的流 API,相比於其他的流式計算,Spark Streaming 的優點是容錯性和吞吐量上要有優勢❿,關於 Spark Streaming 的詳細設計思想和分析,可以到 https://github.com/lw-lin/CoolplaySpark 進行詳細學習和了解。

在 2.0 以前的版本,使用者在使用時,如果有流計算,又有離線計算,就需要用二套 API 去編寫程式,一套是 RDD API,一套是 Dstream API。而且 Dstream API 在易用性上遠不如 SQL 或 DataFrame。

為了真正將流式計算和離線計算在程式設計 API 上統一,同時也讓 Streaming 作業能夠享受 DataFrame/Dataset 上所帶來的優勢:效能提升和 API 易用,於是提出了 Structured Streaming。最後我們只需要基於 DataFrame/Dataset 可以開發離線計算和流式計算的程式,很容易使得 Spark 在 API 跟業界所說的 DataFlow 來統一離線計算和流式計算效果一樣。

比如在做 Batch Aggregation 時我們可以寫成下面的程式碼

Spark

那麼對於流式計算時,我們僅僅是呼叫了 DataFrame/Dataset 的不同函式程式碼,如下:

Spark

最後,在 DataFrame/Dataset 這個 API 上可以完成如下圖所示的所有應用:

Spark

其他主要效能提升

  1. 採用 vectorized Parquet decoder 讀取 parquet 上資料。以前是一行一行的讀取,然後處理。現在改為一次讀取 4096 行記錄,不需要每處理一行記錄去呼叫一次 Parquet 獲取記錄的方法,而是改為一批去呼叫一次(SPARK-12854)。加上 Parquet 本身是列式儲存,這個優化使得 Parquet 讀取速度提高 3 倍。
  2. 採有 radix sort 來提高 sort 的效能(SPARK-14724)。在某些情況下排序效能可以提高 10-20 倍。
  3. 使用 VectorizedHashmap 來代替 Java 的 HashMap 加速 group by 的執行(SPARK-14319)。
  4. 將 Hive 中的 Window 函式用 Native Spark Window 實現,因為 Native Spark Window 在記憶體管理上有優勢(SPARK-8641)。
  5. 避免複雜語句中的邏輯相同部分在執行時重複計算(SPARK-13523)。
  6. 壓縮演算法預設使用 LZ4(SPARK-12388)。

語句的增強

  1. 建立新的語法解析(SPARK-12362)滿足所有的 SQL 語法,這樣即合併 Hive 和標準 SQL 的語句解析,同時不依賴 Hive 的語法解析 jar(SPARK-14776)。之前版本二者的語法解析是獨立的,這樣導致在標準 SQL 中無法使用視窗函式或者 Hive 的語法,而在使用 Hive 語法時無法使用標準 SQL 的語法,比如 In/Exists 子句等。在 SQL 編寫時,沒法在一個 Context 把二者的範圍全部支援,然而有了這個特性後,使得 SQL 語句表達更強大,後續要增加任何語法,只需要維護這一個語法解析即可。當然缺點是後續 Hive 版本的新語法,需要手動新增進來。
  2. 支援 intersect/except(SPARK-12542)。如 select * from t1 except select * from t2 或者 select * from t1 intersect select * from t2
  3. 支援 uncorrelated scalar subquery(SPARK-13417)。如 select (select min(value) from testData where key = (select max(key) from testData) – 1)
  4. 支援 DDL/DML(SPARK-14118)。之前 DDL/DML 語句是呼叫 Hive 的 DDL/DML 語句命令來完成,而現在是直接在 Spark SQL 上就可以完成。
  5. 支援 multi-insert(SPARK-13924)。
  6. 支援 exist(SPARK-12545)和 NOT EXISTS(SPARK-10600),如 select * from (select 1 as a union all select 2 as a) t where exists (select * from (select 1 as b) t2 where b = a and b < 2)
  7. 支援 subqueries 帶有 In/Not In 子句(SPARK-4226),如 select * from (select 1 as a union all select 2 as a) t where a in (select b as a from t2 where b < 2)
  8. 支援 select/where/having 中使用 subquery(SPARK-12543),如 select * from t where a = (select max(b) from t2)select max(a) as ma from t having ma = (select max(b) from t2)
  9. 支援 LeftSemi/LeftAnti(SPARK-14853)。
  10. 支援在條件表示式 In/Not In 裡使用子句(SPARK-14781),如 select * from l where l.a in (select c from r) or l.a in (select c from r where l.b < r.d)
  11. 支援所有的 TPCDS 語句(SPARK-12540)。

與以前版本相容(SPARK-11806)

  1. 不支援執行在 Hadoop 版本 < 2.2 上(SPARK-11807)。
  2. 去掉 HTTPBroadcast(SPARK-12588)。
  3. 去掉 HashShuffleManager(SPARK-14667)。
  4. 去掉 Akka RPC。
  5. 簡化與完善 accumulators and task metrics(SPARK-14626)。
  6. 將 Hive 語法解析以及語法移至 Core 裡(SPARK-14825),在沒有 Hive 元資料庫和 Hive 依賴包時,我們可以像之前版本使用標準 SQL 一樣去使用 HiveQL 語句。

1.6 版本嚴重問題的解決

在 http://geek.csdn.net/news/detail/70162 提到的 1.6 問題中 Spillable 集合記憶體溢位問題在 SPARK-4452 裡已解決,BlockManager 死鎖問題在 SPARK-12757 裡已解決。

最後 2.0 版本還有一些其他的特性,如:

  1. 用 SparkSession 替換掉原來的 SQLContext and HiveContext。
  2. mllib 裡的計算用 DataFrame-based API 代替以前的 RDD 計算邏輯。
  3. 提供更多的 R 語言演算法。
  4. 預設使用 Scala 2.11 編譯與執行。

參考資料

  1. http://spark.apache.org/
  2. https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html
  3. http://www.eecs.berkeley.edu/Pubs/TechRpts/2014/EECS-2014-12.pdf
  4. http://www.infoq.com/cn/articles/spark-core-rdd
  5. https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
  6. http://www.slideshare.net/databricks/spark-summit-eu-2015-spark-dataframes-simple-and-fast-analysis-of-structured-data
  7. https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
  8. https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html
  9. https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6122906529858466/293651311471490/5382278320999420/latest.html
  10. http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data
  11. http://www.slideshare.net/rxin/the-future-of-realtime-in-spark

文/王聯輝

文章出處——高可用架構微信公眾號