Flink命令列提交job (原始碼分析)
這篇文章主要介紹從命令列到任務在Driver端執行的過程
通過flink run 命令提交jar包執行程式
以yarn 模式提交任務命令類似於: flink run -m yarn-cluster XXX.jar
先來看一下指令碼中的呼叫類
在flink.sh指令碼中可以看到提交的命令走到了這樣一個外觀類上,用於提交job解析使用者命令列引數
在其main方法中
先會解析對應需要的flink引數包括flink-conf-dir等,接著
1處會根據是否有hadoop許可權安全控制走對應的doas(),具體的執行邏輯為2處解析對應的使用者引數
拿到引數後會先將引數中的第一個先取出來作為action
這裡我們只看job提交的,解析出來也就是run,然後將剩餘的引數用於job執行
在job執行前會先解析剩餘的引數,比如執行的jar檔案地址,執行的主類名(沒有後面回去Manifest裡面找)作為entryPoint入口,並行度等引數
接著
就用得到的這些引數構建program了,這裡其實就是拿到了入口執行類的全額限定名,然後通過類載入器載入執行主類
接著,會根據執行時使用者的主類是否為Program的實現類(使用者可以直接返回plan)來設定對應的packageProgram的屬性program是否為空
那我們常規的提交main方法主類的這裡就是空的,如果是主類實現progarm的就反射例項化了一個以後賦給它
接著,就是執行並且提交任務了
這裡比較重要,yarn模式提交的話這裡會排程整個叢集,提交常見的異常
Couldn't deploy Yarn session cluster
就是從這個方法裡面丟擲的,與yarn有關
這裡只看yarn的排程叢集,因為standalone模式的話Jobmanager和TaskManager是已經啟動好的了不需要這裡
其中走到了這個方法deployInternal()
可以看到這裡就是申請AppMaster並且傳入了yarn模式啟動叢集的類的全額限定名,其實就是這個類
用於啟動jobmanager,和standalone 的入口類
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
功能差不多,但是還有有區別,當這個yarnsourceManager類申請到contain的時候就會
就會去起對應的taskManager了
回到最開始,當叢集排程完以後
執行使用者程式
其實就是呼叫了使用者的main方法,結束
後面就是job往jobmanager提交了,前面的文章有
總結:
通過一個外觀類解析使用者引數,拿到類名
排程叢集啟動申請AppMaster,Contaion起JM,TM
然後類名通過類載入器載入類,然後反射例項呼叫使用者的main方法啟動Job
&n