Flink CEP複雜事件處理
Flink CEP複雜事件處理
背景
Complex Event Processing(CEP)是Flink提供的一個非常亮眼的功能。在實際生產中,隨著資料的實時性要求越來越高,實時資料的量也在不斷膨脹,在某些業務場景中需要根據連續的實時資料,發現其中有價值的那些事件。
Flink的CEP到底解決了什麼樣的問題呢?
比如我們需要在大量的訂單交易中發現那些虛假交易,在網站的訪問日誌中尋找那些使用指令碼或者工具“爆破”登陸的使用者,或者在快遞運輸中發現那些滯留很久沒有簽收的包裹等。
Flink對CEP的支援非常友好,並且支援複雜度非常高的模式匹配,其吞吐和延遲都令人滿意。
程式結構
Flink CEP 的程式結構主要分為兩個步驟:
- 定義模式
- 匹配結果
可以在官網中找到一個Flink提供的案例:
DataStream<Event> input = ... Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.getId() == 42; } } ).next("middle").subtype(SubEvent.class).where( new SimpleCondition<SubEvent>() { @Override public boolean filter(SubEvent subEvent) { return subEvent.getVolume() >= 10.0; } } ).followedBy("end").where( new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.getName().equals("end"); } } ); PatternStream<Event> patternStream = CEP.pattern(input, pattern); DataStream<Alert> result = patternStream.process( new PatternProcessFunction<Event, Alert>() { @Override public void processMatch( Map<String, List<Event>> pattern, Context ctx, Collector<Alert> out) throws Exception { out.collect(createAlertFrom(pattern)); } });
可以看到程式結構分別是:
- 第一步,定義一個模式Pattern,在這裡定義一個這樣的模式,即在所有接收到的時間中匹配那些以id等於42的事件,然後匹配volumn 大於10.0 的事件,繼續匹配一個name等於 end的事件;
- 第二部,匹配模式並且發出報警,根據定義的pattern在輸入流上進行匹配,一旦命中我們的模式,就發出一個報警。
模式定義
Flink支援了非常豐富的模式定義,這些模式也是我們實現複雜業務邏輯的基礎,我們把支援的模式簡單做了以下分類,完整的API可以參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/libs/cep.html#individual-patterns
簡單模式
聯合模式
匹配後的忽略模式
原始碼解析
從官網案例中可以發現,Flink CEP的整個過程:
- 從一個source作為輸入
- 經過一個Pattern運算元轉換為PatternStream
- 經過select/process運算元轉換為DataStream
看一下select和process運算元都做了什麼?
可以看到最終的邏輯都是在PatternStream中進行的
最終經過PatternStreamBuilder的build方法生成了一個SingleOutputStreamOperator,這個類繼承了DataStream。
最終的處理計算邏輯其實都封裝在CepOperator這個類中,而在CepOperator這個類中的processElement方法則是對每一條資料的處理邏輯
同時由於CepOperator實現了Triggerable介面,所以會執行定時器,所有核心的處理邏輯都在UpdateNFA這個方法中。
實戰案例
模擬電商網站使用者搜尋的資料來作為資料的輸入源,然後查詢其中重複搜尋某一個商品的人,併發送告警,程式碼如下
public static void main(String[] args) throws Exception{
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource source = env.fromElements(
//瀏覽記錄
Tuple3.of("Marry", "外套", 1L),
Tuple3.of("Marry", "帽子",1L),
Tuple3.of("Marry", "帽子",2L),
Tuple3.of("Marry", "帽子",3L),
Tuple3.of("Ming", "衣服",1L),
Tuple3.of("Marry", "鞋子",1L),
Tuple3.of("Marry", "鞋子",2L),
Tuple3.of("LiLei", "帽子",1L),
Tuple3.of("LiLei", "帽子",2L),
Tuple3.of("LiLei", "帽子",3L)
);
//定義Pattern,尋找連續搜尋帽子的使用者
Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern
.<Tuple3<String, String, Long>>begin("start")
.where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return value.f1.equals("帽子");
}
}) //.timesOrMore(3);
.next("middle")
.where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return value.f1.equals("帽子");
}
});
KeyedStream keyedStream = source.keyBy(0);
PatternStream patternStream = CEP.pattern(keyedStream, pattern);
SingleOutputStreamOperator matchStream = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
@Override
public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {
List<Tuple3<String, String, Long>> middle = pattern.get("middle");
return middle.get(0).f0 + ":" + middle.get(0).f2 + ":" + "連續搜尋兩次帽子!";
}
});
matchStream.printToErr();
env.execute("execute cep");
}
邏輯拆解:
首先定義一個數據源,模擬一些使用者的搜尋資料,然後定義自己的Pattern,這個模式的特點就是連續兩次搜尋商品“帽子”,然後進行匹配,發現匹配後輸出一條提示資訊進行列印