Flink window ReduceFunction triggered multiple times when a single event arrives
阿新 · Published: 2019-02-15
Today someone in a chat group asked the following question:
"Every record I input is immediately executed four times..."
------
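Since I have always built my streams with event time + watermarks, I wasn't sure what was going on in his case. At first I suspected the event had been duplicated several times, causing the window function to fire repeatedly.
After reading the source code, I found the explanation is actually simple. A ReduceFunction behaves differently from a full-window function: it does not wait until all the data for the window has arrived before computing. Instead, whenever a window already holds a record and another record is added, the ReduceFunction is invoked to merge the two, so the window always stores exactly one record.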
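To make the incremental behavior concrete, here is a minimal plain-Java model of a reducing window state. It is a sketch under assumptions, not Flink code: the class name `ReducingStateModel` is hypothetical and the logic only mirrors the behavior described above (the first element is stored as-is, every later element is merged immediately through the reduce function).

```java
import java.util.function.BinaryOperator;

// Hypothetical, simplified model of a reducing window state (NOT Flink's class).
// It keeps at most one value and folds each new element into it on arrival.
final class IncrementalReduceSketch {

    static final class ReducingStateModel<T> {
        private final BinaryOperator<T> reduceFunction;
        private T value;          // the single value kept for this window
        private int reduceCalls;  // how many times reduceFunction has run

        ReducingStateModel(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        void add(T element) {
            if (value == null) {
                value = element; // first element: stored as-is, no reduce call
            } else {
                value = reduceFunction.apply(value, element); // merged immediately
                reduceCalls++;
            }
        }

        T get() { return value; }

        int reduceCalls() { return reduceCalls; }
    }

    public static void main(String[] args) {
        ReducingStateModel<Integer> window = new ReducingStateModel<>(Integer::sum);
        for (int x : new int[] {3, 5, 7, 9}) {
            window.add(x);
            System.out.println("after " + x + ": value=" + window.get()
                    + ", reduce calls so far=" + window.reduceCalls());
        }
    }
}
```

Four elements lead to three reduce calls, each made at arrival time, and the window never holds more than one value.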
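As for the four executions in the question, they happen because the event belongs to four windows.
In short, with a sliding window every event timestamp is covered by window size / slide size windows (4 in this case), so the event is assigned to 4 windows. Adding it to each of those windows invokes the reduce function once per window that already holds data, which produces exactly the behavior described in the question.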
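The "one event, four windows" arithmetic can be checked with a small sketch. It is modeled on the window-start computation I understand Flink's `SlidingEventTimeWindows` to use (offset assumed to be 0); the class `SlidingAssignSketch` is hypothetical and is not the library class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sliding event-time window assignment (offset = 0).
final class SlidingAssignSketch {

    // Returns the [start, end) bounds of every sliding window containing timestamp.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // start of the most recent window that contains the timestamp
        long lastStart = timestamp - (timestamp + slide) % slide;
        // walk backwards one slide at a time while the window still covers timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 4_000, slide = 1_000; // 4 s windows sliding every 1 s
        for (long[] w : assignWindows(10_500, size, slide)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 4 s window sliding every 1 s, the timestamp 10500 falls into the windows starting at 10000, 9000, 8000 and 7000, i.e. size / slide = 4 windows.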
The corresponding source code:
WindowOperator
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);
    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;
    final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    if (windowAssigner instanceof MergingWindowAssigner) {
        // merging windows (e.g. session windows): omitted here
    } else {
        for (W window: elementWindows) {
            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;
            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());
            triggerContext.key = key;
            triggerContext.window = window;
            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }
    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
The core of it is these two lines:
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
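These two lines run once per assigned window, and the state is namespaced by window, so each window gets its own add() and therefore its own chance to call reduce. The toy model below is a sketch under assumptions: `PerWindowReduceSketch` is hypothetical, it handles a single key, uses the window start as the namespace, and ignores triggers and lateness.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical toy model of the processElement loop for one key (not Flink code).
final class PerWindowReduceSketch {
    private final Map<Long, Integer> stateByWindow = new HashMap<>(); // namespace -> value
    private final BinaryOperator<Integer> reduceFunction;
    private final long size;
    private final long slide;

    PerWindowReduceSketch(BinaryOperator<Integer> reduceFunction, long size, long slide) {
        this.reduceFunction = reduceFunction;
        this.size = size;
        this.slide = slide;
    }

    // Processes one element and returns how many times reduce ran for it.
    int processElement(int value, long timestamp) {
        int calls = 0;
        long lastStart = timestamp - (timestamp + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            // counterpart of setCurrentNamespace(window) followed by add(value)
            Integer previous = stateByWindow.get(start);
            if (previous == null) {
                stateByWindow.put(start, value); // empty window: just store
            } else {
                stateByWindow.put(start, reduceFunction.apply(previous, value));
                calls++;
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        PerWindowReduceSketch op = new PerWindowReduceSketch(Integer::sum, 4_000, 1_000);
        System.out.println(op.processElement(1, 10_500)); // prints 0: all four windows were empty
        System.out.println(op.processElement(1, 10_700)); // prints 4: same four windows, all non-empty
    }
}
```

Once the four windows covering an event already contain data, every further record for that key lands in four non-empty windows and causes four reduce calls, matching the "executed four times" symptom.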
How does the reduce function end up being called? Look at the code that initializes windowState:
if (windowStateDescriptor != null) {
windowState = (InternalAppendingState<W, IN, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}
And where does windowStateDescriptor come from? See the WindowedStream class:
public <R> SingleOutputStreamOperator<R> reduce(
        ReduceFunction<T> reduceFunction,
        WindowFunction<T, R, K, W> function,
        TypeInformation<R> resultType) {
    if (reduceFunction instanceof RichFunction) {
        throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
    }
    //clean the closures
    function = input.getExecutionEnvironment().clean(function);
    reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
    String callLocation = Utils.getCallLocationName();
    String udfName = "WindowedStream." + callLocation;
    String opName;
    KeySelector<T, K> keySel = input.getKeySelector();
    OneInputStreamOperator<T, R> operator;
    if (evictor != null) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
        ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>("window-contents", streamRecordSerializer);
        opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
        operator =
            new EvictingWindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                stateDesc,
                new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
                trigger,
                evictor,
                allowedLateness,
                lateDataOutputTag);
    } else { // note the reduceFunction here: this is what you have been looking for
        ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
            reduceFunction,
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));
        opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
        operator =
            new WindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                stateDesc,
                new InternalSingleValueWindowFunction<>(function),
                trigger,
                allowedLateness,
                lateDataOutputTag);
    }
    return input.transform(opName, resultType, operator);
}
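The two branches above store window contents differently: without an evictor the state is a ReducingStateDescriptor-backed state, while with an evictor it is a list of all elements, and the reduce function is only applied at firing time through ReduceApplyWindowFunction. The sketch below contrasts the two timings in plain Java; `CountingSum`, `EagerWindow` and `BufferingWindow` are hypothetical names, and this is a simplified model rather than the operators themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model contrasting when reduce runs in the two branches (not Flink code).
final class ReduceBranchesSketch {

    // A sum that counts how often it is invoked.
    static final class CountingSum implements BinaryOperator<Integer> {
        int calls;

        @Override
        public Integer apply(Integer a, Integer b) {
            calls++;
            return a + b;
        }
    }

    // No evictor: reducing state, reduce runs inside add().
    static final class EagerWindow {
        final CountingSum fn = new CountingSum();
        Integer value;

        void add(int e) { value = (value == null) ? e : fn.apply(value, e); }

        Integer fire() { return value; } // null if the window is empty
    }

    // With evictor: list state buffers elements, reduce runs only in fire().
    static final class BufferingWindow {
        final CountingSum fn = new CountingSum();
        final List<Integer> elements = new ArrayList<>();

        void add(int e) { elements.add(e); }

        Integer fire() {
            Integer acc = null;
            for (Integer e : elements) {
                acc = (acc == null) ? e : fn.apply(acc, e);
            }
            return acc; // null if the window is empty
        }
    }

    public static void main(String[] args) {
        EagerWindow eager = new EagerWindow();
        BufferingWindow buffering = new BufferingWindow();
        for (int e : new int[] {1, 2, 3, 4}) {
            eager.add(e);
            buffering.add(e);
        }
        System.out.println("before fire: eager=" + eager.fn.calls + ", buffering=" + buffering.fn.calls);
        Integer a = eager.fire();
        Integer b = buffering.fire();
        System.out.println("after fire: eager=" + eager.fn.calls + ", buffering=" + buffering.fn.calls
                + ", results " + a + " / " + b);
    }
}
```

Both paths produce the same result, but only the eager (no-evictor) path calls reduce as each element arrives, which is the path the question ran into.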
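So in the branch without an evictor, windowStateDescriptor is a ReducingStateDescriptor wrapping the user's reduceFunction, which makes windowState a reducing state: every windowState.add(...) call in processElement merges the new element into the value already stored for the current window (namespace) by calling reduceFunction. An element assigned to four windows therefore triggers up to four reduce calls.
With that, I have worked out part of the window code logic, and I was reminded once again that helping others is also helping yourself.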