1. 程式人生 > 實用技巧 >Flink CEP複雜事件處理

Flink CEP複雜事件處理

背景

Complex Event Processing(CEP)是Flink提供的一個非常亮眼的功能。在實際生產中,隨著資料的實時性要求越來越高,實時資料的量也在不斷膨脹,在某些業務場景中需要根據連續的實時資料,發現其中有價值的那些事件。
Flink的CEP到底解決了什麼樣的問題呢?
比如我們需要在大量的訂單交易中發現那些虛假交易,在網站的訪問日誌中尋找那些使用指令碼或者工具“爆破”登陸的使用者,或者在快遞運輸中發現那些滯留很久沒有簽收的包裹等。
Flink對CEP的支援非常友好,並且支援複雜度非常高的模式匹配,其吞吐和延遲都令人滿意。

程式結構

Flink CEP 的程式結構主要分為兩個步驟:

  • 定義模式
  • 匹配結果

可以在官網中找到一個Flink提供的案例:

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(

        new SimpleCondition<Event>() {

            @Override

            public boolean filter(Event event) {

                return event.getId() == 42;

            }

        }

    ).next("middle").subtype(SubEvent.class).where(

        new SimpleCondition<SubEvent>() {

            @Override

            public boolean filter(SubEvent subEvent) {

                return subEvent.getVolume() >= 10.0;

            }

        }

    ).followedBy("end").where(

         new SimpleCondition<Event>() {

            @Override

            public boolean filter(Event event) {

                return event.getName().equals("end");

            }

         }

    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.process(

    new PatternProcessFunction<Event, Alert>() {

        @Override

        public void processMatch(

                Map<String, List<Event>> pattern,

                Context ctx,

                Collector<Alert> out) throws Exception {

            out.collect(createAlertFrom(pattern));

        }

    });

可以看到程式結構分別是:

  • 第一步,定義一個模式Pattern,在這裡定義一個這樣的模式,即在所有接收到的時間中匹配那些以id等於42的事件,然後匹配volumn 大於10.0 的事件,繼續匹配一個name等於 end的事件;
  • 第二部,匹配模式並且發出報警,根據定義的pattern在輸入流上進行匹配,一旦命中我們的模式,就發出一個報警。

模式定義

Flink支援了非常豐富的模式定義,這些模式也是我們實現複雜業務邏輯的基礎,我們把支援的模式簡單做了以下分類,完整的API可以參考官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/libs/cep.html#individual-patterns

簡單模式

聯合模式

匹配後的忽略模式

原始碼解析

從官網案例中可以發現,Flink CEP的整個過程:

  • 從一個source作為輸入
  • 經過一個Pattern運算元轉換為PatternStream
  • 經過select/process運算元轉換為DataStream

看一下select和process運算元都做了什麼?

可以看到最終的邏輯都是在PatternStream中進行的

最終經過PatternStreamBuilder的build方法生成了一個SingleOutputStreamOperator,這個類繼承了DataStream。

最終的處理計算邏輯其實都封裝在CepOperator這個類中,而在CepOperator這個類中的processElement方法則是對每一條資料的處理邏輯

同時由於CepOperator實現了Triggerable介面,所以會執行定時器,所有核心的處理邏輯都在UpdateNFA這個方法中。

實戰案例

模擬電商網站使用者搜尋的資料來作為資料的輸入源,然後查詢其中重複搜尋某一個商品的人,併發送告警,程式碼如下

public static void main(String[] args) throws Exception{

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    DataStreamSource source = env.fromElements(

            //瀏覽記錄

            Tuple3.of("Marry", "外套", 1L),

            Tuple3.of("Marry", "帽子",1L),

            Tuple3.of("Marry", "帽子",2L),

            Tuple3.of("Marry", "帽子",3L),

            Tuple3.of("Ming", "衣服",1L),

            Tuple3.of("Marry", "鞋子",1L),

            Tuple3.of("Marry", "鞋子",2L),

            Tuple3.of("LiLei", "帽子",1L),

            Tuple3.of("LiLei", "帽子",2L),

            Tuple3.of("LiLei", "帽子",3L)

    );

    //定義Pattern,尋找連續搜尋帽子的使用者

    Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern

            .<Tuple3<String, String, Long>>begin("start")

            .where(new SimpleCondition<Tuple3<String, String, Long>>() {

                @Override

                public boolean filter(Tuple3<String, String, Long> value) throws Exception {

                    return value.f1.equals("帽子");

                }

            }) //.timesOrMore(3);

            .next("middle")

            .where(new SimpleCondition<Tuple3<String, String, Long>>() {

                @Override

                public boolean filter(Tuple3<String, String, Long> value) throws Exception {

                    return value.f1.equals("帽子");

                }

            });

    KeyedStream keyedStream = source.keyBy(0);

    PatternStream patternStream = CEP.pattern(keyedStream, pattern);

    SingleOutputStreamOperator matchStream = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {

        @Override

        public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {

            List<Tuple3<String, String, Long>> middle = pattern.get("middle");

            return middle.get(0).f0 + ":" + middle.get(0).f2 + ":" + "連續搜尋兩次帽子!";

        }

    });

    matchStream.printToErr();

    env.execute("execute cep");

}

邏輯拆解:
首先定義一個數據源,模擬一些使用者的搜尋資料,然後定義自己的Pattern,這個模式的特點就是連續兩次搜尋商品“帽子”,然後進行匹配,發現匹配後輸出一條提示資訊進行列印