Java呼叫Shell指令碼(內含FlinkShell提交取消FlinkJob示例)
阿新 • • 發佈:2021-01-23
技術標籤:Java實戰專案分享Java基礎系列
先從需求說起:有一個數據管理平臺客戶端,資料處理流是FlinkJob的運算元鏈完成的,但是要讓客戶端後端的SpringBoot專案直接控制FlinkJob的任務執行,包括提交,下線等等。
一開始找了下現成的整合包,客戶端,但是好像沒有。後來仔細看了開源的DolphinSchedule,發現它針對Flink的操作全是Shell,於是自己也想著模仿它來弄一套。
(1)本地寫一個.sh指令碼
從這個shell看出,接受了一些入參。並且對百度發起了get請求。
(2)通過Java來呼叫
public static void main(String[ ] args) {
try {
// 指定測試shell指令碼
String shellPath = "/Users/jojo/Downloads/ddp-mvp-flink-job-" +
"linhantao-54214c7fa0e526ce2131ee6d7f7de4c7747c3528/test.sh 1,2,3";
// 執行指令碼
Process process = Runtime.getRuntime().exec (shellPath);
// 執行waitFor函式,因為shell程序是JAVA程序的子程序,JAVA作為父程序需要等待子程序執行完畢再繼續執行自己的任務
process.waitFor();
BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuffer sb = new StringBuffer();
String line;
while ((line = br.readLine()) != null) {
sb.append(line).append("\n");
}
String result = sb.toString();
System.out.println(result);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
(3)Flink Shell :
// 配置自己的Flink指令碼的目錄
String flinkShellPath = "XX/XX/flink/bin/flink ";
// jar包路徑
String jarPath = "/examples/streaming/SocketWindowWordCount.jar";
// 指定FlinkShell指令碼路徑,後面的jar可以換成自己的FlinkJob Jar,提交後記得儲存JobId
String submitShell = "XX/XX/bin/flink run "+jarPath;
// 拿到某個FlinkJobId
String flinkJobId = "8e657bcc19c6c1a8db884f0e(JobId)";
// 取消某個FlinkJob
String cancelShell = "XX/XX/bin/flink cancel " + flinkJobId;