Spark原理概述
原文來自我的個人網站:http://www.itrensheng.com/archives/Spark_basic_knowledge
一. Spark出現的背景
在Spark出現之前,大資料計算引擎主要是MapReduce。HDFS + MapReduce的組合幾乎可以實現所有的大資料應用場景。MR框架抽象程度比較高,需要我們編寫Map和Reduce兩個步驟(MapReduce 框架其實包含5 個步驟:Map、Sort、Combine、Shuffle以及Reduce)
每個Map和Reduce之間需要進行Shuffle(這步操作會涉及數量巨大的網路傳輸,需要耗費大量的時間)。由於 MapReduce 的框架限制,一個 MapReduce 任務只能包含一次 Map 和一次 Reduce,計算完成之後,MapReduce會將運算中間結果寫回到磁碟中,供下次計算使用。
二.Spark簡介
Spark是由加州大學伯克利分校AMP實驗室開源的分散式大規模資料處理通用引擎,具有高吞吐、低延時、通用易擴充套件、高容錯等特點。Spark內部提供了豐富的開發庫,集成了資料分析引擎Spark SQL、圖計算框架GraphX、機器學習庫MLlib、流計算引擎Spark Streaming
相比於MapReduce的計算模型,Spark是將資料一直快取在記憶體中,直到計算得到最後的結果,再將結果寫入到磁碟,所以多次運算的情況下,Spark省略了多次磁碟IO。
對比 | MapReduce | Spark |
---|---|---|
速度 | 處理資料需要連續的讀寫磁碟 | 是MapReduce的10到100倍 |
編碼難度 | 程式設計師來賦值每一步 | RDD高可用,失敗重試 |
及時性 | 不適合做OLAP,只適合批處理 | 能兼顧批處理和OLAP |
排程 | 使用外部的排程,如Oozie | 自帶排程,也可使用外部排程 |
程式語言 | Java | Scala |
SQL支援 | 本身不提供,需要外部查詢引擎,如Hive | 自帶Spark SQL |
可擴充套件性 | 最大支援14000個節點 | 最大8000節點 |
機器學習 | 外部依賴Mahout | 自帶Spark MLlib |
快取 | 能不快取到記憶體中 | 可以快取到記憶體中 |
安全性 | 安全特性比Spark廣泛 | 不如MapReduce |
三. Spark系統架構
Driver:
一個Spark job執行前會啟動一個Driver程序,也就是作業的主程序,負責解析和生成各個Stage,並排程Task到Executor上
SparkContext:
程式執行排程的核心,高層排程去DAGScheduler劃分程式的每個階段,底層排程器TaskScheduler劃分每個階段具體任務
Worker:
也就是WorkderNode,負責執行Master所傳送的指令,來具體分配資源並執行任務
Executer:
負責執行作業。如圖中所以,Executer是分步在各個Worker Node上,接收來自Driver的命令並載入Task
DAGScheduler:
負責高層排程,劃分stage並生產DAG有向無環圖
TaskScheduler:
負責具體stage內部的底層排程,具體task的排程和容錯
Job:
每次Action都會觸發一次Job,一個Job可能包含一個或多個stage
Stage:
用來計算中間結果的Tasksets。分為ShuffleMapStage和ResultStage,出了最後一個Stage是ResultStage外,其他都是ShuffleMapStage。ShuffleMapStage會產生中間結果,是以檔案的方式儲存在叢集當中,以便能夠在不同stage種重用
Task:
任務執行的工作單位,每個Task會被髮送到一個節點上,每個Task對應RDD的一個partition.
RDD:
是以partition分片的不可變,Lazy級別資料集合 運算元
Transformation:
由DAGScheduler劃分到pipeline中,是Lazy級別的,不會觸發任務的執行
Action:
會觸發Job來執行pipeline中的運算
四. Spark Job執行流程
spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) spark.stop()
- 使用spark-submit向叢集提交一個job之後,就會啟動一個Driver程序。Driver程序會根據deploy-mode不同而不同,可能是本地啟動,也可能是叢集中的節點
- Driver程序向資源管理器Resource Manager(可以是Standalone、Mesos或YARN)註冊並申請執行Executor資源,如YARN會根據spark-submit中申請的引數來為Spark作業設定對應的資源引數,並在叢集中的各個節點上分配對應數量的Executor程序
- Driver程序會將整個Job拆分為多個Stage,一個Stage可能包含多個Task,並將這些Task分配到第二步中申請到的Executor程序中執行。Task是執行的最小Unit。當一個Stage所屬的所有Task都執行完成之後,會在各個節點的磁碟檔案中記錄中間結果並繼續執行後續的Stage。
四. RDD
定義:
RDD 是 Spark 的計算模型。RDD(Resilient Distributed Dataset)叫做彈性的分散式資料集合,是 Spark中最基本的資料抽象,它代表一個不可變、只讀的,被分割槽的資料集。
可以將 RDD 理解為一個分散式物件集合,本質上是一個只讀的分割槽記錄集合。每個 RDD可以分成多個分割槽,每個分割槽就是一個數據集片段。一個 RDD 的不同分割槽可以儲存到叢集中的不同結點上,從而可以在叢集中的不同結點上進行平行計算。
五大特性:
-
分割槽列表:RDD是分割槽的,且每一個分割槽都會被一個Task所處理,所以Job的並行執行能力取決於分割槽多少。預設情況下,RDD的分割槽數是整合自父RDD,這個值也可以在建立RDD的時候在程式碼中指定
-
計算函式:每個分割槽都有一個計算函式,這個計算函式是以分片為基本單位的。如在RDD的寬依賴場景下,將寬依賴劃分為Stage,而Stage使用BlockManager獲取分割槽資料並根據計算函式來split對應的Block
-
存在依賴關係:RDD經過計算任務每次都會轉化為一個不可變的新的RDD。因為有依賴關係,所以當前一個RDD失敗的時候,Spark會根據依賴關係重新計算前一個失敗的RDD,而不是所有的RDD。
-
KV資料型別分割槽器:控制分割槽策略和分割槽數,每個KV形式的RDD都有Partitioner屬性,來控制RDD如何分割槽。
5.優先位置列表:每個分割槽都有優先位置列表,用於儲存Partition的優先位置。如果是讀取HDFS,那就是每個Block的優先位置。
RDD的依賴關係
依賴關係分為寬依賴(Wide Dependency)和窄依賴(Narraw Dependency)。
-
寬依賴:子RDD分割槽依賴父RDD的所有分割槽。如果子RDD部分分割槽甚至全部分割槽資料損壞或丟失,需要從所有父RDD重新計算,相對窄依賴而言付出的代價更高,所以應儘量避免寬依賴的使用
-
-
窄依賴:父RDD的分割槽只對應一個子RDD的分割槽。如果子RDD只有部分分割槽資料損壞或者丟失,只需要從對應的父RDD重新計算恢復如果子RDD只有部分分割槽資料損壞或者丟失,只需要從對應的父RDD重新計算恢復
型別
RDD可以分為2中型別:Transformation 和 Action
Transformation 操作不是馬上提交 Spark 叢集執行的,Spark 在遇到 Transformation操作時只會記錄需要這樣的操作,並不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.
針對每個 Action,Spark 會生成一個 Job,從資料的建立開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最後的函式操作是一個Action
五. 快取
Spark 本身就是一個基於記憶體的迭代式計算,當某個RDD的計算結果會被多次重複使用的時候,快取就很有必要(尤其是對於整個血統很長的計算任務)。如果程式從頭到尾只有一個 Action 操作且子RDD只依賴於一個父RDD 的話,就不需要使用 cache 這個機制。
Spark 可以使用 persist 和 cache 方法將任意 RDD 快取到記憶體、磁碟檔案系統中。快取是容錯的,如果一個 RDD 分片丟失,則可以通過構建它的轉換來自動重構。被快取的 RDD 被使用時,存取速度會被大大加速。一般情況下,Executor 記憶體的 60% 會分配給 cache,剩下的 40% 用來執行任務
-
MEMORY_ONLY: 使用未序列化的Java物件格式,將資料儲存在記憶體中。如果記憶體不夠存放所有的資料,則某些分割槽的資料就不會進行持久化。那麼下次對這個RDD執行運算元操作時,那些沒有被持久化的資料,需要從源頭處重新計算一遍。這是預設的持久化策略,使用cache()方法時,實際就是使用的這種持久化策略。
-
MEMORY_ONLY_SER: 基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組陣列。這種方式更加節省記憶體,從而可以避免持久化的資料佔用過多記憶體導致頻繁GC。
-
MYMORY_AND_DISK: 使用未序列化的Java物件格式,優先嚐試將資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會將資料寫入磁碟檔案中,下次對這個RDD執行運算元時,持久化在磁碟檔案中的資料會被讀取出來使用。
-
MEMORY_AND_DISK_SER: 基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組陣列。這種方式更加節省記憶體,從而可以避免持久化的資料佔用過多記憶體導致頻繁GC。
-
DISK_ONLY: 使用未序列化的Java物件格式,將資料全部寫入磁碟檔案中。
-
MEMORY_ONLY_2/MEMORY_AND_DISK_2: 對於上述任意一種持久化策略,如果加上字尾_2,代表的是將每個持久化的資料,都複製一份副本,並將副本儲存到其他節點上。這種基於副本的持久化機制主要用於進行容錯。假如某個節點掛掉,節點的記憶體或磁碟中的持久化資料丟失了,那麼後續對RDD計算時還可以使用該資料在其他節點上的副本。如果沒有副本的話,就只能將這些資料從源頭處重新計算一遍了。
-
OFF_HEAP(experimental) : RDD的資料序例化之後儲存至Tachyon。相比於MEMORY_ONLY_SER,OFF_HEAP能夠減少垃圾回收開銷、使得Spark Executor更“小”更“輕”的同時可以共享記憶體;而且資料儲存於Tachyon中,Spark叢集節點故障並不會造成資料丟失,因此這種方式在“大”記憶體或多併發應用的場景下是很有吸引力的。需要注意的是,Tachyon並不直接包含於Spark的體系之內,需要選擇合適的版本進行部署;它的資料是以“塊”為單位進行管理的,這些塊可以根據一定的演算法被丟棄,且不會被重建。