Apache Flink原始碼解析之stream-operator
前面我們談論了Flink stream中的transformation。你可以將transformation
看成編寫Flink程式並構建流式處理程式的必要組成部分(靜態表現形式);而本篇我們將探討transformation
在Flink執行時對應的動態表現形式——operator。他們之間的對映關係見下圖:
具體的探討可以檢視前文:Flink中的一些核心概念
StreamOperator
所有operator
的最終基類,operator
的分類方式,按照輸入流個數不同分為:
- 無輸入:StreamSource
- 單個流輸入:OneInputStreamOperator
- 兩個流輸入:TwoInputStreamOperator
跟生命週期有關的核心抽象方法:
- setup : 例項化
operator
- open :該方法會在任何元素被處理之前執行,它的實現通常包含了
operator
的初始化邏輯 - close :該方法在所有的元素都進入到
operator
被處理之後呼叫 - dispose :該方法在
operator
生命週期的最後階段執行,主要用於回收資源
StreamOperator
及其實現類中還包含了一些狀態恢復與儲存相關的邏輯,但這些不是本文的主題,所有暫時不做探討。
先來看一下整個package的類關係圖:
我們整個剖析方式大致也按照以上operator
的分類方式以及類的層次結構來。
StreamSource
作為一個流處理DAG的起點,source operator
相比其他operator
無疑是特別的(從類的繼承關係圖也可以看出來)。
它需要接受SourceFunction
的例項。並且我們可以看到,它的chaining strategy
是HEAD
(它表示operator
不能有前置operator
,但可以作為其他operator
的前置operator
,下文會談到)。
this.chainingStrategy = ChainingStrategy.HEAD;
StreamSource
的實現略顯複雜,因為它涉及到我們前面文章談SourceFunction
時談到的SourceFunction.SourceContext
- NonTimestampContext:針對
ProcessingTime
,該SourceContext
將時間戳設定為-1,並且不發射watermark
- AutomaticWatermarkContext:針對
IngestionTime
,提供自動的watermark
發射機制的SourceContext
- ManualWatermarkContext:針對
EventTime
的人工發射watermark
的SourceContext
它們之間的對應關係也體現在其run
方法的實現中:
switch (timeCharacteristic) {
case EventTime:
ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
break;
case IngestionTime:
ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
break;
case ProcessingTime:
ctx = new NonTimestampContext<>(this, lockingObject, collector);
break;
default:
throw new Exception(String.valueOf(timeCharacteristic));
}
在run
方法內部會呼叫SourceFunction
的run
方法:
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
}
StreamSource
通過一個屬性:canceledOrStopped
來控制sourceFunction
的停止。
整個StreamSource
的執行邏輯由run
來表述,通過cancel
來控制停止邏輯。
NonTimestampContext
NonTimestampContext
會忽略時間戳,因此它的實現裡稍微特別一點的地方在下面的這兩個方法:
public void collectWithTimestamp(T element, long timestamp) {
// ignore the timestamp
collect(element);
}
以及
public void emitWatermark(Watermark mark) {
owner.checkAsyncException();
// do nothing else
}
第一個方法忽略了時間戳,第二個方法不傳送watermark
。
ManualWatermarkContext
無需特別說明
AutomaticWatermarkContext
該類是自動傳送watermark
的實現,在構造器中接收引數watermarkInterval
來指定自動傳送watermark
的時間間隔。具體的實現機制是,新建一個獨立的發射執行緒,以指定的時間間隔發射:
this.scheduleExecutor = Executors.newScheduledThreadPool(1);
this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
final long currentTime = System.currentTimeMillis();
if (currentTime > nextWatermarkTime) {
// align the watermarks across all machines. this will ensure that we
// don't have watermarks that creep along at different intervals because
// the machine clocks are out of sync
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
synchronized (lockingObjectParam) {
if (currentTime > nextWatermarkTime) {
outputParam.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime += watermarkInterval;
}
}
}
}
}, 0, watermarkInterval, TimeUnit.MILLISECONDS);
除了這種基於時間的以固定頻率發射watermark
的機制,在collect方法被呼叫時,也會檢查當前的時間戳,如果達到傳送條件也會觸發emit watermark
。
而因為該類實現的是自動傳送,在構造器中實現一個定時傳送機制,所以emitWatermark
方法也就不需要再實現傳送邏輯(因為已不再需要使用者程式呼叫emitWatermark
方法了),而該方法在該類中的主要任務是負責停止自動傳送。停止自動傳送的觸發條件是收到最後一個元素的訊號(將最後一個元素的時間戳設定為Long.MAX_VALUE
),emitWatermark
收到該標識後,再將其往下游傳遞並關閉定時傳送執行緒。
OneInputStreamOperator
單一輸入流的operator
介面,繼承自StreamOperator
。提供了兩個介面方法:
- processElement:處理到達該
operator
的一個元素 - processWatermark:處理一個
Watermark
TwoInputStreamOperator
支援兩個流作為輸入的operator
,同樣繼承自StreamOperator
。擴充了多個介面方法:
- processElement1 : 處理來自第一個輸入的某個元素
- processElement2 : 處理來自第二個輸入的某個元素
- processWatermark1 : 處理來自第一個輸入的一個
Watermark
- processWatermark2 : 處理來自第二個輸入的一個
Watermark
輔助實現類
Output
Collector
的擴充套件,增加了發射WaterMark
的功能。該介面主要供operator
用於發射元素或者WaterMark
。
- emitWatermark : 該發射
WaterMark
將廣播給下游的所有operator
TimeCharacteristic
Flink在涉及到時間相關的處理時,將時間劃分為三類。而時間型別的定義在Flink中就是用該列舉來表示:
- ProcessingTime
- IngestionTime
- EventTime
這三種時間型別之前我們曾多次提及,這裡不再囉嗦
TimestampedCollector
Output
的包裝器實現,它用於給元素設定時間戳
AbstractStreamOperator
該抽象類為實現一個具體的operator
提供基本的支援,Flink內建提供的operator
全部都直接或間接繼承自AbstractStreamOperator
。
它內部包含了三大類的屬性:
- 配置屬性
- 執行時屬性
- 鍵值對狀態屬性
大都數方法都是輔助方法,值得一提的是setup
方法。從這裡我們可以看到所有operator
識別符號的生成方式:
String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();
可以看到標識是由”_”間隔的三段拼接而成。三段分別是:類名,vertex id
,以及當前subtask
的索引。
然後基於此標識,建立了用於儲存狀態的stateBackend
:
stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);
stateBackend
在 dispose
方法中會被關閉。
AbstractStreamOperator
並沒有對open/close等生命週期方法提供具體的實現,這些方法的具體實現被後延至後面談到的AbstractUdfStreamOperator
中。
AbstractUdfStreamOperator
該類主要針對operator
生命週期相關的方法(open/close/dispose)提供了模板實現。而這些實現都統一針對使用者定義的Function
的例項(簡稱udf
)。
ChainingStrategy
該列舉定義了operator
的chain strategy
(連結策略)。當一個operator
連結到其前置operator
時,意味著它們將在同一個執行緒上執行。StreamOperator
的預設值是HEAD
,這意味著它將沒有前置operator
,不過它有可能成為其他operator
的前置operator
。大部分StreamOperator
將該列舉以ALWAYS
覆蓋,表示它們將連結到一個前置operator
。
它的三個列舉值:
- ALWAYS :上面已經提到過,它允許將當前
operator
連結到某前置operator
,這是提升效能的良好實踐,它能夠提升operator
的並行度 - NEVER :該策略不支援
operator
被連結到某前置operator
也不支援被作為其他operator
的前置operator
。 - HEAD :該策略表示
operator
沒有前置operator
,不過可以作為其他operator
的chain header
內建的Operator實現
StreamCounter
元素累加器,沒有什麼特別的
StreamProject
這裡需要解釋一下,此處的project,並非通常所指的專案的意思,而是投射、投影
的意思。你可以將其類比於SQL中的SELECT
子句。因此他允許你選擇你需要的fields
集合。這通過其構造器的一個欄位索引陣列來指定:
在processElement
方法中,它依次遍歷所有需要的欄位索引,將元素中需要的欄位提取出來,放入一個用於輸出的outTuple
,最後再將其發射出去:
public void processElement(StreamRecord<IN> element) throws Exception {
for (int i = 0; i < this.numFields; i++) {
outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
}
output.collect(element.replace(outTuple));
}
StreamFilter
filter operator
,處理邏輯很簡單,根據自定義的FilterFunction
方法,對每個元素進行過濾,如果滿足過濾條件,則將該元素emit
出去。
StreamMap
map operator
,根據傳入的MapFunction
,對每個元素應用map
操作後將其發射出去。
StreamFlatMap
flatmap operator
接收FlatMapFunction
函式,有一些特別之處:在其open
方法中,它初始化了一個TimestampedCollector
,作為傳遞給FlatMapFunction
的collector
,該collector
是給那些特定的userFunction
使用的,並且用於給他們操作的元素設定時間戳。
StreamGroupedFold
分組的fold operator
,fold
函式的執行依賴於一個初始化值initialValue
。因此這裡涉及到狀態儲存。並且狀態是跟具體的分割槽關聯的。因此,在open
方法的實現中,需要獲得跟分割槽關聯的ValueState
:
ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
values = getPartitionedState(stateId);
processElement
方法的實現,涉及到一系列的操作:從ValueState
中獲取資料,作為“新”的初始值跟當前元素一起進行fold
函式運算,獲得結果後更新ValueState
,然後將獲得的結果emit
出去。
StreamGroupedReduce
按分組進行reduce
操作的operator
.
基於特定的狀態名稱:
private static final String STATE_NAME = "_op_state";
構建狀態id
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);
然後再獲取狀態值:
values = getPartitionedState(stateId);
以上兩個動作在open
方法中實現
在processElement
方法中,分為兩種情況:
- 如果之前已存在狀態值,那麼拿當前值跟之前的狀態值做
reduce
並獲得結果,將結果再次更新到最新狀態並emit
出去 - 如果之前不存在狀態值,那麼直接將當前值更新到狀態中,並將當前值
emit
出去
StreamSink
sink operator
,通常是流處理的最後一個operator
。它接收SinkFunction
的例項。在processElement
中依次呼叫其invoke
方法。
小結
本文主要探討了stream transformation
的執行時形式operator
的大致實現。
微信掃碼關注公眾號:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)