Flink SQL任務自動生成與提交
起因
事情的起因,是看到一篇公眾號文章Apache Flink 在汽車之家的應用與實踐,裡面提到了“基於 SQL 的開發流程”。在平臺提供以上功能的基礎上,使用者可以快速的實現 SQL 作業的開發:
建立一個 SQL 任務;
1.編寫 DDL 宣告 Source 和 Sink;
2.編寫 DML,完成主要業務邏輯的實現;
3.線上檢視結果,若資料符合預期,新增 INSERT INTO 語句,寫入到指定 Sink 中即可。
即這種功能:
之前也寫過一個spark自動提交任務的小專案,但始終無法做到簡單互動自動生成任務,因而最終沒有用到生產環境當中。Flink的SQL是Flink生態裡的一等公民,可以做到直接一條SQL語句把資料來源source載入到一張表,也可以一條SQL語句把資料寫入到sink終端。這讓互動式自動生成任務能夠簡單實現。
本文前提是,簡單實現。如果不考慮價效比,當然也可以做到一個WEB系統,全互動式,點點點,需要的什麼引數,什麼功能提交到後臺,解析生成任務也是可以做的,但這樣價效比不高。
思路
1.首先是一個簡單的WEB系統,後臺springboot可以快速實現,前端thymeleaf可以快速做一個簡單的頁面,不考慮CSS,JS。
2.然後提供一個頁面,傳入flink叢集引數,mainclass,parallelism等,以及業務引數,主要是sourceSql,sinkSql,transformationSql。然後後臺提供一個Flink任務的模板,將sourceSql,sinkSql,transformationSql這三個引數替換掉。再編譯成class並生成jar包。
3.然後將jar包提交到flink叢集,再將任務提交到叢集。
這一點可以通過Flink提供的restapi實現。
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/ops/rest_api/
關於第2點,也可以在模板當中以提供引數的形式傳入sourceSql,sinkSql,transformationSql,flink 在提交任務的接口裡也提供了這樣的引數program-args,但只適合簡單的引數,像sql語句這種帶大量的引號,空格,換行符不建議這樣傳參。
實現
功能本身實現起來倒沒太大問題。程式碼就不提供了,光佔篇幅。有興趣的同學可以看
1.配置
需要修改application的以下4個配置
分別是
flink叢集的訪問地址,
佈置WEB服務的根目錄地址,
編譯flink任務所依賴的jar包目錄,
flink任務main class,也就是flink任務模板的路徑(不要寫成包名的格式)
flinkurl=http://localhost:8081
rootpath=D://ideaproject//test//flink_remotesubmit
dependmentpath=D://ideaproject//test//flink_remotesubmit//lib
mainpath=//src//main//java//com//yp//flink//model//TableApiModel.java
2.介面如下
提交成功後,跳轉到flink任務介面
3.環境
基於
flink 1.13.2
spring boot 2.5
問題
遇到的問題,來自於汽車之家的那張圖。就文章裡表述的需求來說,需要每天按小時統計PV,UV。
1.flink 的kafka connector 目前只支援批模式(有界資料),不支援流模式(無界)。
如圖中紅色標記處。
如果按inStreamingMode,那就是實時統計,沒法設定batch頻次;如果按inBatchMode批次,上圖可見,目前不支援。而且kafka connector裡引數也只支援kafka的起始位置,不支援結束位置。
所以這裡怎麼實現的?目前不得而知。
2.還是如上圖所見,在sink的時候是隻支援append模式,而在append模式下,不支援group by,因為使用了group by 會改行結果行。
如果強行使用group by 將會丟擲異常:
目前sink只支援append模式,如果使用了group by 等會改變結果行,會報錯:AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate
所以對kafka資料來源進行sink的怎樣進行分組聚合?
Flink SQL還是非常強大的。前幾天釋出的flink 14版本,批執行模式現在支援在同一應用中混合使用 DataStream API 和 SQL/Table API(此前僅支援單獨使用 DataStream API 或 SQL/Table API)。也引入了更多的connetor。
蒼茫之天涯,乃吾輩之所愛也;浩瀚之程式,亦吾之所愛也,然則何時而愛耶?必曰:先天下之憂而憂,後天下之愛而愛也!