1. 程式人生 > 其它 >Java呼叫Shell指令碼(內含FlinkShell提交取消FlinkJob示例)

Java呼叫Shell指令碼(內含FlinkShell提交取消FlinkJob示例)

技術標籤:Java實戰專案分享Java基礎系列

先從需求說起:有一個數據管理平臺客戶端,資料處理流是FlinkJob的運算元鏈完成的,但是要讓客戶端後端的SpringBoot專案直接控制FlinkJob的任務執行,包括提交,下線等等。
一開始找了下現成的整合包,客戶端,但是好像沒有。後來仔細看了開源的DolphinSchedule,發現它針對Flink的操作全是Shell,於是自己也想著模仿它來弄一套。

(1)本地寫一個.sh指令碼

在這裡插入圖片描述

從這個shell看出,接受了一些入參。並且對百度發起了get請求。

(2)通過Java來呼叫

public static void main(String[
] args) { try { // 指定測試shell指令碼 String shellPath = "/Users/jojo/Downloads/ddp-mvp-flink-job-" + "linhantao-54214c7fa0e526ce2131ee6d7f7de4c7747c3528/test.sh 1,2,3"; // 執行指令碼 Process process = Runtime.getRuntime().exec
(shellPath); // 執行waitFor函式,因為shell程序是JAVA程序的子程序,JAVA作為父程序需要等待子程序執行完畢再繼續執行自己的任務 process.waitFor(); BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream())); StringBuffer sb = new StringBuffer(); String line; while
((line = br.readLine()) != null) { sb.append(line).append("\n"); } String result = sb.toString(); System.out.println(result); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } catch (IOException ioException) { ioException.printStackTrace(); } }

在這裡插入圖片描述

(3)Flink Shell :

// 配置自己的Flink指令碼的目錄
            String flinkShellPath = "XX/XX/flink/bin/flink ";
            // jar包路徑
            String jarPath = "/examples/streaming/SocketWindowWordCount.jar";
            // 指定FlinkShell指令碼路徑,後面的jar可以換成自己的FlinkJob Jar,提交後記得儲存JobId
            String submitShell = "XX/XX/bin/flink run "+jarPath;

            // 拿到某個FlinkJobId
            String flinkJobId = "8e657bcc19c6c1a8db884f0e(JobId)";
            // 取消某個FlinkJob
            String cancelShell = "XX/XX/bin/flink cancel " + flinkJobId;
後續使用和踩坑會持續記錄…