Flink – process watermark
WindowOperator.processElement
Its main job is to add the value of the current element to the corresponding window:
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
It then calls triggerContext.onElement.
The Context here is only a thin wrapper:
public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
    return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
}
EventTimeTrigger
onElement
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
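If window.maxTimestamp is already less than or equal to the current watermark, the window fires immediately.
Otherwise, window.maxTimestamp is registered with the timer service, and the window waits to be fired.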
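The decision made in onElement can be looked at in isolation. The following is a minimal sketch, not Flink's code: TriggerSketch, Result, and the registeredTimers set are hypothetical stand-ins for the trigger, TriggerResult, and the timer service.

```java
import java.util.TreeSet;

// Hypothetical stand-in for the trigger logic; not part of Flink.
class TriggerSketch {
    enum Result { FIRE, CONTINUE }

    // Stands in for the timer service: timestamps of registered event-time timers.
    final TreeSet<Long> registeredTimers = new TreeSet<>();
    long currentWatermark = Long.MIN_VALUE;

    // Mirrors the branch in EventTimeTrigger.onElement for a window
    // whose last valid timestamp is windowMaxTimestamp.
    Result onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            // The watermark has already passed the window: fire right away.
            return Result.FIRE;
        }
        // Otherwise remember the timestamp and wait for the watermark.
        registeredTimers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        TriggerSketch trigger = new TriggerSketch();
        trigger.currentWatermark = 1_000L;
        System.out.println(trigger.onElement(999L));   // window already behind the watermark
        System.out.println(trigger.onElement(1_999L)); // window still open
        System.out.println(trigger.registeredTimers);
    }
}
```

Registering the same window end twice leaves a single entry in the set, which loosely corresponds to the duplicate check performed by the real timer service further down.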
WindowOperator.Context
public void registerEventTimeTimer(long time) {
    internalTimerService.registerEventTimeTimer(window, time);
}
InternalTimerService
In AbstractStreamOperator:
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, Serializable, KeyContext {
Note that KeyContext is implemented here,
so AbstractStreamOperator provides getCurrentKey:
public Object getCurrentKey() {
    if (keyedStateBackend != null) {
        return keyedStateBackend.getCurrentKey();
    } else {
        throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream.");
    }
}
AbstractStreamOperator initializes the InternalTimeServiceManager:
private transient InternalTimeServiceManager<?, ?> timeServiceManager;
@Override
public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
    if (getKeyedStateBackend() != null && timeServiceManager == null) {
        timeServiceManager = new InternalTimeServiceManager<>(
            getKeyedStateBackend().getNumberOfKeyGroups(),
            getKeyedStateBackend().getKeyGroupRange(),
            this,
            getRuntimeContext().getProcessingTimeService());
    }
    // ...
}
The InternalTimerService is initialized in WindowOperator:
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
This calls into AbstractStreamOperator:
public <K, N> InternalTimerService<N> getInternalTimerService(
        String name,
        TypeSerializer<N> namespaceSerializer,
        Triggerable<K, N> triggerable) {
    checkTimerServiceInitialization();
    // the following casting is to overcome type restrictions.
    TypeSerializer<K> keySerializer = (TypeSerializer<K>) getKeyedStateBackend().getKeySerializer();
    InternalTimeServiceManager<K, N> keyedTimeServiceHandler = (InternalTimeServiceManager<K, N>) timeServiceManager;
    return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);
}
This simply delegates to InternalTimeServiceManager.getInternalTimerService,
which ultimately returns a HeapInternalTimerService.
HeapInternalTimerService.registerEventTimeTimer
@Override
public void registerEventTimeTimer(N namespace, long time) {
    InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
    if (timerSet.add(timer)) {
        eventTimeTimersQueue.add(timer);
    }
}
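This creates an InternalTimer holding the time (window.maxTimestamp), the key (keyContext.getCurrentKey), and the namespace (window). The timer is added to the queue only if the set did not already contain it, so duplicate registrations are ignored.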
getEventTimeTimerSetForTimer
private Set<InternalTimer<K, N>> getEventTimeTimerSetForTimer(InternalTimer<K, N> timer) {
    checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
    int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
    return getEventTimeTimerSetForKeyGroup(keyGroupIdx);
}

private Set<InternalTimer<K, N>> getEventTimeTimerSetForKeyGroup(int keyGroupIdx) {
    int localIdx = getIndexForKeyGroup(keyGroupIdx);
    Set<InternalTimer<K, N>> timers = eventTimeTimersByKeyGroup[localIdx];
    if (timers == null) {
        timers = new HashSet<>();
        eventTimeTimersByKeyGroup[localIdx] = timers;
    }
    return timers;
}
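First the key group for the key is located; each key group has its own set of timers.
The reason for this design is that timers must eventually be checkpointed too, and the smallest unit of a checkpoint is the key group, so the timers of different key groups have to be kept separate.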
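The per-key-group bookkeeping can be illustrated with a small sketch. This is not Flink's implementation: KeyGroupTimerSets is a hypothetical class, timers are represented as plain strings, a map replaces the array indexed by local key group, and the key-group assignment is assumed to be a plain hashCode modulo (KeyGroupRangeAssignment uses its own hashing).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of per-key-group timer sets; not Flink's implementation.
class KeyGroupTimerSets {
    private final int totalKeyGroups;
    // One lazily created timer set per key group.
    private final Map<Integer, Set<String>> timersByKeyGroup = new HashMap<>();

    KeyGroupTimerSets(int totalKeyGroups) {
        this.totalKeyGroups = totalKeyGroups;
    }

    // ASSUMPTION: simple modulo of hashCode, used here only for illustration.
    int assignToKeyGroup(Object key) {
        return Math.floorMod(key.hashCode(), totalKeyGroups);
    }

    // Returns the timer set of the key group the key belongs to, creating it on first use.
    Set<String> timerSetFor(Object key) {
        return timersByKeyGroup.computeIfAbsent(assignToKeyGroup(key), g -> new HashSet<>());
    }

    // Returns true only the first time a given (key, timestamp) timer is registered.
    boolean register(Object key, long timestamp) {
        return timerSetFor(key).add(key + "@" + timestamp);
    }
}
```

With four key groups, the integer keys 1 and 5 land in the same group and share one set, while key 2 gets a set of its own. A snapshot of one key group then only needs to serialize that group's set.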
Finally, the timer is added to eventTimeTimersQueue:
private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;
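PriorityQueue is implemented as a heap, so implementing compareTo in InternalTimer is enough for the queue to always hand out the timer with the smallest timestamp first.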
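As a rough illustration of that ordering, here is a self-contained sketch using java.util.PriorityQueue. SimpleTimer and TimerQueueDemo are hypothetical names; the real InternalTimer also carries a key and a namespace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified timer that is ordered by timestamp only.
class SimpleTimer implements Comparable<SimpleTimer> {
    final long timestamp;

    SimpleTimer(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(SimpleTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }
}

class TimerQueueDemo {
    // Adds the timestamps in the given order and returns them in poll order.
    static List<Long> drainOrder(long... timestamps) {
        PriorityQueue<SimpleTimer> queue = new PriorityQueue<>();
        for (long ts : timestamps) {
            queue.add(new SimpleTimer(ts));
        }
        List<Long> polled = new ArrayList<>();
        while (!queue.isEmpty()) {
            polled.add(queue.poll().timestamp); // poll() always returns the smallest element
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder(3000L, 1000L, 2000L));
    }
}
```

Only peek() and poll() respect the ordering; iterating over a PriorityQueue does not visit elements in sorted order, which is why the watermark handling below works through the head of the queue.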
AbstractStreamOperator.processWatermark
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    output.emitWatermark(mark);
}
timeServiceManager.advanceWatermark
public void advanceWatermark(Watermark watermark) throws Exception {
    for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}
HeapInternalTimerService.advanceWatermark
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;
    InternalTimer<K, N> timer;
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
        timerSet.remove(timer);
        eventTimeTimersQueue.remove();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}
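Timers are taken from eventTimeTimersQueue in ascending timestamp order; if a timer's timestamp is less than or equal to the incoming watermark, its window needs to fire.
The operator's current key is set with keyContext.setCurrentKey(timer.getKey()).
Note that a watermark carries no key, so an arriving watermark fires every timer that is due, and those timers may belong to different keys. The key context must therefore be set before each callback; otherwise state would be accessed under the wrong key. Finally, triggerTarget.onEventTime is invoked.
The triggerTarget is the WindowOperator.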
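The drain loop and the key switch can be sketched as follows. This is a simplified illustration under stated assumptions, not Flink's code: WatermarkDrainSketch and its Timer class are hypothetical, keys are plain strings, the currentKey field stands in for the key context, and the per-key-group duplicate sets are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of firing due timers on a watermark; not Flink's implementation.
class WatermarkDrainSketch {
    // Simplified timer: a timestamp plus the key that registered it.
    static final class Timer implements Comparable<Timer> {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    final PriorityQueue<Timer> queue = new PriorityQueue<>();
    final List<String> fired = new ArrayList<>();
    String currentKey;                      // stands in for the operator's key context
    long currentWatermark = Long.MIN_VALUE;

    void register(long timestamp, String key) {
        queue.add(new Timer(timestamp, key));
    }

    // Fires every timer whose timestamp is <= the new watermark, smallest first.
    void advanceWatermark(long time) {
        currentWatermark = time;
        Timer timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.remove();
            currentKey = timer.key;         // switch the key context before the callback
            onEventTime(timer);
        }
    }

    // Stands in for triggerTarget.onEventTime; records the key context it observed.
    void onEventTime(Timer timer) {
        fired.add(currentKey + "@" + timer.timestamp);
    }
}
```

Because the callback reads currentKey rather than the timer's own key, the recorded entries show that each timer ran under the key that registered it, even though a single watermark triggered all of them.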
WindowOperator.onEventTime
windowState.setCurrentNamespace(triggerContext.window);
ACC contents = null;
if (windowState != null) {
    contents = windowState.get();
}
if (contents != null) {
    TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
    if (triggerResult.isFire()) {
        emitWindowContents(triggerContext.window, contents);
    }
    if (triggerResult.isPurge()) {
        windowState.clear();
    }
}
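This calls triggerContext.onEventTime to obtain a TriggerResult.
If the result is fire, emitWindowContents is called; having arrived here through the window's timer, this condition should hold.
If the result is purge, windowState is cleared.
emitWindowContents invokes the user-defined window function to process the window's contents: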
private void emitWindowContents(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    processContext.window = window;
    userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
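The fire and purge handling in onEventTime can be summarized in a small sketch. It is an illustration only: FirePurgeSketch is a hypothetical class, its Result enum merely mirrors the isFire() and isPurge() flags of TriggerResult, and lists of integers stand in for window state and emitted output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how a trigger result drives emit and clear; not Flink's code.
class FirePurgeSketch {
    // Mirrors the two independent flags carried by a trigger result.
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }
        boolean isPurge() { return purge; }
    }

    final List<Integer> windowState = new ArrayList<>();   // buffered window contents
    final List<List<Integer>> emitted = new ArrayList<>(); // what the window function received

    void onTimer(Result result) {
        if (windowState.isEmpty()) {
            return; // no contents, nothing to emit or clear
        }
        if (result.isFire()) {
            emitted.add(new ArrayList<>(windowState)); // hand a copy to the "window function"
        }
        if (result.isPurge()) {
            windowState.clear();
        }
    }
}
```

Keeping fire and purge as separate checks means a plain fire leaves the contents in place, so a later firing of the same window sees the same elements again unless a purge happened in between.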