Flink原始碼--CLI提交Job
Flink提供了一個命令列介面,來執行並控制打包後的jar檔案中的程式。其是Flink安裝的一部分,在本地以及分散式模式下都可以使用。CLI命令介面位於:
<flink_home>/bin/flink
當提交Job後,其預設會連線到Flink的JobManager。
提交Flink的Job有一個前提,即JobManager必須處於執行中,Flink有3種執行執行模式:
1、本地模式:<flink-home>/bin/start-local.sh
2、叢集模式:<flink-home>/bin/start-cluster.sh
3、Yarn或Mesos環境
1、使用語法
這裡可將CLI的命令抽象為:
./flink <ACTION> [OPTIONS] [ARGUMENTS]
2、flink指令碼
指令碼檔案:$FLINK_HOME/bin/flink
其主要是執行. “$bin”/config.sh來載入flink的環境配置資訊,而config.sh則讀取flink-conf.yaml、slaves、masters等來讀取配置。
在flink指令碼檔案的最後(55行),執行了最終的Job:
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList " $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "[email protected]"
這裡就是用java命令來執行flink的類:org.apache.flink.client.CliFrontend
來完成。
例如,我的CLI命令是:
flink run -c com.toptrade.Job toptrade-flink-1.0.jar prod.properties
則flink做的事情是校驗我的命令,並載入flink的環境配置、taskManagers、HA的機器配置、日誌檔案、ClassPath以及hadoop的配置等,並最終執行java執行命令,如下:
即指定:
1、/home/flink/java/jdk1.8.0_60/bin/java
2、-Dlog.file、-Dlog4j.configuration、-Dlogback.configurationFile
3、-classpath
4、執行org.apache.flink.client.CliFrontend類
5、引用引數run -c com.toptrade.Job toptrade-flink-1.0.jar prod.properties
6、載入flink環境配置資訊
3、org.apache.flink.client.CliFrontend
此類是所有Job的入口,通過讀取flink的環境、配置資訊,並根據使用者提供的jar包和入口類,進行最終的Job提交。
我們再來看看parseParameters(args)方法:
這裡會根據《Action》的不同,進行不同的操作,我們主要看下run,即程式的執行:
這裡的run方法主要分為以下幾大部分:
1、將命令列中的options、程式入口類、jar檔案等封裝起來
//命令列選項類,繼承自ProgramOptions
RunOptions options;
try {
// 解析args,將配置資訊用RunOptions類封裝起來
options = CliFrontendParser.parseRunCommand(args);
}
關於RunOptions,其繼承關係圖如下:
我們看看最基礎的抽象類CommandLineOptions:
下層的抽象類ProgramOptions,也是一個基礎類,其將配置中的jar、入口類、classPath以及《options》的一些資訊封裝起來:
RunOptions類:只是呼叫父類的構造方法:
2、建立一個封裝了入口類、jar檔案、classpath路徑、使用者配置引數的例項:PackagedProgram
// 構建程式
PackagedProgram program;
try {
LOG.info("Building program from JAR file");
// 建立一個封裝了入口類、jar檔案、classpath路徑、使用者配置引數的例項:PackagedProgram
program = buildProgram(options);
}
buildProgram方法主要是載入並初始化PackagedProgram類:
// Get assembler class
String entryPointClass = options.getEntryPointClassName();
PackagedProgram program = entryPointClass == null ?
new PackagedProgram(jarFile, classpaths, programArgs) :
new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
3、封裝必要的函式並提交到遠端叢集中
// 3、封裝必要的函式並提交到遠端叢集中,有2個子類:StandaloneClusterClient以及YarnClusterClient,現在也包含了Mesos的支援
ClusterClient client = null;
try {
// 根據RunOptions和PackagedProgram,建立Client
client = createClient(options, program);
client.setPrintStatusDuringExecution(options.getStdoutLogging());
client.setDetached(options.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
LOG.debug(options.getSavepointRestoreSettings().toString());
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
if (client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("
+ client.getMaxSlots()+"). "
+ "To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}
// 最終的執行,根據PackagedProgram以及ClusterClient和並行度,執行程式
return executeProgram(program, client, userParallelism);
}
這裡有2個主要的方法:
1、client = createClient(options, program);
2、executeProgram(program, client, userParallelism);
我們分別來看下2個方法。
createClient:建立ClusterClient物件,包含叢集模式資訊,JobManager地址以及WebUI埠等。
最後也是最重要的一個方法:executeProgram。
// Client通過Actor提交程式到JobManager
protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
//輸出:Starting execution of program
logAndSysout("Starting execution of program");
// 包含JobId的Job提交類
JobSubmissionResult result;
try {
//執行CluterClient中的run方法
result = client.run(program, parallelism);
}
繼續看一下client.run(program, parallelism):
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
throws ProgramInvocationException, ProgramMissingJobException
{
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
// 如果包含入口類(非互動模式提交Job)
if (prog.isUsingProgramEntryPoint()) {
// JobWithJars是一個Flink資料流計劃,包含了jar中所有的類,以及用於載入使用者程式碼的ClassLoader
final JobWithJars jobWithJars;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
jobWithJars = prog.getPlanWithoutJars();
} else {
jobWithJars = prog.getPlanWithJars();
}
//跳轉到3個引數的run方法
return run(jobWithJars, parallelism, prog.getSavepointSettings());
}
我們這裡暫時只關注包含入口類的情況,不對互動提交模式進行分析。
// 此時Client已經連線到Flink的叢集,該呼叫將被阻塞直到執行完成
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
throws CompilerException, ProgramInvocationException {
ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
}
//這裡根據流或批,為每一個operator進行優化,例如shuffle的方式,hash join、sort-merge join或者廣播等進行優化
OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
//根據優化後的執行計劃,jar檔案,classpath,類載入器,儲存點設定執行
return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}
public JobSubmissionResult run(FlinkPlan compiledPlan,
List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
throws ProgramInvocationException
{
// 生成JobGraph,並將JobGraph提交到JobManager
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
return submitJob(job, classLoader);
}
關於如何生成JobGraph,這裡忽略。
最後,我們看下submitJob方法,這裡以StandaloneClusterClient.submitJob方法為例,判斷是否是分離模式,如果不是的話,則執行ClusterClient的run(JobGraph jobGraph, ClassLoader classLoader)方法:
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
waitForClusterToBeReady();
final LeaderRetrievalService leaderRetrievalService;
try {
// 根據配置資訊中是否是高可用模式,建立LeaderRetrievalService物件
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
} catch (Exception e) {
throw new ProgramInvocationException("Could not create the leader retrieval service", e);
}
try {
logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
// 呼叫JobClient的submitJobAndWait方法,提交Job
this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(), flinkConfig,
leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
return this.lastJobExecutionResult;
} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}
}
後續的流程主要是通過JobClient將Job提交給JobClient actor,而後由這個actor提交給JobManager,完成Job的提交。
4、日誌輸出
可以看到,主要是在createClient(options, program)和executeProgram(program, client, userParallelism)方法中輸出。
以上便是作業提交的全部流程,中間有些過程可能偏離了主線,而有些過程又一筆帶過,很多細節沒有涉及到。
關於如何生成StreamGraph以及JobGraph,可以參考群主的部落格: