1. 程式人生 > >Flink原始碼--CLI提交Job

Flink原始碼--CLI提交Job

Flink提供了一個命令列介面,來執行並控制打包後的jar檔案中的程式。其是Flink安裝的一部分,在本地以及分散式模式下都可以使用。CLI命令介面位於:

<flink_home>/bin/flink

當提交Job後,其預設會連線到Flink的JobManager。

提交Flink的Job有一個前提,即JobManager必須處於執行中,Flink有3種執行執行模式:

1、本地模式:<flink-home>/bin/start-local.sh

2、叢集模式:<flink-home>/bin/start-cluster.sh

3、Yarn或Mesos環境

1、使用語法

這裡可將CLI的命令抽象為:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

2、flink指令碼

指令碼檔案:$FLINK_HOME/bin/flink

其主要是執行. “$bin”/config.sh來載入flink的環境配置資訊,而config.sh則讀取flink-conf.yaml、slaves、masters等來讀取配置。

在flink指令碼檔案的最後(55行),執行了最終的Job:

exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "
$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "[email protected]"

這裡就是用java命令來執行flink的類:org.apache.flink.client.CliFrontend 來完成。

例如,我的CLI命令是:

flink run -c com.toptrade.Job toptrade-flink-1.0.jar prod.properties

則flink做的事情是校驗我的命令,並載入flink的環境配置、taskManagers、HA的機器配置、日誌檔案、ClassPath以及hadoop的配置等,並最終執行java執行命令,如下:

這裡寫圖片描述

即指定:

1、/home/flink/java/jdk1.8.0_60/bin/java 
2、-Dlog.file、-Dlog4j.configuration、-Dlogback.configurationFile
3、-classpath
4、執行org.apache.flink.client.CliFrontend5、引用引數run -c com.toptrade.Job toptrade-flink-1.0.jar prod.properties
6、載入flink環境配置資訊

3、org.apache.flink.client.CliFrontend

此類是所有Job的入口,通過讀取flink的環境、配置資訊,並根據使用者提供的jar包和入口類,進行最終的Job提交。

這裡寫圖片描述

我們再來看看parseParameters(args)方法:
這裡寫圖片描述

這裡會根據《Action》的不同,進行不同的操作,我們主要看下run,即程式的執行:

這裡的run方法主要分為以下幾大部分:

1、將命令列中的options、程式入口類、jar檔案等封裝起來

//命令列選項類,繼承自ProgramOptions
        RunOptions options;
        try {
            // 解析args,將配置資訊用RunOptions類封裝起來
            options = CliFrontendParser.parseRunCommand(args);
        }

關於RunOptions,其繼承關係圖如下:

這裡寫圖片描述

我們看看最基礎的抽象類CommandLineOptions:
這裡寫圖片描述

下層的抽象類ProgramOptions,也是一個基礎類,其將配置中的jar、入口類、classPath以及《options》的一些資訊封裝起來:
這裡寫圖片描述

RunOptions類:只是呼叫父類的構造方法:
這裡寫圖片描述

2、建立一個封裝了入口類、jar檔案、classpath路徑、使用者配置引數的例項:PackagedProgram

// 構建程式
        PackagedProgram program;
        try {
            LOG.info("Building program from JAR file");
            // 建立一個封裝了入口類、jar檔案、classpath路徑、使用者配置引數的例項:PackagedProgram
            program = buildProgram(options);
        }

buildProgram方法主要是載入並初始化PackagedProgram類:

// Get assembler class
        String entryPointClass = options.getEntryPointClassName();

        PackagedProgram program = entryPointClass == null ?
                new PackagedProgram(jarFile, classpaths, programArgs) :
                new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

3、封裝必要的函式並提交到遠端叢集中

// 3、封裝必要的函式並提交到遠端叢集中,有2個子類:StandaloneClusterClient以及YarnClusterClient,現在也包含了Mesos的支援
        ClusterClient client = null;
        try {
            // 根據RunOptions和PackagedProgram,建立Client
            client = createClient(options, program);
            client.setPrintStatusDuringExecution(options.getStdoutLogging());
            client.setDetached(options.getDetachedMode());
            LOG.debug("Client slots is set to {}", client.getMaxSlots());

            LOG.debug(options.getSavepointRestoreSettings().toString());

            int userParallelism = options.getParallelism();
            LOG.debug("User parallelism is set to {}", userParallelism);
            if (client.getMaxSlots() != -1 && userParallelism == -1) {
                logAndSysout("Using the parallelism provided by the remote cluster ("
                    + client.getMaxSlots()+"). "
                    + "To use another parallelism, set it at the ./bin/flink client.");
                userParallelism = client.getMaxSlots();
            }
            // 最終的執行,根據PackagedProgram以及ClusterClient和並行度,執行程式
            return executeProgram(program, client, userParallelism);
        }

這裡有2個主要的方法:

1client = createClient(options, program);
2、executeProgram(program, client, userParallelism);

我們分別來看下2個方法。
createClient:建立ClusterClient物件,包含叢集模式資訊,JobManager地址以及WebUI埠等。
這裡寫圖片描述

最後也是最重要的一個方法:executeProgram。

// Client通過Actor提交程式到JobManager
    protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
        //輸出:Starting execution of program
        logAndSysout("Starting execution of program");
        // 包含JobId的Job提交類
        JobSubmissionResult result;
        try {
            //執行CluterClient中的run方法
            result = client.run(program, parallelism);
        }

繼續看一下client.run(program, parallelism):

public JobSubmissionResult run(PackagedProgram prog, int parallelism)
            throws ProgramInvocationException, ProgramMissingJobException
    {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        // 如果包含入口類(非互動模式提交Job)
        if (prog.isUsingProgramEntryPoint()) {
            // JobWithJars是一個Flink資料流計劃,包含了jar中所有的類,以及用於載入使用者程式碼的ClassLoader
            final JobWithJars jobWithJars;
            if (hasUserJarsInClassPath(prog.getAllLibraries())) {
                jobWithJars = prog.getPlanWithoutJars();
            } else {
                jobWithJars = prog.getPlanWithJars();
            }
            //跳轉到3個引數的run方法
            return run(jobWithJars, parallelism, prog.getSavepointSettings());
        }

我們這裡暫時只關注包含入口類的情況,不對互動提交模式進行分析。

// 此時Client已經連線到Flink的叢集,該呼叫將被阻塞直到執行完成
    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
            throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }
        //這裡根據流或批,為每一個operator進行優化,例如shuffle的方式,hash join、sort-merge join或者廣播等進行優化
        OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
        //根據優化後的執行計劃,jar檔案,classpath,類載入器,儲存點設定執行
        return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
    }
public JobSubmissionResult run(FlinkPlan compiledPlan,
            List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
        throws ProgramInvocationException
    {
        // 生成JobGraph,並將JobGraph提交到JobManager
        JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
        return submitJob(job, classLoader);
    }

關於如何生成JobGraph,這裡忽略。

最後,我們看下submitJob方法,這裡以StandaloneClusterClient.submitJob方法為例,判斷是否是分離模式,如果不是的話,則執行ClusterClient的run(JobGraph jobGraph, ClassLoader classLoader)方法:

public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

        waitForClusterToBeReady();

        final LeaderRetrievalService leaderRetrievalService;
        try {
            // 根據配置資訊中是否是高可用模式,建立LeaderRetrievalService物件
            leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
        } catch (Exception e) {
            throw new ProgramInvocationException("Could not create the leader retrieval service", e);
        }

        try {
            logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
            // 呼叫JobClient的submitJobAndWait方法,提交Job
            this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(), flinkConfig,
                leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
            return this.lastJobExecutionResult;
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

後續的流程主要是通過JobClient將Job提交給JobClient actor,而後由這個actor提交給JobManager,完成Job的提交。

4、日誌輸出

這裡寫圖片描述

可以看到,主要是在createClient(options, program)和executeProgram(program, client, userParallelism)方法中輸出。

以上便是作業提交的全部流程,中間有些過程可能偏離了主線,而有些過程又一筆帶過,很多細節沒有涉及到。

關於如何生成StreamGraph以及JobGraph,可以參考群主的部落格: