1. 程式人生 > 實用技巧 >Flink 1.12.0 sql 任務指定 job name

Flink 1.12.0 sql 任務指定 job name

參考前文:解決 Flink 1.11.0 sql 不能指定 jobName 的問題

從 FLink 1.11 改版 sql 的執行流程後,就不能和 Stream Api 一樣使用 env.execute("JobName") 來指定任務名

看了原始碼後發現,在 sql 任務中,直接使用了 "insert-into" 拼接 catelog/database/sink table 做為 sql 任務的 job name

String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);

使用體驗當然是不好的,在 JIRA 上有個 改進的嚴重 issues: https://issues.apache.org/jira/browse/FLINK-18545 討論這個問題,
最後決定在 PipelineOptions 中新增 "pipeline.name" 引數做為 job name

public class PipelineOptions {

  /**
   * The job name used for printing and logging.
   */
  public static final ConfigOption<String> NAME =
    key("pipeline.name")
      .stringType()
      .noDefaultValue()
      .withDescription("The job name used for printing and logging.");

這個 issues 在 Flink 1.12.0 終於 merge 進去了,所以升級到 Flink 1.12.0 就不再需要修改原始碼,直接在 TableConfig 中新增 "pipeline.name" 引數即可

由於之前為了指定 JobName 之前修改過原始碼,所以升級到 Flink 1.12.0 的第一件事情就是去掉之前修改的原始碼,使用 “pipeline.name” 配置引數指定 JobName

其他程式碼都和以前一樣,只需要在 TableConfig 新增引數即可

val tabConf = tableEnv.getConfig
onf.setString("pipeline.name", Common.jobName)

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文