Flink - allowedLateness
阿新 • Published: 2017-10-31
WindowOperator
processElement
    // find every window this element is assigned to
    final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

    // if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true; // the element is assumed to be skipped by default

    for (W window : elementWindows) {

        // drop if the window is already late
        // a window is late when
        // window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark()
        if (isWindowLate(window)) {
            continue; // skip this window
        }
        isSkippedElement = false; // as long as one window is not late, the element is not late data

        windowState.setCurrentNamespace(window);
        windowState.add(element.getValue()); // add the value to the window state

        triggerContext.key = key;
        triggerContext.window = window;

        // EventTimeTrigger fires immediately if window.maxTimestamp() <= ctx.getCurrentWatermark();
        // otherwise it only calls ctx.registerEventTimeTimer(window.maxTimestamp())
        // and waits for a later watermark to trigger the window
        TriggerResult triggerResult = triggerContext.onElement(element);

        if (triggerResult.isFire()) {
            ACC contents = windowState.get();
            if (contents == null) {
                continue;
            }
            emitWindowContents(window, contents); // emit the window contents; this calls the user-defined function
        }

        // the commonly used TumblingEventTimeWindows uses EventTimeTrigger, which never purges
        if (triggerResult.isPurge()) {
            windowState.clear(); // clear the window's state
        }
        registerCleanupTimer(window); // the window's data also has to be cleaned up eventually
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    //
    // in other words: every assigned window was late, and the element itself is also checked for lateness
    // an element is late when
    // element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark()
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element); // a side output is defined, so emit the late element there
        } else {
            this.numLateRecordsDropped.inc(); // otherwise just drop it
        }
    }
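The two lateness conditions quoted in the comments above can be tried out in isolation. The following is a minimal sketch, not Flink's code: the class name LatenessChecks and the plain long parameters are assumptions made for illustration, and it only models the event-time comparison described in the comments.

```java
// Hypothetical stand-alone model of the two lateness checks described above.
final class LatenessChecks {

    // A window is late once its max timestamp plus the allowed lateness
    // is at or behind the current watermark.
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return windowMaxTimestamp + allowedLateness <= currentWatermark;
    }

    // An element is late once its own timestamp plus the allowed lateness
    // is at or behind the current watermark.
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long maxTimestamp = 9_999L;     // last timestamp of a window covering [0, 10000)
        long allowedLateness = 5_000L;  // illustrative value

        // Watermark is past the window end but still inside the lateness interval.
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 12_000L));
        // Watermark has reached maxTimestamp + allowedLateness.
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 14_999L));
    }
}
```

Note that the comparison is inclusive: the window counts as late as soon as the watermark equals maxTimestamp + allowedLateness.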
Note the default value of currentWatermark here:
    private long currentWatermark = Long.MIN_VALUE;
So if watermarks are emitted periodically, no data is treated as late before the first watermark arrives.
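A small sketch of why the initial value matters, assuming the element lateness condition element.getTimestamp() + allowedLateness <= currentWatermark and a non-negative allowedLateness. The class and method names are hypothetical.

```java
// Hypothetical model: before any watermark arrives, currentWatermark is Long.MIN_VALUE,
// so the lateness comparison cannot succeed for ordinary timestamps.
final class InitialWatermarkSketch {

    static final long INITIAL_WATERMARK = Long.MIN_VALUE;

    static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long allowedLateness = 0L; // even with no allowed lateness at all

        // Before the first watermark: an old timestamp is still not late.
        System.out.println(isElementLate(4_000L, allowedLateness, INITIAL_WATERMARK));

        // After a first watermark of 5000 has arrived, the same element is late.
        System.out.println(isElementLate(4_000L, allowedLateness, 5_000L));
    }
}
```

With periodic watermarks, elements processed between job start and the first emitted watermark are therefore compared against Long.MIN_VALUE.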
Next, let's look at the logic that cleans up the data:
    protected void registerCleanupTimer(W window) {
        long cleanupTime = cleanupTime(window); // cleanupTime is window.maxTimestamp() + allowedLateness
        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTime); // this merely registers an event-time timer
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }
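The cleanup time itself is simple arithmetic, sketched below for the event-time case. The class name is hypothetical, and the overflow guard is an assumption added for illustration: it matters for windows whose maxTimestamp() is already Long.MAX_VALUE (as with GlobalWindow), where adding any lateness would wrap around.

```java
// Hypothetical sketch of computing an event-time cleanup timestamp.
final class CleanupTimeSketch {

    // Assumes allowedLateness >= 0.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // If the sum wrapped around, fall back to Long.MAX_VALUE (assumed guard).
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Window [0, 10000) with 5 seconds of allowed lateness.
        System.out.println(cleanupTime(9_999L, 5_000L));
        // A window that never ends on its own.
        System.out.println(cleanupTime(Long.MAX_VALUE, 5_000L));
    }
}
```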
If cleanup merely registers an event-time timer, then the actual clearing logic must live in onEventTime.
WindowOperator.onEventTime
    if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
        // reached when the timer's timestamp == cleanupTime(window)
        clearAllState(triggerContext.window, windowState, mergingWindows);
    }
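Sure enough, onEventTime checks whether the timer's timestamp equals the window's cleanup time and, if so, clears all of the window's state.
So once the watermark reaches window.maxTimestamp() + allowedLateness, the window is cleaned up.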
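Putting the pieces together, an element's fate depends on where the current watermark sits relative to its window. The sketch below is a simplified model rather than Flink code: it assumes tumbling event-time windows aligned at 0, an EventTimeTrigger-style rule, and hypothetical names (WindowLifecycleSketch, Outcome, classify).

```java
// Hypothetical model of what happens to an element arriving at a given watermark.
final class WindowLifecycleSketch {

    enum Outcome {
        BUFFERED,    // window end not yet reached: element is stored, a timer will fire later
        FIRED_LATE,  // watermark passed the window end but not the cleanup time: stored and fired at once
        TOO_LATE     // watermark reached the cleanup time: side output if configured, else dropped
    }

    static Outcome classify(long elementTimestamp, long windowSize,
                            long allowedLateness, long currentWatermark) {
        // Tumbling window containing the element (windows aligned at 0, an assumption).
        long windowStart = elementTimestamp - Math.floorMod(elementTimestamp, windowSize);
        long maxTimestamp = windowStart + windowSize - 1;

        if (maxTimestamp + allowedLateness <= currentWatermark) {
            return Outcome.TOO_LATE;
        }
        if (maxTimestamp <= currentWatermark) {
            return Outcome.FIRED_LATE;
        }
        return Outcome.BUFFERED;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long lateness = 5_000L;
        // The same element (timestamp 7000) arriving at three different watermarks.
        System.out.println(classify(7_000L, size, lateness, 8_000L));
        System.out.println(classify(7_000L, size, lateness, 12_000L));
        System.out.println(classify(7_000L, size, lateness, 14_999L));
    }
}
```

The middle case is what allowedLateness buys: the window has already fired once, but its state is still kept, so a late element triggers another firing with the updated contents.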