1. 程式人生 > >Spark原理概述

Spark原理概述

原文來自我的個人網站:http://www.itrensheng.com/archives/Spark_basic_knowledge

一. Spark出現的背景

在Spark出現之前,大資料計算引擎主要是MapReduce。HDFS + MapReduce的組合幾乎可以實現所有的大資料應用場景。MR框架抽象程度比較高,需要我們編寫Map和Reduce兩個步驟(MapReduce 框架其實包含5 個步驟:Map、Sort、Combine、Shuffle以及Reduce)

每個Map和Reduce之間需要進行Shuffle(這步操作會涉及數量巨大的網路傳輸,需要耗費大量的時間)。由於 MapReduce 的框架限制,一個 MapReduce 任務只能包含一次 Map 和一次 Reduce,計算完成之後,MapReduce會將運算中間結果寫回到磁碟中,供下次計算使用。

二.Spark簡介

Spark是由加州大學伯克利分校AMP實驗室開源的分散式大規模資料處理通用引擎,具有高吞吐、低延時、通用易擴充套件、高容錯等特點。Spark內部提供了豐富的開發庫,集成了資料分析引擎Spark SQL、圖計算框架GraphX、機器學習庫MLlib、流計算引擎Spark Streaming

相比於MapReduce的計算模型,Spark是將資料一直快取在記憶體中,直到計算得到最後的結果,再將結果寫入到磁碟,所以多次運算的情況下,Spark省略了多次磁碟IO。

對比MapReduceSpark
速度 處理資料需要連續的讀寫磁碟 是MapReduce的10到100倍
編碼難度 程式設計師來賦值每一步 RDD高可用,失敗重試
及時性 不適合做OLAP,只適合批處理 能兼顧批處理和OLAP
排程 使用外部的排程,如Oozie 自帶排程,也可使用外部排程
程式語言 Java Scala
SQL支援 本身不提供,需要外部查詢引擎,如Hive 自帶Spark SQL
可擴充套件性 最大支援14000個節點 最大8000節點
機器學習 外部依賴Mahout 自帶Spark MLlib
快取 能不快取到記憶體中 可以快取到記憶體中
安全性 安全特性比Spark廣泛 不如MapReduce

三. Spark系統架構

Driver:

一個Spark job執行前會啟動一個Driver程序,也就是作業的主程序,負責解析和生成各個Stage,並排程Task到Executor上

SparkContext:

程式執行排程的核心,高層排程去DAGScheduler劃分程式的每個階段,底層排程器TaskScheduler劃分每個階段具體任務

Worker:

也就是WorkderNode,負責執行Master所傳送的指令,來具體分配資源並執行任務

Executer:

負責執行作業。如圖中所以,Executer是分步在各個Worker Node上,接收來自Driver的命令並載入Task

DAGScheduler:

負責高層排程,劃分stage並生產DAG有向無環圖

TaskScheduler:

負責具體stage內部的底層排程,具體task的排程和容錯

Job:

每次Action都會觸發一次Job,一個Job可能包含一個或多個stage

Stage:

用來計算中間結果的Tasksets。分為ShuffleMapStage和ResultStage,出了最後一個Stage是ResultStage外,其他都是ShuffleMapStage。ShuffleMapStage會產生中間結果,是以檔案的方式儲存在叢集當中,以便能夠在不同stage種重用

Task:

任務執行的工作單位,每個Task會被髮送到一個節點上,每個Task對應RDD的一個partition.

RDD:

是以partition分片的不可變,Lazy級別資料集合 運算元

Transformation:

由DAGScheduler劃分到pipeline中,是Lazy級別的,不會觸發任務的執行

Action:

會觸發Job來執行pipeline中的運算

四. Spark Job執行流程

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
  1. 使用spark-submit向叢集提交一個job之後,就會啟動一個Driver程序。Driver程序會根據deploy-mode不同而不同,可能是本地啟動,也可能是叢集中的節點
  2. Driver程序向資源管理器Resource Manager(可以是Standalone、Mesos或YARN)註冊並申請執行Executor資源,如YARN會根據spark-submit中申請的引數來為Spark作業設定對應的資源引數,並在叢集中的各個節點上分配對應數量的Executor程序
  3. Driver程序會將整個Job拆分為多個Stage,一個Stage可能包含多個Task,並將這些Task分配到第二步中申請到的Executor程序中執行。Task是執行的最小Unit。當一個Stage所屬的所有Task都執行完成之後,會在各個節點的磁碟檔案中記錄中間結果並繼續執行後續的Stage。

四. RDD

定義:

RDD 是 Spark 的計算模型。RDD(Resilient Distributed Dataset)叫做彈性的分散式資料集合,是 Spark中最基本的資料抽象,它代表一個不可變、只讀的,被分割槽的資料集。
可以將 RDD 理解為一個分散式物件集合,本質上是一個只讀的分割槽記錄集合。每個 RDD可以分成多個分割槽,每個分割槽就是一個數據集片段。一個 RDD 的不同分割槽可以儲存到叢集中的不同結點上,從而可以在叢集中的不同結點上進行平行計算。

五大特性:

  1. 分割槽列表:RDD是分割槽的,且每一個分割槽都會被一個Task所處理,所以Job的並行執行能力取決於分割槽多少。預設情況下,RDD的分割槽數是整合自父RDD,這個值也可以在建立RDD的時候在程式碼中指定

  2. 計算函式:每個分割槽都有一個計算函式,這個計算函式是以分片為基本單位的。如在RDD的寬依賴場景下,將寬依賴劃分為Stage,而Stage使用BlockManager獲取分割槽資料並根據計算函式來split對應的Block

  3. 存在依賴關係:RDD經過計算任務每次都會轉化為一個不可變的新的RDD。因為有依賴關係,所以當前一個RDD失敗的時候,Spark會根據依賴關係重新計算前一個失敗的RDD,而不是所有的RDD。

  4. KV資料型別分割槽器:控制分割槽策略和分割槽數,每個KV形式的RDD都有Partitioner屬性,來控制RDD如何分割槽。

5.優先位置列表:每個分割槽都有優先位置列表,用於儲存Partition的優先位置。如果是讀取HDFS,那就是每個Block的優先位置。

RDD的依賴關係

依賴關係分為寬依賴(Wide Dependency)和窄依賴(Narraw Dependency)。

  • 寬依賴:子RDD分割槽依賴父RDD的所有分割槽。如果子RDD部分分割槽甚至全部分割槽資料損壞或丟失,需要從所有父RDD重新計算,相對窄依賴而言付出的代價更高,所以應儘量避免寬依賴的使用

  • 窄依賴:父RDD的分割槽只對應一個子RDD的分割槽。如果子RDD只有部分分割槽資料損壞或者丟失,只需要從對應的父RDD重新計算恢復如果子RDD只有部分分割槽資料損壞或者丟失,只需要從對應的父RDD重新計算恢復

型別

RDD可以分為2中型別:Transformation 和 Action

Transformation 操作不是馬上提交 Spark 叢集執行的,Spark 在遇到 Transformation操作時只會記錄需要這樣的操作,並不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.
針對每個 Action,Spark 會生成一個 Job,從資料的建立開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最後的函式操作是一個Action

五. 快取

Spark 本身就是一個基於記憶體的迭代式計算,當某個RDD的計算結果會被多次重複使用的時候,快取就很有必要(尤其是對於整個血統很長的計算任務)。如果程式從頭到尾只有一個 Action 操作且子RDD只依賴於一個父RDD 的話,就不需要使用 cache 這個機制。
Spark 可以使用 persist 和 cache 方法將任意 RDD 快取到記憶體、磁碟檔案系統中。快取是容錯的,如果一個 RDD 分片丟失,則可以通過構建它的轉換來自動重構。被快取的 RDD 被使用時,存取速度會被大大加速。一般情況下,Executor 記憶體的 60% 會分配給 cache,剩下的 40% 用來執行任務

  • MEMORY_ONLY: 使用未序列化的Java物件格式,將資料儲存在記憶體中。如果記憶體不夠存放所有的資料,則某些分割槽的資料就不會進行持久化。那麼下次對這個RDD執行運算元操作時,那些沒有被持久化的資料,需要從源頭處重新計算一遍。這是預設的持久化策略,使用cache()方法時,實際就是使用的這種持久化策略。

  • MEMORY_ONLY_SER: 基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組陣列。這種方式更加節省記憶體,從而可以避免持久化的資料佔用過多記憶體導致頻繁GC。

  • MYMORY_AND_DISK: 使用未序列化的Java物件格式,優先嚐試將資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會將資料寫入磁碟檔案中,下次對這個RDD執行運算元時,持久化在磁碟檔案中的資料會被讀取出來使用。

  • MEMORY_AND_DISK_SER: 基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組陣列。這種方式更加節省記憶體,從而可以避免持久化的資料佔用過多記憶體導致頻繁GC。

  • DISK_ONLY: 使用未序列化的Java物件格式,將資料全部寫入磁碟檔案中。

  • MEMORY_ONLY_2/MEMORY_AND_DISK_2: 對於上述任意一種持久化策略,如果加上字尾_2,代表的是將每個持久化的資料,都複製一份副本,並將副本儲存到其他節點上。這種基於副本的持久化機制主要用於進行容錯。假如某個節點掛掉,節點的記憶體或磁碟中的持久化資料丟失了,那麼後續對RDD計算時還可以使用該資料在其他節點上的副本。如果沒有副本的話,就只能將這些資料從源頭處重新計算一遍了。

  • OFF_HEAP(experimental) : RDD的資料序例化之後儲存至Tachyon。相比於MEMORY_ONLY_SER,OFF_HEAP能夠減少垃圾回收開銷、使得Spark Executor更“小”更“輕”的同時可以共享記憶體;而且資料儲存於Tachyon中,Spark叢集節點故障並不會造成資料丟失,因此這種方式在“大”記憶體或多併發應用的場景下是很有吸引力的。需要注意的是,Tachyon並不直接包含於Spark的體系之內,需要選擇合適的版本進行部署;它的資料是以“塊”為單位進行管理的,這些塊可以根據一定的演算法被丟棄,且不會被重建。

微信掃描二維碼,關注我的公眾號 我的個人網站:http://www.itrensheng.com/