Flink 1.12.0 sql 任務指定 job name
阿新 • • 發佈:2020-12-19
參考前文:解決 Flink 1.11.0 sql 不能指定 jobName 的問題
從 FLink 1.11 改版 sql 的執行流程後,就不能和 Stream Api 一樣使用 env.execute("JobName") 來指定任務名
看了原始碼後發現,在 sql 任務中,直接使用了 "insert-into" 拼接 catelog/database/sink table 做為 sql 任務的 job name
String jobName = "insert-into_" + String.join(",", sinkIdentifierNames);
使用體驗當然是不好的,在 JIRA 上有個 改進的嚴重 issues: https://issues.apache.org/jira/browse/FLINK-18545 討論這個問題,
最後決定在 PipelineOptions 中新增 "pipeline.name" 引數做為 job name
public class PipelineOptions { /** * The job name used for printing and logging. */ public static final ConfigOption<String> NAME = key("pipeline.name") .stringType() .noDefaultValue() .withDescription("The job name used for printing and logging.");
這個 issues 在 Flink 1.12.0 終於 merge 進去了,所以升級到 Flink 1.12.0 就不再需要修改原始碼,直接在 TableConfig 中新增 "pipeline.name" 引數即可
由於之前為了指定 JobName 之前修改過原始碼,所以升級到 Flink 1.12.0 的第一件事情就是去掉之前修改的原始碼,使用 “pipeline.name” 配置引數指定 JobName
其他程式碼都和以前一樣,只需要在 TableConfig 新增引數即可
val tabConf = tableEnv.getConfig
onf.setString("pipeline.name", Common.jobName)
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文