1. 程式人生 > 實用技巧 >Spark學習之路 七、Spark 執行流程

Spark學習之路 七、Spark 執行流程

目錄

正文

回到頂部

一、Spark中的基本概念

(1)Application:表示你的應用程式

(2)Driver:表示main()函式,建立SparkContext。由SparkContext負責與ClusterManager通訊,進行資源的申請,任務的分配和監控等。程式執行完畢後關閉SparkContext

(3)Executor:某個Application執行在Worker節點上的一個程序,該程序負責執行某些task,並且負責將資料存在記憶體或者磁碟上。在Spark on Yarn模式下,其程序名稱為 CoarseGrainedExecutor Backend,一個CoarseGrainedExecutor Backend程序有且僅有一個executor物件,它負責將Task包裝成taskRunner,並從執行緒池中抽取出一個空閒執行緒執行Task,這樣,每個CoarseGrainedExecutorBackend能並行執行Task的資料就取決於分配給它的CPU的個數。

(4)Worker:叢集中可以執行Application程式碼的節點。在Standalone模式中指的是通過slave檔案配置的worker節點,在Spark on Yarn模式中指的就是NodeManager節點。

(5)Task:在Executor程序中執行任務的工作單元,多個Task組成一個Stage

(6)Job:包含多個Task組成的平行計算,是由Action行為觸發的

(7)Stage:每個Job會被拆分很多組Task,作為一個TaskSet,其名稱為Stage

(8)DAGScheduler:根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler,其劃分Stage的依據是RDD之間的依賴關係

(9)TaskScheduler:將TaskSet提交給Worker(叢集)執行,每個Executor執行什麼Task就是在此處分配的。

回到頂部

二、Spark的執行流程

2.1 Spark的基本執行流程

1、說明

(1)構建Spark Application的執行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)註冊並申請執行Executor資源;

(2)資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor執行情況將隨著心跳傳送到資源管理器上;

(3)SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset傳送給Task Scheduler。Executor向SparkContext申請Task

(4)Task Scheduler將Task發放給Executor運行同時SparkContext將應用程式程式碼發放給Executor。

(5)Task在Executor上執行,執行完畢釋放所有資源。

2、圖解

3、Spark執行架構特點

(1)每個Application獲取專屬的executor程序,該程序在Application期間一直駐留,並以多執行緒方式執行tasks。這種Application隔離機制有其優勢的,無論是從排程角度看(每個Driver排程它自己的任務),還是從執行角度看(來自不同Application的Task執行在不同的JVM中)。當然,這也意味著Spark Application不能跨應用程式共享資料,除非將資料寫入到外部儲存系統。

(2)Spark與資源管理器無關,只要能夠獲取executor程序,並能保持相互通訊就可以了。

(3)提交SparkContext的Client應該靠近Worker節點(執行Executor的節點),最好是在同一個Rack裡,因為Spark Application執行過程中SparkContext和Executor之間有大量的資訊交換;如果想在遠端叢集中執行,最好使用RPC將SparkContext提交給叢集,不要遠離Worker執行SparkContext。

(4)Task採用了資料本地性和推測執行的優化機制。

4、DAGScheduler

Job=多個stage,Stage=多個同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

面向stage的切分,切分依據為寬依賴

維護waiting jobs和active jobs,維護waiting stages、active stages和failed stages,以及與jobs的對映關係

主要職能:

1、接收提交Job的主入口,submitJob(rdd, ...)runJob(rdd, ...)。在SparkContext裡會呼叫這兩個方法。

  • 生成一個Stage並提交,接著判斷Stage是否有父Stage未完成,若有,提交併等待父Stage,以此類推。結果是:DAGScheduler裡增加了一些waiting stage和一個running stage。
  • running stage提交後,分析stage裡Task的型別,生成一個Task描述,即TaskSet。
  • 呼叫TaskScheduler.submitTask(taskSet, ...)方法,把Task描述提交給TaskScheduler。TaskScheduler依據資源量和觸發分配條件,會為這個TaskSet分配資源並觸發執行。
  • DAGScheduler提交job後,非同步返回JobWaiter物件,能夠返回job執行狀態,能夠cancel job,執行成功後會處理並返回結果

2、處理TaskCompletionEvent

  • 如果task執行成功,對應的stage裡減去這個task,做一些計數工作:
    • 如果task是ResultTask,計數器Accumulator加一,在job裡為該task置true,job finish總數加一。加完後如果finish數目與partition數目相等,說明這個stage完成了,標記stage完成,從running stages裡減去這個stage,做一些stage移除的清理工作
    • 如果task是ShuffleMapTask,計數器Accumulator加一,在stage里加上一個output location,裡面是一個MapStatus類。MapStatusShuffleMapTask執行完成的返回,包含location資訊和block size(可以選擇壓縮或未壓縮)。同時檢查該stage完成,向MapOutputTracker註冊本stage裡的shuffleId和location資訊。然後檢查stage的output location裡是否存在空,若存在空,說明一些task失敗了,整個stage重新提交;否則,繼續從waiting stages裡提交下一個需要做的stage
  • 如果task是重提交,對應的stage裡增加這個task
  • 如果task是fetch失敗,馬上標記對應的stage完成,從running stages裡減去。如果不允許retry,abort整個stage;否則,重新提交整個stage。另外,把這個fetch相關的location和map任務資訊,從stage裡剔除,從MapOutputTracker登出掉。最後,如果這次fetch的blockManagerId物件不為空,做一次ExecutorLost處理,下次shuffle會換在另一個executor上去執行。
  • 其他task狀態會由TaskScheduler處理,如Exception, TaskResultLost, commitDenied等。

3、其他與job相關的操作還包括:cancel job, cancel stage, resubmit failed stage等

其他職能:

cacheLocations 和 preferLocation

5、TaskScheduler

維護task和executor對應關係,executor和物理資源對應關係,在排隊的task和正在跑的task。

內部維護一個任務佇列,根據FIFO或Fair策略,排程任務。

TaskScheduler本身是個介面,spark裡只實現了一個TaskSchedulerImpl,理論上任務排程可以定製。

主要功能:

1、submitTasks(taskSet),接收DAGScheduler提交來的tasks

  • 為tasks建立一個TaskSetManager,新增到任務佇列裡。TaskSetManager跟蹤每個task的執行狀況,維護了task的許多具體資訊。
  • 觸發一次資源的索要。
    • 首先,TaskScheduler對照手頭的可用資源和Task佇列,進行executor分配(考慮優先順序、本地化等策略),符合條件的executor會被分配給TaskSetManager
    • 然後,得到的Task描述交給SchedulerBackend,呼叫launchTask(tasks),觸發executor上task的執行。task描述被序列化後發給executor,executor提取task資訊,呼叫task的run()方法執行計算。

2、cancelTasks(stageId),取消一個stage的tasks

  • 呼叫SchedulerBackendkillTask(taskId, executorId, ...)方法。taskId和executorId在TaskScheduler裡一直維護著。

3、resourceOffer(offers: Seq[Workers]),這是非常重要的一個方法,呼叫者是SchedulerBacnend,用途是底層資源SchedulerBackend把空餘的workers資源交給TaskScheduler,讓其根據排程策略為排隊的任務分配合理的cpu和記憶體資源,然後把任務描述列表傳回給SchedulerBackend

  • 從worker offers裡,蒐集executor和host的對應關係、active executors、機架資訊等等
  • worker offers資源列表進行隨機洗牌,任務佇列裡的任務列表依據排程策略進行一次排序
  • 遍歷每個taskSet,按照程序本地化、worker本地化、機器本地化、機架本地化的優先順序順序,為每個taskSet提供可用的cpu核數,看是否滿足
    • 預設一個task需要一個cpu,設定引數為"spark.task.cpus=1"
    • 為taskSet分配資源,校驗是否滿足的邏輯,最終在TaskSetManagerresourceOffer(execId, host, maxLocality)方法裡
    • 滿足的話,會生成最終的任務描述,並且呼叫DAGSchedulertaskStarted(task, info)方法,通知DAGScheduler,這時候每次會觸發DAGScheduler做一次submitMissingStage的嘗試,即stage的tasks都分配到了資源的話,馬上會被提交執行

4、statusUpdate(taskId, taskState, data),另一個非常重要的方法,呼叫者是SchedulerBacnend,用途是SchedulerBacnend會將task執行的狀態彙報給TaskScheduler做一些決定

  • TaskLost,找到該task對應的executor,從active executor裡移除,避免這個executor被分配到其他task繼續失敗下去。
  • task finish包括四種狀態:finished, killed, failed, lost。只有finished是成功執行完成了。其他三種是失敗。
  • task成功執行完,呼叫TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否則呼叫TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)TaskResultGetter內部維護了一個執行緒池,負責非同步fetch task執行結果並反序列化。預設開四個執行緒做這件事,可配引數"spark.resultGetter.threads"=4

TaskResultGetter取task result的邏輯

1、對於success task,如果taskResult裡的資料是直接結果資料,直接把data反序列出來得到結果;如果不是,會呼叫blockManager.getRemoteBytes(blockId)從遠端獲取。如果遠端取回的資料是空的,那麼會呼叫TaskScheduler.handleFailedTask,告訴它這個任務是完成了的但是資料是丟失的。否則,取到資料之後會通知BlockManagerMaster移除這個block資訊,呼叫TaskScheduler.handleSuccessfulTask,告訴它這個任務是執行成功的,並且把result data傳回去。

2、對於failed task,從data裡解析出fail的理由,呼叫TaskScheduler.handleFailedTask,告訴它這個任務失敗了,理由是什麼。

6、SchedulerBackend

TaskScheduler下層,用於對接不同的資源管理系統,SchedulerBackend是個介面,需要實現的主要方法如下:

def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手頭上的可用資源交給TaskScheduler,TaskScheduler根據排程策略分配給排隊的任務嗎,返回一批可執行的任務描述,SchedulerBackend負責launchTask,即最終把task塞到了executor模型上,executor裡的執行緒池會執行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException

粗粒度:程序常駐的模式,典型代表是standalone模式,mesos粗粒度模式,yarn

細粒度:mesos細粒度模式

這裡討論粗粒度模式,更好理解:CoarseGrainedSchedulerBackend

維護executor相關資訊(包括executor的地址、通訊埠、host、總核數,剩餘核數),手頭上executor有多少被註冊使用了,有多少剩餘,總共還有多少核是空的等等。

主要職能

1、Driver端主要通過actor監聽和處理下面這些事件:

  • RegisterExecutor(executorId, hostPort, cores, logUrls)。這是executor新增的來源,通常worker拉起、重啟會觸發executor的註冊。CoarseGrainedSchedulerBackend把這些executor維護起來,更新內部的資源資訊,比如總核數增加。最後呼叫一次makeOffer(),即把手頭資源丟給TaskScheduler去分配一次,返回任務描述回來,把任務launch起來。這個makeOffer()的呼叫會出現在任何與資源變化相關的事件中,下面會看到。
  • StatusUpdate(executorId, taskId, state, data)。task的狀態回撥。首先,呼叫TaskScheduler.statusUpdate上報上去。然後,判斷這個task是否執行結束了,結束了的話把executor上的freeCore加回去,呼叫一次makeOffer()
  • ReviveOffers。這個事件就是別人直接向SchedulerBackend請求資源,直接呼叫makeOffer()
  • KillTask(taskId, executorId, interruptThread)。這個killTask的事件,會被髮送給executor的actor,executor會處理KillTask這個事件。
  • StopExecutors。通知每一個executor,處理StopExecutor事件。
  • RemoveExecutor(executorId, reason)。從維護資訊中,那這堆executor涉及的資源數減掉,然後呼叫TaskScheduler.executorLost()方法,通知上層我這邊有一批資源不能用了,你處理下吧。TaskScheduler會繼續把executorLost的事件上報給DAGScheduler,原因是DAGScheduler關心shuffle任務的output location。DAGScheduler會告訴BlockManager這個executor不可用了,移走它,然後把所有的stage的shuffleOutput資訊都遍歷一遍,移走這個executor,並且把更新後的shuffleOutput資訊註冊到MapOutputTracker上,最後清理下本地的CachedLocationsMap。

2、reviveOffers()方法的實現。直接呼叫了makeOffers()方法,得到一批可執行的任務描述,呼叫launchTasks

3、launchTasks(tasks: Seq[Seq[TaskDescription]])方法。

  • 遍歷每個task描述,序列化成二進位制,然後傳送給每個對應的executor這個任務資訊
    • 如果這個二進位制資訊太大,超過了9.2M(預設的akkaFrameSize 10M 減去 預設 為akka留空的200K),會出錯,abort整個taskSet,並列印提醒增大akka frame size
    • 如果二進位制資料大小可接受,傳送給executor的actor,處理LaunchTask(serializedTask)事件。

7、Executor

Executor是spark裡的程序模型,可以套用到不同的資源管理系統上,與SchedulerBackend配合使用。

內部有個執行緒池,有個running tasks map,有個actor,接收上面提到的由SchedulerBackend發來的事件。

事件處理

  1. launchTask。根據task描述,生成一個TaskRunner執行緒,丟盡running tasks map裡,用執行緒池執行這個TaskRunner
  2. killTask。從running tasks map裡拿出執行緒物件,調它的kill方法。
回到頂部

三、Spark在不同叢集中的執行架構

Spark注重建立良好的生態系統,它不僅支援多種外部檔案儲存系統,提供了多種多樣的叢集執行模式。部署在單臺機器上時,既可以用本地(Local)模式執行,也可以使用偽分散式模式來執行;當以分散式叢集部署的時候,可以根據自己叢集的實際情況選擇Standalone模式(Spark自帶的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各種執行模式雖然在啟動方式、執行位置、排程策略上各有不同,但它們的目的基本都是一致的,就是在合適的位置安全可靠的根據使用者的配置和Job的需要執行和管理Task。

3.1 Spark on Standalone執行過程

Standalone模式是Spark實現的資源排程框架,其主要的節點有Client節點、Master節點和Worker節點。其中Driver既可以執行在Master節點上中,也可以執行在本地Client端。當用spark-shell互動式工具提交Spark的Job時,Driver在Master節點上執行;當使用spark-submit工具提交Job或者在Eclips、IDEA等開發平臺上使用”new SparkConf().setMaster(“spark://master:7077”)”方式執行Spark任務時,Driver是執行在本地Client端上的。

執行過程文字說明

1、我們提交一個任務,任務就叫Application
2、初始化程式的入口SparkContext,
  2.1 初始化DAG Scheduler
  2.2 初始化Task Scheduler
3、Task Scheduler向master去進行註冊並申請資源(CPU Core和Memory)
4、Master根據SparkContext的資源申請要求和Worker心跳週期內報告的資訊決定在哪個Worker上分配資源,然後在該Worker上獲取資源,然後啟動StandaloneExecutorBackend;順便初
始化好了一個執行緒池
5、StandaloneExecutorBackend向Driver(SparkContext)註冊,這樣Driver就知道哪些Executor為他進行服務了。
  到這個時候其實我們的初始化過程基本完成了,我們開始執行transformation的程式碼,但是程式碼並不會真正的執行,直到我們遇到一個action操作。生產一個job任務,進行stage的劃分
6、SparkContext將Applicaiton程式碼傳送給StandaloneExecutorBackend;並且SparkContext解析Applicaiton程式碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作 時,就會催生Job;每個Job中含有1個或多個Stage,Stage一般在獲取外部資料和shuffle之前產生)。
7、將Stage(或者稱為TaskSet)提交給Task Scheduler。Task Scheduler負責將Task分配到相應的Worker,最後提交給StandaloneExecutorBackend執行;
8、對task進行序列化,並根據task的分配演算法,分配task
9、對接收過來的task進行反序列化,把task封裝成一個執行緒
10、開始執行Task,並向SparkContext報告,直至Task完成。
11、資源登出

執行過程圖形說明

3.2 Spark on YARN執行過程

YARN是一種統一資源管理機制,在其上面可以執行多套計算框架。目前的大資料技術世界,大多數公司除了使用Spark來進行資料計算,由於歷史原因或者單方面業務處理的效能考慮而使用著其他的計算框架,比如MapReduce、Storm等計算框架。Spark基於此種情況開發了Spark on YARN的執行模式,由於藉助了YARN良好的彈性資源管理機制,不僅部署Application更加方便,而且使用者在YARN叢集中執行的服務和Application的資源也完全隔離,更具實踐應用價值的是YARN可以通過佇列的方式,管理同時執行在叢集中的多個服務。

Spark on YARN模式根據Driver在叢集中的位置分為兩種模式:一種是YARN-Client模式,另一種是YARN-Cluster(或稱為YARN-Standalone模式)。

3.2.1 YARN框架流程

任何框架與YARN的結合,都必須遵循YARN的開發模式。在分析Spark on YARN的實現細節之前,有必要先分析一下YARN框架的一些基本原理。

參考:http://www.cnblogs.com/qingyunzong/p/8615096.html

3.2.2 YARN-Client

Yarn-Client模式中,Driver在客戶端本地執行,這種模式可以使得Spark Application和客戶端進行互動,因為Driver在客戶端,所以可以通過webUI訪問Driver的狀態,預設是http://hadoop1:4040訪問,而YARN通過http:// hadoop1:8088訪問。

YARN-client的工作流程分為以下幾個步驟:

文字說明

1.Spark Yarn Client向YARN的ResourceManager申請啟動Application Master。同時在SparkContent初始化中將建立DAGScheduler和TASKScheduler等,由於我們選擇的是Yarn-Client模式,程式會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend;

2.ResourceManager收到請求後,在叢集中選擇一個NodeManager,為該應用程式分配第一個Container,要求它在這個Container中啟動應用程式的ApplicationMaster,與YARN-Cluster區別的是在該ApplicationMaster不執行SparkContext,只與SparkContext進行聯絡進行資源的分派;

3.Client中的SparkContext初始化完畢後,與ApplicationMaster建立通訊,向ResourceManager註冊,根據任務資訊向ResourceManager申請資源(Container);

4.一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通訊,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動後會向Client中的SparkContext註冊並申請Task;

5.Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend執行Task並向Driver彙報執行的狀態和進度,以讓Client隨時掌握各個任務的執行狀態,從而可以在任務失敗時重新啟動任務;

6.應用程式執行完成後,Client的SparkContext向ResourceManager申請登出並關閉自己。

圖片說明

3.2.3 YARN-Cluster

在YARN-Cluster模式中,當用戶向YARN中提交一個應用程式後,YARN將分兩個階段執行該應用程式:第一個階段是把Spark的Driver作為一個ApplicationMaster在YARN叢集中先啟動;第二個階段是由ApplicationMaster建立應用程式,然後為它向ResourceManager申請資源,並啟動Executor來執行Task,同時監控它的整個執行過程,直到執行完成。

YARN-cluster的工作流程分為以下幾個步驟:

文字說明

1.Spark Yarn Client向YARN中提交應用程式,包括ApplicationMaster程式、啟動ApplicationMaster的命令、需要在Executor中執行的程式等;

2.ResourceManager收到請求後,在叢集中選擇一個NodeManager,為該應用程式分配第一個Container,要求它在這個Container中啟動應用程式的ApplicationMaster,其中ApplicationMaster進行SparkContext等的初始化;

3.ApplicationMaster向ResourceManager註冊,這樣使用者可以直接通過ResourceManage檢視應用程式的執行狀態,然後它將採用輪詢的方式通過RPC協議為各個任務申請資源,並監控它們的執行狀態直到執行結束;

4.一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通訊,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動後會向ApplicationMaster中的SparkContext註冊並申請Task。這一點和Standalone模式一樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的排程,其中YarnClusterScheduler只是對TaskSchedulerImpl的一個簡單包裝,增加了對Executor的等待邏輯等;

5.ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend執行Task並向ApplicationMaster彙報執行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的執行狀態,從而可以在任務失敗時重新啟動任務;

6.應用程式執行完成後,ApplicationMaster向ResourceManager申請登出並關閉自己。

圖片說明

3.2.4 YARN-Client與YARN-Cluster區別

理解YARN-Client和YARN-Cluster深層次的區別之前先清楚一個概念:Application Master。在YARN中,每個Application例項都有一個ApplicationMaster程序,它是Application啟動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源之後告訴NodeManager為其啟動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster程序的區別。

1、YARN-Cluster模式下,Driver執行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的執行狀況。當用戶提交了作業之後,就可以關掉Client,作業會繼續在YARN上執行,因而YARN-Cluster模式不適合執行互動型別的作業;

2、YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通訊來排程他們工作,也就是說Client不能離開。