1. 程式人生 > >【Flink】Flink作業排程流程分析

【Flink】Flink作業排程流程分析

1. 概述

當向Flink叢集提交使用者作業時,從使用者角度看,只需要作業處理邏輯正確,輸出正確的結果即可;而不用關心作業何時被排程的,作業申請的資源又是如何被分配的以及作業何時會結束;但是瞭解作業在執行時的具體行為對於我們深入瞭解Flink原理有非常大的幫助,並且對我們如何編寫更合理的作業邏輯有指導意義,因此本文詳細分析作業的排程及資源分配以及作業的生命週期。

2. 流程分析

基於社群master主線(1.11-SNAPSHOT),commit: 12f7873db54cfbc5bf853d66ccd4093f9b749c9a ,HA基於ZK實現分析


上圖概括了Flink作業從Client端提交到到Flink叢集的提交的基本流程[1]。

當執行./flink run指令碼提交使用者作業至Dispathcer後,Dispatcher會拉起JobManagerRunner,而後JobManagerRunner會向Zookeeper註冊競爭Leader。對於之前流程有興趣可以參考深入理解Flink-On-Yarn模式

JobManagerRunner競爭成為Leader時,會呼叫JobManagerRunnerImpl#grantLeadership,此時便開始處理作業,會通過如下的程式碼呼叫路徑啟動JobMaster。

  • JobManagerRunnerImpl#grantLeadership
  • JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobManager
  • JobManagerRunnerImpl#startJobMaster。
    startJobMaster方法會首先將該作業的ID寫入對應的ZK目錄並置為RUNNING狀態,寫入該目錄可用於在Dispathcer接收作業時,判斷該作業是否重複提交或恢復作業時使用;在JobManagerRunner排程作業時也在從ZK上拉取作業資訊來判斷作業狀態,若為DONE狀態,則無需排程。啟動JobMaster時會先啟動其RPC Endpoint,以便與其他元件進行RPC呼叫,之後JobMaster便通過JobMaster#startJobExecution開始執行作業,執行作業前會有些前置校驗,如必須確保執行在主執行緒中;啟動JobMaster上的一些服務(元件),如TaskManager和ResourceManager的心跳管理;啟動SlotPool、Scheduler;重連至ResourceManager,並且在ZK中註冊監聽ResourceManager Leader的變化的Retriever等。
    當初始化完JobMaster上相應服務(元件)後,便開始排程,會有如下程式碼呼叫路徑
  • JobMaster#start
  • JobMaster#startJobExecution
  • JobMaster#resetAndStartScheduler
  • JobMaster#startScheduling
  • SchedulerBase#startScheduling。

我們知道使用者編寫的作業是以JobGraph提交到Dispatcher,但是在實際排程時會將JobGraph轉化為ExecutionGraph,JobGraph生成ExecutionGraph是在SchedulerBase物件初始化的時候完成轉化,如下圖所示表示了典型的轉化過程(JobVertex與ExecutionJobVertex一一對應),而具體的轉化邏輯實現可參考如何生成ExecutionGraph及物理執行圖

在SchedulerBase初始化時生成ExecutionGraph後,之後便基於ExecutionGraph排程,而排程基類SchedulerBase預設實現為DefaultScheduler,會繼續通過DefaultScheduler#startSchedulingInternal排程作業,此時會將作業(ExecutionGraph)的狀態從CREATED狀態變更為RUNNING狀態,此時在Flink web介面檢視任務的狀態便已經為RUNNING,但注意此時作業(各頂點)實際並未開始排程,頂點還是處於CREATED狀態,任作業狀態與頂點狀態不完全相關聯,有其各自的演化生命週期,具體可參考Flink作業排程[2];然後根據不同的策略EagerSchedulingStrategy(主要用於流式作業,所有頂點(ExecutionVertex)同時開始排程)和LazyFromSourcesSchedulingStrategy(主要用於批作業,從Source開始開始排程,其他頂點延遲排程)排程。

當提交流式作業時,會有如下程式碼呼叫路徑:

  • EagerSchedulingStrategy#startScheduling
  • EagerSchedulingStrategy#allocateSlotsAndDeploy,在部署之前會根據待部署的ExecutionVertex生成對應的ExecutionVertexDeploymentOption,然後呼叫DefaultScheduler#allocateSlotsAndDeploy開始部署。同樣,在部署之前也需要進行一些前置校驗(ExecutionVertex對應的Execution的狀態必須為CREATED),接著將待部署的ExecutionVertex對應的Execution狀態變更為SCHEDULED,然後開始為ExecutionVertex分配Slot。會有如下的呼叫程式碼路徑:
  • DefaultScheduler#allocateSlots(該過程會ExecutionVertex轉化為ExecutionVertexSchedulingRequirements,會封裝包含一些location資訊、sharing資訊、資源資訊等)
  • DefaultExecutionSlotAllocator#allocateSlotsFor,該方法會開始逐一非同步部署各ExecutionVertex,部署也是根據不同的Slot提供策略來分配,接著會經過如下程式碼呼叫路徑層層轉發,SlotProviderStrategy#allocateSlot -> SlotProvider#allocateSlot(SlotProvider預設實現為SchedulerImpl) -> SchedulerImpl#allocateSlotInternal -> SchedulerImpl#internalAllocateSlot(該方法會根據vertex是否共享slot來分配singleSlot/SharedSlot),以singleSlot為例說明。
    在分配slot時,首先會在JobMaster中SlotPool中進行分配,具體是先SlotPool中獲取所有slot,然後嘗試選擇一個最合適的slot進行分配,這裡的選擇有兩種策略,即按照位置優先和按照之前已分配的slot優先;若從SlotPool無法分配,則通過RPC請求向ResourceManager請求slot,若此時並未連線上ResourceManager,則會將請求快取起來,待連線上ResourceManager後再申請。

當ResourceManager收到申請slot請求時,若發現該JobManager未註冊,則直接丟擲異常;否則將請求轉發給SlotManager處理,SlotManager中維護了叢集所有空閒的slot(TaskManager會向ResourceManager上報自己的資訊,在ResourceManager中由SlotManager儲存Slot和TaskManager對應關係),並從其中找出符合條件的slot,然後向TaskManager傳送RPC請求申請對應的slot。

等待所有的slot申請完成後,然後會將ExecutionVertex對應的Execution分配給對應的Slot,即從Slot中分配對應的資源給Execution,完成分配後可開始部署作業。
部署作業程式碼呼叫路徑如下:

  • DefaultScheduler#waitForAllSlotsAndDeploy
  • DefaultScheduler#deployAll
  • DefaultScheduler#deployOrHandleError
  • DefaultScheduler#deployTaskSafe
  • DefaultExecutionVertexOperations#deploy
  • ExecutionVertex#deploy
  • Execution#deploy(每次排程ExecutionVertex,都會有一個Execute,在此階段會將Execution的狀態變更為DEPLOYING狀態,並且為該ExecutionVertex生成對應的部署描述資訊,然後從對應的slot中獲取對應的TaskManagerGateway,以便向對應的TaskManager提交Task)
  • RpcTaskManagerGateway#submitTask(此時便將Task通過RPC提交給了TaskManager)。

TaskManager(TaskExecutor)在接收到提交Task的請求後,會經過一些初始化(如從BlobServer拉取檔案,反序列化作業和Task資訊、LibaryCacheManager等),然後這些初始化的資訊會用於生成Task(Runnable物件),然後啟動該Task,其程式碼呼叫路徑如下 Task#startTaskThread(啟動Task執行緒)-> Task#run(將ExecutionVertex狀態變更為RUNNING狀態,此時在FLINK web前臺檢視頂點狀態會變更為RUNNING狀態,另外還會生成了一個AbstractInvokable物件,該物件是FLINK銜接執行使用者程式碼的關鍵,而後會經過如下呼叫

  • AbstractInvokable#invoke(AbstractInvokable有幾個關鍵的子類實現, BatchTask/BoundedStreamTask/DataSinkTask/DataSourceTask/StreamTask/SourceStreamTask。對於streaming型別的Source,會呼叫StreamTask#invoke)
  • StreamTask#invoke
  • StreamTask#beforeInvoke
  • StreamTask#initializeStateAndOpen(初始化狀態和進行初始化,這裡會呼叫使用者的open方法(如自定義實現的source))-> StreamTask#runMailboxLoop,便開始處理Source端消費的資料,並流入下游運算元處理。

至此作業從提交到資源分配及排程執行整體流程就已經分析完畢,對於流式作業而言,正常情況下其會一直執行,不會結束。

3. 總結

對於作業的執行,會先提交至Dispatcher,由Dispatcher拉起JobManagerRunner,在JobManagerRunner成為Leader後,便開始處理作業,首先會根據JobGraph生成對應的ExecutionGraph,然後開始排程,作業的狀態首先會變更為RUNNING,然後對各ExecutionVertex申請slot,申請slot會涉及JM與RM、TM之間的通訊,當在TM上分配完slot後,便可將Task提交至TaskManager,然後TaskManager會為每個提交的Task生成一個單獨的執行緒處理。

參考

  1. https://www.infoq.cn/article/RWTM9o0SHHV3Xr8o8giT
  2. https://flink.sojb.cn/internals/job_scheduling.html