乾貨!FLINK SIDDHI ADDON學習心得分享
SIDDHI 是一款功能強大的CEP 引擎,具有自己的DSL,豐富的模式匹配功能和可擴充套件性, SIDDHI和FLINK的整合功能專案地址:https://github.com/haoch/flink-siddhi 本文主要介紹了這個ADDON的一些實現思路
- 將FLINK STREAM 轉化為 SIDDHI STREAM 定義
用法: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)
通過 FlinkDataStream.getType 獲得流物件的型別定義.registerStream方法會構造一個 SiddhiStreamSchema 物件,根據流物件的型別定義,自動獲取每個field對應的資料型別存在內部的fieldTypes陣列中.
SiddhiStreamSchema 內部會建立一個Siddhi StreamDefinition物件, StreamDefinition的attribute的定義根據fieldNames + fieldTypes 來新增.SiddhiTypeFactory.getAttributeType 負責將Flink 的資料型別對映為Siddhi的資料型別, 並可自動生成一段Define Stream的定義(見 SiddhiStreamSchema.getStreamDefinitionExpression 方法) define stream [streamName] ([fieldName 1] [fieldType 1], ...[fieldName n] [fieldType n])
SiddhiStreamSchema 包括一個StreamSerializer: 用於將DataStream 中的物件轉化為 Siddhi Stream 的輸入(Object Array):
如果流物件是一個簡單型別 Atomic Type 直接將流物件放到 ARRAY中
如果流物件是Tuple 型別,直接將Tuple 中前N個值放到ARRAY中
如果流物件是Pojo或者CaseClass型別,直接根據每個fieldName 獲取Class對應的屬性放到ARRAY中
- 串聯FLINK STREAM 和 SIDDHI STREAM
SiddhiStream: 抽象的Stream基類
convertDataStream 將原始的FLINK流轉化為Tuple型別的流,Tuple的第一個元素為StreamId, 第二個元素為原來流中的資料,支援普通Stream 和 KeyedStream
ExecutionSiddhiStream: 構建 SiddhiOperatorContext 並呼叫SiddhiStreamFactory.createDataStream 建立了整合Siddhi的 DataStream. DataStream的型別為Tuple的子類.SiddhiTypeFactory.getTupleTypeInformation: 其核心思路是通過Siddhi輸出Stream的StreamDefinition獲得其Attribute的定義,再通過 TypeInfoParser.parse構造Flink Tuple 型別的定義
ExecutableStream 根據Siddhi query 建立ExecutionSiddhiStream物件
SingleSiddhiStream, UnionSiddhiStream: ExecutableStream 的子類,支援Fluent Style的鏈式呼叫. UnionSiddhiStream 呼叫了DataStream.union 方法
SiddhiStreamFactory.createDataStream 通過 FLINK DataStream的transform方法使用了自定義的StreamOperator: SiddhiStreamOperator. 在 AbstractSiddhiOperator 的 setup 方法中建立SiddhiManager 和 SiddhiAppRuntime 並註冊了 InputHandler 和 OutputCallback (StreamOutputHandler)
SiddhiStreamOperator.processElement 需要處理兩種場景:
Flink TimeCharacteristic = ProcessingTime: 先呼叫StreamSerializer將資料轉化為Object Array, 再直接呼叫InputHandler.send將資料傳送給Siddhi處理
Flink TimeCharacteristic = EventTime: 快取接收到的StreamRecord 到內部的priorityQueue中,直到收到Watermark, 將priorityQueue中小於watermark的StreamRecord一次傳送給Siddhi處理
StreamOutputHandler:根據Output的TypeInfo將Siddhi Event 轉化為 Flink StreamRecord. 再轉發到SiddhiStreamOperator的Output
- CHECKPOINT
SiddhiStreamOperator中保留了兩種State資訊,一種是priorityQueue中儲存的由於watermark未傳送給Siddhi的訊息. 另一種是Siddhi本身的State, 通過SiddhiAppRuntime.snapshot() 獲得。
如果你不想再體驗一次自學時找不到資料,沒人解答問題,堅持幾天便放棄的感受的話,可以加我們的大資料交流群:894951460,裡面有各種大資料學習的資料和技術交流。
加油吧,程式設計師!路就在腳下,成功就在明天!
未來的你肯定會感謝現在拼命的自己!