Flink的Job啟動JobManager端(原始碼分析)
通過前面的文章瞭解到
Driver將使用者程式碼轉換成streamGraph再轉換成Jobgraph後向Jobmanager端提交
JobManager啟動以後會在Dispatcher.java起來RPC方法submitJob(jobGraph),用於接收來自Driver轉化得到的JobGraph來啟動任務
具體來看jobGraph提交到JobManager的submitJob方法
前面都是一些呼叫鏈沒有什麼好講的,最後到createJobManager( )方法這裡
先看一下1,建立了一個jobmanagerRunner並且將中Driver端得到的JobGraph傳遞了進去
在建立JobManagerRunner的過程中它呼叫了
這裡主要是為了建立一個jobMaster,在jobMaster的構造方法中
在這裡它先是create傳入了jobgraph然後又通過createAndRestoreExecutionGraph()方法轉換得到executionGraph
這個executionGraph就可以用來排程啟動任務了
具體看一下他的轉化邏輯
可以看到它從createExecutionGraph方法中得到了executionGraph
並且通過getCheckpointCoordinator()方法得到了一個coordinator(主要是用於週期性觸發checkpoint,呼叫對應TaskManager的rpc生成barriers往下游傳送)
繼續看一下他的轉化邏輯
在createExecutionGraph中通過ExecutionGraphBuilder.buildGraph()返回了一個executionGraph
在buildGraph()方法中
建立了一個executionGraph
為executionGraph設定一些基礎資訊,包括排程方式等(這裡stream是eager的排程方法)
然後
1處得到了一個的拓撲圖包含了所有jobGraph的所有jobVertex節點
2處就是具體遍歷所有jobGraph的jobVertex生成executionGraph的頂點ExecutionJobVertex
遍歷所有jobGraph的頂點jobVertex
在這裡就具體生成了ExecutionJobVertex中的每一個ExecutionVertex[] taskVertices
當然這裡還會配置很多ExecutionGraph的資訊,就不一一列舉了
配置了一些ExecutionGraph的屬性以後
呼叫了
可以看到我的註釋,就是說這個地方其實是和coordinator的建立有關,在這個方法中
建立了一個coordinator物件
在這裡註冊了一個JobStatus的監聽
來看一下這個監聽的作用
可以看到原始碼上的註解就是說用於監聽job狀態的改變,具體監聽
看到這裡就非常明顯了
當監聽到jobstutes的狀態改變時
當jobstatus變成Running時呼叫了coordinator的.startCheckpointScheduler()方法其中
這裡可以看到建立了一個週期的排程執行緒
看下執行緒的run方法
這裡就真相大白了,呼叫了triggerCheckpoint方法觸發一次checkpoint(觸發checkpoint的邏輯以後隨緣更新到再講)
注意,前面說到只是註冊了一個監聽,也就是說這個coordinator現在其實還沒有啟動起來的!!要到監聽到jobStatus變成running才會啟動
回到最開始的這裡
1處轉化成executionGraph以後
2處具體看一下這個startJobManagerRunner()方法
把jobManager啟動了起來
在其中
啟動了這個jobMasterService
在這裡開啟了jobmaster的一些RPC,像什麼cancel job的stop job 的還有register TM的
然後startJobExecution()方法中
這裡其實會向jobManager中啟動的resourceManager的RPC請求solt資訊初始化自己的的soltPool這裡不細講了,我還沒有研究
後面
這個地方就是修改job狀態和排程運行了
其中呼叫了scheduleExecutionGraph(),在其中又呼叫了
這個地方比較重要,在其中先
\
這裡它就通過CAS修改了jobStatue從Created變成了Running
修改完了以後還沒完,還通過這個方法notifyJobStatusChange(),這個方法裡面具體看一看
他遍歷了所有的listener,也就是說會觸發我們前面註冊的那個coordinator的監聽監聽到job狀態改變為running
這裡coordinator就啟動完成了
繼續往下,在修改完job狀態以後
因為流模式這裡是用的EAGER,flink批處理我不熟這裡就不展開了
在這個schduleEager方法中
然後
看到這裡它建立了一個TaskDeploymentDescriptor一個用於排程TaskManager端任務的tdd物件
看過前面幾篇部落格的同學,就應該有印象了,在TaskManager啟動會啟動很多的RPC介面
其中有一個
一目瞭然了,這個東西是用來發送給TaskManager用於啟動TaskManager端任務的!!!!
到這裡jobManager端的job啟動任務就差不多完成了
接下來就是TaskManager端的任務了,隨緣更新的時候在說一下真正TaskManager節點是如何啟動我們job任