1. 程式人生 > >Flink-CEP之模式流與運算子

Flink-CEP之模式流與運算子

之前我們分析了CEP的API,接下來我們將分析CEP API的內部實現包括模式流與運算子。

模式流

模式流(PatternStream)是CEP模式匹配的流抽象,一個PatternStream物件表示模式檢測到的序列所對應的流。該序列以對映來表示,以模式名關聯一組事件物件。

為了使用PatternStream,我們首先要構建它,為此Flink提供了一個名為CEP的幫助類,它定義了一個pattern靜態方法:

DataStream<String> inputStream = ...
Pattern<String, ?> pattern = ...

PatternStream<String> patternStream = CEP.pattern(inputStream, pattern);

該方法接收初始事件流DataStream物件以及用於匹配的Pattern物件,在pattern方法內部通過將這兩個引數傳遞給PatternStream的構造方法來構建該物件的。

從之前的案例程式碼中我們看到,通常會在PatternStream上呼叫select或flatSelect來獲取某個模式下匹配到的事件來實現我們的業務邏輯。而在select/flatSelect方法內部,其實仍然是藉助於常規DataStream實現的,我們以其中select方法(存在多個過載)一個作為示例:

public <R> DataStream<R> select(final PatternSelectFunction<
T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) { DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern); return patternStream .map(new PatternSelectMapper<>( patternStream.getExecutionEnvironment().
clean(patternSelectFunction))) .returns(outTypeInfo); }

方法的第一行,藉助於CEPOperatorUtils這一幫助類構建DataStream

final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);

接著,會判斷初始輸入流是否是基於鍵分組的(KeyedStream),這是為了採用不同的運算子對初始輸入流進行轉換,如果是KeyedStream,則將其初始輸入流進行強制轉換為KeyedStream並採用KeyedCEPPatternOperator:

patternStream = keyedStream.transform(
    "KeyedCEPPatternOperator",
    (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
    new KeyedCEPPatternOperator<>(
        inputSerializer,
        isProcessingTime,
        keySelector,
        keySerializer,
        nfaFactory)
);

如果是普通未分組的資料流,則採用CEPPatternOperator:

patternStream = inputStream.transform(
    "CEPPatternOperator",
    (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
    new CEPPatternOperator<T>(
        inputSerializer,
        isProcessingTime,
        nfaFactory
    )
).setParallelism(1);

從上面我們看到無論是哪種運算子都要求傳遞NFA工廠,說明NFA是在運算子內部工作的。另外需要注意的是,如果是普通資料流,其並行度被設定為1,也就是整個資料流沒辦法分割槽以並行執行,而是作為一個全域性資料流參與模式匹配。這一點其實不難想象,因為我們在分析模式時,其有事件選擇策略(嚴格緊鄰還是非嚴格緊鄰),也就是說事件前後順序是模式的一部分,那麼這時候如果普通事件流再分割槽執行,將會打破這種順序,從而導致匹配失效。

通過對PatternStream的解析可知,它其實不同於DataStream API裡的各種資料流物件,它並不是�DataStream的特例,也不是由轉換函式得來,它只是對DataStream的二次封裝。

上面我們提及了兩種運算子,但其實並不止這麼多,具體它們的實現以及差別,我們接下來會進行詳細分析。

運算子

CEP的運算子實現有兩個考慮因素:是否針對基於鍵分割槽的資料流以及是否支援對超時的匹配序列進行處理。因此針對這兩個因素的組合將會產生四種運算子的實現,所有運算子相關的類圖如下所示:

AbstractCEPBasePatternOperator為所有的運算子提供基礎模板,它自身繼承自流運算子(AbstractStreamOperator)並擴充套件了單輸入流運算子介面(OneInputStreamOperator)。AbstractCEPBasePatternOperator定義了兩對抽象方法,分別是:

  • (get/update)NFA:(獲得/更新)NFA的例項;
  • (get/update)PriorityQueue:(獲得/更新)優先順序佇列;

其實,這兩對方法主要是為了實現基於鍵分組的運算子提供的,因為它們會利用使用者狀態API來獲取並更新NFA例項以及優先順序佇列(優先順序佇列用於快取事件時間語義時的事件以等待水位線),這一點我們會在下文剖析。藉助於這兩對抽象方法,提供了對processElement(定義在OneInputStreamOperator介面中)的實現:

public void processElement(StreamRecord<IN> element) throws Exception {
    if (isProcessingTime) {
        //獲得NFA物件,處理事件並更新NFA物件
        NFA<IN> nfa = getNFA();
        processEvent(nfa, element.getValue(), System.currentTimeMillis());
        updateNFA(nfa);
    } else {
        //獲得優先順序佇列快取該元素(直到接收到水位線),更新優先順序佇列
        PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();

        if (getExecutionConfig().isObjectReuseEnabled()) {
            priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), 
                element.getTimestamp()));
        } else {
            priorityQueue.offer(element);
        }
        updatePriorityQueue(priorityQueue);
    }
}

從程式碼實現來看,真正執行模式匹配的是processEvent方法。AbstractCEPBasePatternOperator有兩個抽象派生類,分別是:

  • AbstractCEPPatternOperator:普通的CEP模式運算子;
  • AbstractKeyedCEPPatternOperator:基於鍵分割槽的CEP模式運算子

由於AbstractCEPPatternOperator相對較為簡單,因此我們先分析它的實現。AbstractCEPPatternOperator在執行時是單例項的,因為它的並行度為一,因此它不需要用到使用者狀態API,同時也就不需要實現抽象方法updateNFA以及updatePriorityQueue。它實現了processWatermark方法:

public void processWatermark(Watermark mark) throws Exception {
    //如果優先順序佇列不為空且隊首元素的時間戳小於等於水位線的時間戳,則出隊元素並呼叫processEvent方法處理
    while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
        StreamRecord<IN> streamRecord = priorityQueue.poll();

        processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
    }

    //發射水位線
    output.emitWatermark(mark);
}

由於AbstractCEPPatternOperator最終繼承自AbstractStreamOperator,所以它還需要實現運算子狀態的快照/恢復方法對。我們可以直接利用運算子狀態快照來儲存相關狀態,這裡主要的狀態就是NFA物件以及優先順序佇列。

接下來,我們來分析AbstractKeyedCEPPatternOperator的實現,不同於AbstractCEPPatternOperator所處理的全域性事件流。AbstractKeyedCEPPatternOperator所面對的是基於鍵分割槽的事件流,因此除了NFA物件以及優先順序佇列,還有所有使用者的鍵集合需要儲存。且因為是多分割槽並行執行,那麼NFA物件和優先順序佇列也將會在多個分割槽內並行存在。這時,將不得不使用使用者狀態API,以在內部將這些狀態是跟鍵關聯(內部是KVState):

private transient ValueState<NFA<IN>> nfaOperatorState;
private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;

因此get/updateNFA方法對是為了配合ValueState的value/update方法對。但鍵集合仍然可以使用運算子狀態來儲存。

AbstractKeyedCEPPatternOperator跟AbstractCEPPatternOperator還有一個區別比較大的地方在於對processWatermark方法的實現,在processWatermark內部它會迭代所有的鍵,並使得它們內部符合計算條件(參照水位線)的元素都被計算。

參照我們上面給出的運算子繼承關係圖,到目前為止,我們已經解析了上面兩層運算子。其中,第一層為processElement提供模板實現,第二層為processWatermark(跟事件時間有關)提供模板實現以及對運算子邏輯相關的狀態進行維護。而最後一層則才是真正處理事件的模式匹配的processEvent方法的實現,該方法由AbstractCEPBasePatternOperator定義。

運算子對processEvent方法的實現,其邏輯基本上都是類似的:呼叫NFA物件的process方法,逐個處理事件,該方法我們在分析NFA時做過重點剖析。下面我們選擇四個運算子裡最為複雜的基於鍵分割槽且支援超時的運算子(TimeoutKeyedCEPPatternOperator)進行分析:

protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
    //呼叫NFA的process例項方法,會得到由兩個集合組成的二元組,其中二元組第一個下標表示匹配模式的事件序列;
    //第二個下標表示超時的序列
    Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = 
        nfa.process(event, timestamp);

    Collection<Map<String, IN>> matchedPatterns = patterns.f0;
    Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;

    //構建用於輸出的流記錄物件,其內部儲存的資料結構是一個Either物件,它表示這樣一個語義:
    //該物件要麼左邊有值,要麼右邊有值
    StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = 
        new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);

    //如果有匹配模式的事件序列,則加入Either的右物件
    if (!matchedPatterns.isEmpty()) {
        for (Map<String, IN> matchedPattern : matchedPatterns) {
            streamRecord.replace(Either.Right(matchedPattern));
            output.collect(streamRecord);
        }
    }

    //如果有超時事件序列,則加入Either的左物件
    if (!partialPatterns.isEmpty()) {
        for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
            streamRecord.replace(Either.Left(partialPattern));
            output.collect(streamRecord);
        }
    }
}

其他運算子對processEvent的實現大同小異,由於篇幅有限,不再贅述。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat

QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group