1. 程式人生 > >Spark執行架構(Good)

Spark執行架構(Good)

轉自與https://note.youdao.com/share/?id=7fc41e362e86a863a84e787573433a76&type=note#/


1 Spark執行架構

1.1 術語定義

lApplicationSpark Application的概念和Hadoop MapReduce中的類似,指的是使用者編寫的Spark應用程式,包含了一個Driver 功能的程式碼和分佈在叢集中多個節點上執行的Executor程式碼;

lDriverSpark中的Driver即執行上述Applicationmain()函式並且建立SparkContext

,其中建立SparkContext的目的是為了準備Spark應用程式的執行環境。在Spark中由SparkContext負責和ClusterManager通訊,進行資源的申請、任務的分配和監控等;當Executor部分執行完畢後,Driver負責將SparkContext關閉。通常用SparkContext代表Drive

lExecutorApplication執行在Worker 節點上的一個程序,該程序負責執行Task,並且負責將資料存在記憶體或者磁碟上,每個Application都有各自獨立的一批Executor。在Spark on Yarn模式下,其程序名稱為CoarseGrainedExecutorBackend

,類似於Hadoop MapReduce中的YarnChild。一個CoarseGrainedExecutorBackend程序有且僅有一個executor物件,它負責將Task包裝成taskRunner,並從執行緒池中抽取出一個空閒執行緒執行Task。每個CoarseGrainedExecutorBackend能並行執行Task的數量就取決於分配給它的CPU的個數了;

lCluster Manager:指的是在叢集上獲取資源的外部服務,目前有:

Ø StandaloneSpark原生的資源管理,由Master負責資源的分配;

Ø Hadoop Yarn:由YARN

中的ResourceManager負責資源的分配;

lWorker:叢集中任何可以執行Application程式碼的節點,類似於YARN中的NodeManager節點。在Standalone模式中指的就是通過Slave檔案配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點;

l作業(Job:包含多個Task組成的平行計算,往往由Spark Action催生,一個JOB包含多個RDD及作用於相應RDD上的各種Operation

l階段(Stage:每個Job會被拆分很多組Task,每組任務被稱為Stage,也可稱TaskSet,一個作業分為多個階段;

l任務(Task: 被送到某個Executor上的工作任務;


clip_image002

1.2 Spark執行基本流程

Spark執行基本流程參見下面示意圖

1. 構建Spark Application的執行環境(啟動SparkContext),SparkContext向資源管理器(可以是StandaloneMesosYARN)註冊並申請執行Executor資源;

2. 資源管理器分配Executor資源並啟動StandaloneExecutorBackendExecutor執行情況將隨著心跳傳送到資源管理器上;

3. SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset傳送給Task SchedulerExecutorSparkContext申請TaskTask SchedulerTask發放給Executor運行同時SparkContext將應用程式程式碼發放給Executor

4. TaskExecutor上執行,執行完畢釋放所有資源。


clip_image004

Spark執行架構特點:

l每個Application獲取專屬的executor程序,該程序在Application期間一直駐留,並以多執行緒方式執行tasks。這種Application隔離機制有其優勢的,無論是從排程角度看(每個Driver排程它自己的任務),還是從執行角度看(來自不同ApplicationTask執行在不同的JVM中)。當然,這也意味著Spark Application不能跨應用程式共享資料,除非將資料寫入到外部儲存系統。

lSpark與資源管理器無關,只要能夠獲取executor程序,並能保持相互通訊就可以了。

l提交SparkContextClient應該靠近Worker節點(執行Executor的節點),最好是在同一個Rack裡,因為Spark Application執行過程中SparkContextExecutor之間有大量的資訊交換;如果想在遠端叢集中執行,最好使用RPCSparkContext提交給叢集,不要遠離Worker執行SparkContext

lTask採用了資料本地性和推測執行的優化機制。

1.2.1 DAGScheduler

DAGScheduler把一個Spark作業轉換成StageDAGDirected Acyclic Graph有向無環圖),根據RDDStage之間的關係找出開銷最小的排程方法,然後把StageTaskSet的形式提交給TaskScheduler,下圖展示了DAGScheduler的作用:


clip_image006

1.2.2 TaskScheduler

DAGScheduler決定了執行Task的理想位置,並把這些資訊傳遞給下層的TaskScheduler。此外,DAGScheduler還處理由於Shuffle資料丟失導致的失敗,這有可能需要重新提交執行之前的Stage(非Shuffle資料丟失導致的Task失敗由TaskScheduler處理)。

TaskScheduler維護所有TaskSet,當ExecutorDriver傳送心跳時,TaskScheduler會根據其資源剩餘情況分配相應的Task。另外TaskScheduler還維護著所有Task的執行狀態,重試失敗的Task。下圖展示了TaskScheduler的作用:


clip_image008

在不同執行模式中任務排程器具體為:

l Spark on Standalone模式為TaskScheduler

l YARN-Client模式為YarnClientClusterScheduler

l YARN-Cluster模式為YarnClusterScheduler

1.3 RDD執行原理

那麼 RDDSpark架構中是如何執行的呢?總高層次來看,主要分為三步:

1.建立 RDD 物件

2.DAGScheduler模組介入運算,計算RDD之間的依賴關係。RDD之間的依賴關係就形成了DAG

3.每一個JOB被分為多個Stage,劃分Stage的一個主要依據是當前計算因子的輸入是否是確定的,如果是則將其分在同一個Stage,避免多個Stage之間的訊息傳遞開銷。


clip_image010

以下面一個按 A-Z 首字母分類,查詢相同首字母下不同姓名總個數的例子來看一下 RDD 是如何執行起來的。


clip_image011

步驟 1 :建立 RDD 上面的例子除去最後一個 collect 是個動作,不會建立 RDD 之外,前面四個轉換都會創建出新的 RDD 。因此第一步就是建立好所有 RDD( 內部的五項資訊 )

步驟 2 :建立執行計劃 Spark 會盡可能地管道化,並基於是否要重新組織資料來劃分 階段(stage) ,例如本例中的 groupBy() 轉換就會將整個執行計劃劃分成兩階段執行。最終會產生一個 DAG(directed acyclic graph ,有向無環圖 ) 作為邏輯執行計劃。


clip_image012

步驟 3 :排程任務 將各階段劃分成不同的 任務 (task) ,每個任務都是資料和計算的合體。在進行下一階段前,當前階段的所有任務都要執行完成。因為下一階段的第一個轉換一定是重新組織資料的,所以必須等當前階段所有結果資料都計算出來了才能繼續。

假設本例中的 hdfs://names 下有四個檔案塊,那麼 HadoopRDD  partitions 就會有四個分割槽對應這四個塊資料,同時 preferedLocations 會指明這四個塊的最佳位置。現在,就可以創建出四個任務,並排程到合適的叢集結點上。


clip_image014

2Spark在不同叢集中的執行架構

Spark注重建立良好的生態系統,它不僅支援多種外部檔案儲存系統,提供了多種多樣的叢集執行模式。部署在單臺機器上時,既可以用本地(Local)模式執行,也可以使用偽分散式模式來執行;當以分散式叢集部署的時候,可以根據自己叢集的實際情況選擇Standalone模式(Spark自帶的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各種執行模式雖然在啟動方式、執行位置、排程策略上各有不同,但它們的目的基本都是一致的,就是在合適的位置安全可靠的根據使用者的配置和Job的需要執行和管理Task

2.1 Spark on Standalone執行過程

Standalone模式是Spark實現的資源排程框架,其主要的節點有Client節點、Master節點和Worker節點。其中Driver既可以執行在Master節點上中,也可以執行在本地Client端。當用spark-shell互動式工具提交SparkJob時,DriverMaster節點上執行;當使用spark-submit工具提交Job或者在EclipsIDEA等開發平臺上使用”new SparkConf.setManager(spark://master:7077)”方式執行Spark任務時,Driver是執行在本地Client端上的。

其執行過程如下:

1.SparkContext連線到Master,向Master註冊並申請資源(CPU Core Memory);

2.Master根據SparkContext的資源申請要求和Worker心跳週期內報告的資訊決定在哪個Worker上分配資源,然後在該Worker上獲取資源,然後啟動StandaloneExecutorBackend

3.StandaloneExecutorBackendSparkContext註冊;

4.SparkContextApplicaiton程式碼傳送給StandaloneExecutorBackend;並且SparkContext解析Applicaiton程式碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作時,就會催生Job;每個Job中含有1個或多個StageStage一般在獲取外部資料和shuffle之前產生),然後以Stage(或者稱為TaskSet)提交給Task SchedulerTask Scheduler負責將Task分配到相應的Worker,最後提交給StandaloneExecutorBackend執行;

5.StandaloneExecutorBackend會建立Executor執行緒池,開始執行Task,並向SparkContext報告,直至Task完成。

6.所有Task完成後,SparkContextMaster登出,釋放資源。


clip_image016

2.2 Spark on YARN執行過程

YARN是一種統一資源管理機制,在其上面可以執行多套計算框架。目前的大資料技術世界,大多數公司除了使用Spark來進行資料計算,由於歷史原因或者單方面業務處理的效能考慮而使用著其他的計算框架,比如MapReduceStorm等計算框架。Spark基於此種情況開發了Spark on YARN的執行模式,由於藉助了YARN良好的彈性資源管理機制,不僅部署Application更加方便,而且使用者在YARN叢集中執行的服務和Application的資源也完全隔離,更具實踐應用價值的是YARN可以通過佇列的方式,管理同時執行在叢集中的多個服務。

Spark on YARN模式根據Driver在叢集中的位置分為兩種模式:一種是YARN-Client模式,另一種是YARN-Cluster(或稱為YARN-Standalone模式)。

2.2.1 YARN框架流程

任何框架與YARN的結合,都必須遵循YARN的開發模式。在分析Spark on YARN的實現細節之前,有必要先分析一下YARN框架的一些基本原理。

Yarn框架的基本執行流程圖為:


clip_image018

其中,ResourceManager負責將叢集的資源分配給各個應用使用,而資源分配和排程的基本單位是Container,其中封裝了機器資源,如記憶體、CPU、磁碟和網路等,每個任務會被分配一個Container,該任務只能在該Container中執行,並使用該Container封裝的資源。NodeManager是一個個的計算節點,主要負責啟動Application所需的Container,監控資源(記憶體、CPU、磁碟和網路等)的使用情況並將之彙報給ResourceManagerResourceManagerNodeManagers共同組成整個資料計算框架,ApplicationMaster與具體的Application相關,主要負責同ResourceManager協商以獲取合適的Container,並跟蹤這些Container的狀態和監控其進度。

2.2.2 YARN-Client

Yarn-Client模式中,Driver在客戶端本地執行,這種模式可以使得Spark Application和客戶端進行互動,因為Driver在客戶端,所以可以通過webUI訪問Driver的狀態,預設是http://hadoop1:4040訪問,而YARN通過http:// hadoop1:8088訪問。

YARN-client的工作流程分為以下幾個步驟:


clip_image020

1.Spark Yarn ClientYARNResourceManager申請啟動Application Master。同時在SparkContent初始化中將建立DAGSchedulerTASKScheduler等,由於我們選擇的是Yarn-Client模式,程式會選擇YarnClientClusterSchedulerYarnClientSchedulerBackend

2.ResourceManager收到請求後,在叢集中選擇一個NodeManager,為該應用程式分配第一個Container,要求它在這個Container中啟動應用程式的ApplicationMaster,與YARN-Cluster區別的是在該ApplicationMaster不執行SparkContext,只與SparkContext進行聯絡進行資源的分派;

3.Client中的SparkContext初始化完畢後,與ApplicationMaster建立通訊,向ResourceManager註冊,根據任務資訊向ResourceManager申請資源(Container);

4.一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通訊,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackendCoarseGrainedExecutorBackend啟動後會向Client中的SparkContext註冊並申請Task

5.Client中的SparkContext分配TaskCoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend