Flink - allowedLateness

WindowOperator

processElement

final Collection<W> elementWindows = windowAssigner.assignWindows(   // find every window this element is assigned to
    element.getValue(), element.getTimestamp(), windowAssignerContext);

// if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;   // the element is assumed to be skipped by default

for (W window : elementWindows) {

    // drop if the window is already late
    if (isWindowLate(window)) {
        // a window is late when
        // window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark();
        // continue skips this window
        continue;
    }
    isSkippedElement = false;   // as long as one window is not late, the element is not late data

    windowState.setCurrentNamespace(window);
    windowState.add(element.getValue());   // add the value to windowState

    triggerContext.key = key;
    triggerContext.window = window;

    // EventTimeTrigger: if window.maxTimestamp() <= ctx.getCurrentWatermark(), it fires immediately;
    // otherwise it only calls ctx.registerEventTimeTimer(window.maxTimestamp())
    // and waits for a later watermark to trigger the window
    TriggerResult triggerResult = triggerContext.onElement(element);

    if (triggerResult.isFire()) {   // on FIRE
        ACC contents = windowState.get();
        if (contents == null) {
            continue;
        }
        emitWindowContents(window, contents);   // emit the window contents; this calls the user-defined function
    }

    // the commonly used TumblingEventTimeWindows uses EventTimeTrigger, which never purges
    if (triggerResult.isPurge()) {   // on PURGE
        windowState.clear();   // clear the window's state
    }
    registerCleanupTimer(window);   // the window's data also has to be cleaned up eventually
}

// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
// if every assigned window is late, additionally check that the element itself is late
if (isSkippedElement && isElementLate(element)) {
    // isElementLate: element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark()
    if (lateDataOutputTag != null) {
        sideOutput(element);   // if a side output is defined, emit the late element there
    } else {
        this.numLateRecordsDropped.inc();   // otherwise simply drop it
    }
}
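The two lateness checks used in processElement reduce to the same comparison against the current watermark. The following is a minimal standalone sketch of them, not Flink's actual code: the class name `LatenessCheck` and the plain `long` parameters are assumptions made for illustration, and the example window [0, 10000) assumes that a time window's maxTimestamp is its end minus 1.

```java
// Minimal sketch of the two lateness predicates; hypothetical class, not part of Flink.
final class LatenessCheck {

    // A window is late once the watermark has reached maxTimestamp + allowedLateness.
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return windowMaxTimestamp + allowedLateness <= currentWatermark;
    }

    // An element is late once the watermark has reached its timestamp + allowedLateness.
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long maxTimestamp = 9_999L;     // assumed: window [0, 10000), so maxTimestamp = end - 1
        long allowedLateness = 5_000L;

        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 12_000L)); // false
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 14_999L)); // true
    }
}
```

With a watermark of 12000 the window has already fired but still accepts elements; only at 14999 does it start rejecting them.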

Note the default value of currentWatermark here:
private long currentWatermark = Long.MIN_VALUE;

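If watermarks are emitted periodically, no data can be late before the first watermark is received: while currentWatermark is still Long.MIN_VALUE, neither of the lateness comparisons can hold.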

Next, let's look at the cleanup logic:
protected void registerCleanupTimer(W window) {
    long cleanupTime = cleanupTime(window);   // cleanupTime is window.maxTimestamp() + allowedLateness

    if (windowAssigner.isEventTime()) {
        triggerContext.registerEventTimeTimer(cleanupTime);   // this merely registers an event-time timer
    } else {
        triggerContext.registerProcessingTimeTimer(cleanupTime);
    }
}
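The cleanup time itself is just the formula from the comment, window.maxTimestamp() + allowedLateness. A minimal sketch of that computation follows; the class name `CleanupTimeSketch` is hypothetical, and the overflow guard is an added assumption for illustration (the text above only states the plain sum), included because the addition would wrap around for a window whose maxTimestamp is close to Long.MAX_VALUE. It also assumes allowedLateness is non-negative.

```java
// Sketch of the event-time cleanup time; hypothetical class, not part of Flink.
final class CleanupTimeSketch {

    // maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE if the sum overflows.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // If the addition wrapped around, the sum is smaller than maxTimestamp.
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(cleanupTime(9_999L, 5_000L));         // 14999
        System.out.println(cleanupTime(Long.MAX_VALUE, 5_000L)); // 9223372036854775807
    }
}
```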

If cleanup merely registers an EventTimeTimer, then onEventTime must contain the actual clearing logic.

WindowOperator.onEventTime

if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {  //time == cleanupTime(window);
    clearAllState(triggerContext.window, windowState, mergingWindows);
}

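Sure enough, onEventTime checks whether the timer's time equals the window's cleanup time, and if so it clears all state.

So once the watermark reaches window.maxTimestamp() + allowedLateness, the window's state is cleaned up.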
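Putting the pieces together, the lifecycle of a single event-time window can be traced with a toy model. This is only a sketch under stated assumptions: `WindowLifecycleSketch` and its methods are hypothetical names, it models one window with an EventTimeTrigger-like firing rule, and "drop" stands in for either dropping the element or sending it to the side output.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one event-time window's lifecycle; hypothetical names, not Flink's classes.
final class WindowLifecycleSketch {
    private final long maxTimestamp;
    private final long allowedLateness;
    private long currentWatermark = Long.MIN_VALUE;       // same default as in the operator
    private final List<Long> state = new ArrayList<>();   // buffered window contents
    final List<String> log = new ArrayList<>();

    WindowLifecycleSketch(long maxTimestamp, long allowedLateness) {
        this.maxTimestamp = maxTimestamp;
        this.allowedLateness = allowedLateness;
    }

    private long cleanupTime() {
        return maxTimestamp + allowedLateness;
    }

    // Called for an element whose timestamp falls inside this window.
    void onElement(long timestamp) {
        if (cleanupTime() <= currentWatermark) {   // window is late: skip the element
            log.add("drop " + timestamp);
            return;
        }
        state.add(timestamp);
        if (maxTimestamp <= currentWatermark) {    // watermark already reached the window end: fire at once
            log.add("fire " + state);
        }
    }

    // Called when the watermark advances; plays the role of the two event-time timers.
    void onWatermark(long watermark) {
        long previous = currentWatermark;
        currentWatermark = watermark;
        if (previous < maxTimestamp && maxTimestamp <= watermark && !state.isEmpty()) {
            log.add("fire " + state);              // timer at maxTimestamp
        }
        if (previous < cleanupTime() && cleanupTime() <= watermark) {
            state.clear();                         // timer at cleanupTime clears all state
            log.add("clear");
        }
    }

    public static void main(String[] args) {
        WindowLifecycleSketch w = new WindowLifecycleSketch(9_999L, 5_000L);
        w.onElement(1_000L);      // buffered, no watermark yet
        w.onWatermark(10_000L);   // reaches maxTimestamp: first firing
        w.onElement(2_000L);      // late, but within allowedLateness: fires again
        w.onWatermark(15_000L);   // reaches cleanup time 14999: state cleared
        w.onElement(3_000L);      // window is late: dropped
        System.out.println(w.log); // [fire [1000], fire [1000, 2000], clear, drop 3000]
    }
}
```

The second firing shows why allowedLateness can produce more than one result per window: each late element that arrives before the cleanup time triggers a new emission with the accumulated contents.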
