1. 程式人生 > 其它 >Spark原始碼系列(四)圖解作業生命週期

Spark原始碼系列(四)圖解作業生命週期

這一章我們探索了Spark作業的執行過程,但是沒把整個過程描繪出來,好,跟著我走吧,let you know!

我們先回顧一下這個圖,Driver Program是我們寫的那個程式,它的核心是SparkContext,回想一下,從api的使用角度,RDD都必須通過它來獲得。

下面講一講它所不為認知的一面,它和其它元件是如何互動的。

Driver向Master註冊Application過程

SparkContext例項化之後,在內部例項化兩個很重要的類,DAGScheduler和TaskScheduler。

在standalone的模式下,TaskScheduler的實現類是TaskSchedulerImpl,在初始化它的時候SparkContext會傳入一個SparkDeploySchedulerBackend。

在SparkDeploySchedulerBackend的start方法裡面啟動了一個AppClient。

    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, 
            classPathEntries, libraryPathEntries, extraJavaOpts)
    val sparkHome = sc.getSparkHome()
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
                 sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

maxCores是由引數spark.cores.max來指定的,executorMemoy是由spark.executor.memory指定的。

AppClient啟動之後就會去向Master註冊Applicatoin了,後面的過程我用圖來表達。

上面的圖中涉及到了三方通訊,具體的過程如下:

1、Driver通過AppClient向Master傳送了RegisterApplication訊息來註冊Application,Master收到訊息之後會發送RegisteredApplication通知Driver註冊成功,Driver的接收類還是AppClient。

2、Master接受到RegisterApplication之後會觸發排程過程,在資源足夠的情況下會向Woker和Driver分別傳送LaunchExecutor、ExecutorAdded訊息。

3、Worker接收到LaunchExecutor訊息之後,會執行訊息中攜帶的命令,執行CoarseGrainedExecutorBackend類(圖中僅以它繼承的介面ExecutorBackend代替),執行完畢之後會發送ExecutorStateChanged訊息給Master。

4、Master接收ExecutorStateChanged之後,立即傳送ExecutorUpdated訊息通知Driver。

5、Driver中的AppClient接收到Master發過來的ExecutorAdded和ExecutorUpdated後進行相應的處理。

6、啟動之後的CoarseGrainedExecutorBackend會向Driver傳送RegisterExecutor訊息。

7、Driver中的SparkDeploySchedulerBackend(具體程式碼在CoarseGrainedSchedulerBackend裡面)接收到RegisterExecutor訊息,回覆註冊成功的訊息RegisteredExecutor給ExecutorBackend,並且立馬準備給它傳送任務。

8、CoarseGrainedExecutorBackend接收到RegisteredExecutor訊息之後,例項化一個Executor等待任務的到來。

資源的排程

好,在我們講完了整個註冊Application的通訊過程之後,其中一個比較重要的地方是它的排程這塊,它是怎麼排程的?這也是我在前面為什麼那麼強調maxCores和executorMemoy的原因。

細心的讀者如果看了第一章《spark-submit提交作業過程》的就知道,其實我已經講過排程了,因為當時不知道這個app是啥。但是現在我們知道app是啥了。程式碼我不就貼了,總結一下吧。

1、先排程Driver,再排程Application。

2、它的排程Application的方式是先進先出,所以就不要奇怪為什麼你的App總得不到排程了,就像去北京的醫院看病,去晚了號就沒了,是一個道理。

3、Executor的分配方式有兩種,一種是傾向於把任務分散在多個節點上,一種是在儘量少的節點上執行,由引數spark.deploy.spreadOut引數來決定的,預設是true,把任務分散到多個節點上。

遍歷所有等待的Application,給它分配Executor執行的Worker,預設分配方式如下:

1、先從workers當中選出記憶體大於executorMemoy的worker,並且按照空閒cpu數從大到小的順序來排序。

2、遍歷worker,從可用的worker分配需要的cpu數,每個worker提供一個cpu核心,直到cpu數不足或者maxCores分配完畢。

3、給選出來的worker傳送任務,讓它們啟動Executor,每個Executor佔用的記憶體是我們設定的executorMemoy。

資源排程的過程大體是這樣的,說到這裡有些童鞋在有點兒疑惑了,那我們任務排程裡面FIFO/FAIR排程是在哪裡用的?任務排程器排程的不是Application,而是你的程式碼裡面被解析出來的所有Task,這在上一章當中有提到。

基於這個原因,在共用SparkContext的情況下,比如Shark、JobServer什麼的,任務排程器的作用才會明顯。

Driver向Executor釋出Task過程

下面我們講一講Driver向Executor釋出Task過程,這在上一章是講過的,現在把圖給大家放出來了。

1、Driver程式的程式碼執行到action操作,觸發了SparkContext的runJob方法。

2、SparkContext比較懶,轉手就交給DAGScheduler。

3、DAGScheduler把Job劃分stage,然後把stage轉化為相應的Tasks,把Tasks交給TaskScheduler。

4、通過TaskScheduler把Tasks新增到任務隊列當中,轉手就交給SchedulerBackend了。

5、排程器給Task分配執行Executor,ExecutorBackend負責執行Task了。

補充:ExecutorBackend執行Task,是通過它內部的Executor來執行的,Executor內部有個執行緒池,new了一個TaskRunner交給執行緒池了。

Task狀態更新

Task執行是通過TaskRunner來執行,它需要通過ExecutorBackend和Driver通訊,通訊訊息是StatusUpdate:

1、Task執行之前,告訴Driver當前Task的狀態為TaskState.RUNNING。

2、Task執行之後,告訴Driver當前Task的狀態為TaskState.FINISHED,並返回計算結果。

3、如果Task執行過程中發生錯誤,告訴Driver當前Task的狀態為TaskState.FAILED,並返回錯誤原因。

4、如果Task在中途被Kill掉了,告訴Driver當前Task的狀態為TaskState.FAILED。

下面講的是執行成功的狀態,具體過程以文字為主。

1、Task執行結束之後,呼叫ExecutorBackend的statusUpdate方法,把結果返回。結果超過10M,就把結果儲存在blockManager處,返回blockId,需要的時候通過blockId到blockManager認領。

2、ExecutorBackend直接向Driver傳送StatusUpdate返回Task的資訊。

3、Driver(這裡具體指的是SchedulerBackend)接收到StatusUpdate訊息之後,呼叫TaskScheduler的statusUpdate方法,然後準備給ExecutorBackend傳送下一批Task。

4、TaskScheduler通過TaskId找到管理這個Task的TaskSetManager(負責管理一批Task的類),從TaskSetManager裡面刪掉這個Task,並把Task插入到TaskResultGetter(負責獲取Task結果的類)的成功佇列裡。

5、TaskResultGetter獲取到結果之後,呼叫TaskScheduler的handleSuccessfulTask方法把結果返回。

6、TaskScheduler呼叫TaskSetManager的handleSuccessfulTask方法,處理成功的Task。

7、TaskSetManager呼叫DAGScheduler的taskEnded方法,告訴DAGScheduler這個Task執行結束了,如果這個時候Task全部成功了,就會結束TaskSetManager。

8、DAGScheduler在taskEnded方法裡觸發CompletionEvent事件,CompletionEvent分ResultTask和ShuffleMapTask來處理。

  1)ResultTask:job的numFinished加1,如果numFinished等於它的分片數,則表示任務該Stage結束,標記這個Stage為結束,最後呼叫JobListener(具體實現在JobWaiter)的taskSucceeded方法,把結果交給resultHandler(經過包裝的自己寫的那個匿名函式)處理,如果完成的Task數量等於總任務數,任務退出。

  2)ShuffleMapTask:

   (1)呼叫Stage的addOutputLoc方法,把結果新增到Stage的outputLocs列表裡。

   (2)如果該Stage沒有等待的Task了,就標記該Stage為結束。

   (3)把Stage的outputLocs註冊到MapOutputTracker裡面,留個下一個Stage用。

   (4)如果Stage的outputLocs為空,表示它的計算失敗,重新提交Stage。

     (5)找出下一個在等待並且沒有父親的Stage提交。