1. 程式人生 > >Flink的Job啟動JobManager端(原始碼分析)

Flink的Job啟動JobManager端(原始碼分析)

通過前面的文章瞭解到

Driver將使用者程式碼轉換成streamGraph再轉換成Jobgraph後向Jobmanager端提交

JobManager啟動以後會在Dispatcher.java起來RPC方法submitJob(jobGraph),用於接收來自Driver轉化得到的JobGraph來啟動任務

具體來看jobGraph提交到JobManager的submitJob方法

前面都是一些呼叫鏈沒有什麼好講的,最後到createJobManager( )方法這裡

先看一下1,建立了一個jobmanagerRunner並且將中Driver端得到的JobGraph傳遞了進去

 在建立JobManagerRunner的過程中它呼叫了

這裡主要是為了建立一個jobMaster,在jobMaster的構造方法中

在這裡它先是create傳入了jobgraph然後又通過createAndRestoreExecutionGraph()方法轉換得到executionGraph

這個executionGraph就可以用來排程啟動任務了

具體看一下他的轉化邏輯

可以看到它從createExecutionGraph方法中得到了executionGraph

並且通過getCheckpointCoordinator()方法得到了一個coordinator(主要是用於週期性觸發checkpoint,呼叫對應TaskManager的rpc生成barriers往下游傳送)

繼續看一下他的轉化邏輯

在createExecutionGraph中通過ExecutionGraphBuilder.buildGraph()返回了一個executionGraph

在buildGraph()方法中

建立了一個executionGraph

 

為executionGraph設定一些基礎資訊,包括排程方式等(這裡stream是eager的排程方法)

然後

1處得到了一個的拓撲圖包含了所有jobGraph的所有jobVertex節點

2處就是具體遍歷所有jobGraph的jobVertex生成executionGraph的頂點ExecutionJobVertex

遍歷所有jobGraph的頂點jobVertex

在這裡就具體生成了ExecutionJobVertex中的每一個ExecutionVertex[] taskVertices

當然這裡還會配置很多ExecutionGraph的資訊,就不一一列舉了

配置了一些ExecutionGraph的屬性以後

呼叫了

可以看到我的註釋,就是說這個地方其實是和coordinator的建立有關,在這個方法中

建立了一個coordinator物件

在這裡註冊了一個JobStatus的監聽

來看一下這個監聽的作用

可以看到原始碼上的註解就是說用於監聽job狀態的改變,具體監聽

看到這裡就非常明顯了

當監聽到jobstutes的狀態改變時

當jobstatus變成Running時呼叫了coordinator的.startCheckpointScheduler()方法其中

這裡可以看到建立了一個週期的排程執行緒

看下執行緒的run方法

這裡就真相大白了,呼叫了triggerCheckpoint方法觸發一次checkpoint(觸發checkpoint的邏輯以後隨緣更新到再講)

注意,前面說到只是註冊了一個監聽,也就是說這個coordinator現在其實還沒有啟動起來的!!要到監聽到jobStatus變成running才會啟動

回到最開始的這裡

1處轉化成executionGraph以後

2處具體看一下這個startJobManagerRunner()方法

把jobManager啟動了起來

 

在其中

啟動了這個jobMasterService

在這裡開啟了jobmaster的一些RPC,像什麼cancel job的stop job 的還有register TM的

然後startJobExecution()方法中

這裡其實會向jobManager中啟動的resourceManager的RPC請求solt資訊初始化自己的的soltPool這裡不細講了,我還沒有研究

後面

這個地方就是修改job狀態和排程運行了

其中呼叫了scheduleExecutionGraph(),在其中又呼叫了

這個地方比較重要,在其中先

\

這裡它就通過CAS修改了jobStatue從Created變成了Running

修改完了以後還沒完,還通過這個方法notifyJobStatusChange(),這個方法裡面具體看一看

他遍歷了所有的listener,也就是說會觸發我們前面註冊的那個coordinator的監聽監聽到job狀態改變為running

這裡coordinator就啟動完成了

繼續往下,在修改完job狀態以後

因為流模式這裡是用的EAGER,flink批處理我不熟這裡就不展開了

在這個schduleEager方法中

然後

看到這裡它建立了一個TaskDeploymentDescriptor一個用於排程TaskManager端任務的tdd物件

看過前面幾篇部落格的同學,就應該有印象了,在TaskManager啟動會啟動很多的RPC介面

其中有一個

一目瞭然了,這個東西是用來發送給TaskManager用於啟動TaskManager端任務的!!!!

到這裡jobManager端的job啟動任務就差不多完成了

接下來就是TaskManager端的任務了,隨緣更新的時候在說一下真正TaskManager節點是如何啟動我們job任