Flink原始碼系列——Flink中一個簡單的資料處理功能的實現過程
在Flink中,實現從指定主機名和埠接收字串訊息,對接收到的字串中出現的各個單詞,每隔1秒鐘就輸出最近5秒內出現的各個單詞的統計次數。
程式碼實現如下:
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
/** 需要連線的主機名和埠 */
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.get("hostname");
port = params.getInt("port");
} catch (Exception e) {
e.printStackTrace();
System.err.println("Please run 'SocketWindowWordCount --host <host> --port <port>'");
return;
}
/** 獲取{@link StreamExecutionEnvironment}的具體實現的例項 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 通過連線給定地址和埠, 獲取資料流的資料來源 */
DataStream<String> text = env.socketTextStream(hostname, port);
/** 對資料流中的資料進行解析、分組、視窗、以及聚合 */
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
/** 打印出分析結果 */
windowCounts.print();
/** 執行處理程式 */
env.execute("Socket Window WordCount");
}
/** 單詞和統計次數的資料結構 */
public static class WordWithCount {
public String word;
public long count;
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
對於上述實現,接下來要分析的內容有:
1)如何建立從指定host和port接收資料的資料來源;
2)如何對建立好的資料來源進行一系列操作來實現所需功能;
3)如何將分析結果打印出來。
1、構建資料來源
在Flink中,資料來源的構建是通過StreamExecutionEnviroment的具體實現的例項來構建的,如上述程式碼中的這句程式碼。
DataStream<String> text = env.socketTextStream(hostname, port);
1
這句程式碼就在指定的host和port上構建了一個接受網路資料的資料來源,接下來看其內部如何實現的。
public DataStreamSource<String> socketTextStream(String hostname, int port) {
return socketTextStream(hostname, port, "\n");
}
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
return socketTextStream(hostname, port, delimiter, 0);
}
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
通過對socketTextStream的過載方法的依次呼叫,可以看到會根據傳入的hostname、port,以及預設的行分隔符”\n”,和最大嘗試次數0,構造一個SocketTextStreamFunction例項,並採用預設的資料來源節點名稱為”Socket Stream”。
SocketTextStreamFunction的類繼承圖如下所示,可以看出其是SourceFunction的一個子類,而SourceFunction是Flink中資料來源的基礎介面。
SourceFunction的定義如下:
@Public
public interface SourceFunction<T> extends Function, Serializable {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
@Public
interface SourceContext<T> {
void collect(T element);
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
@PublicEvolving
void emitWatermark(Watermark mark);
@PublicEvolving
void markAsTemporarilyIdle();
Object getCheckpointLock();
void close();
}
}
從定義中可以看出,其主要有兩個方法run和cancel。
run(SourceContex)方法,就是實現資料獲取邏輯的地方,並可以通過傳入的引數ctx進行向下遊節點的資料轉發。
cancel()方法,則是用來取消資料來源的資料產生,一般在run方法中,會存在一個迴圈來持續產生資料,而cancel方法則可以使得該迴圈終止。
其內部介面SourceContex則是用來進行資料傳送的介面。瞭解了SourceFunction這個介面的功能後,來看下SocketTextStreamFunction的具體實現,也就是主要看其run方法的具體實現。
public void run(SourceContext<String> ctx) throws Exception {
final StringBuilder buffer = new StringBuilder();
long attempt = 0;
/** 這裡是第一層迴圈,只要當前處於執行狀態,該迴圈就不會退出,會一直迴圈 */
while (isRunning) {
try (Socket socket = new Socket()) {
/** 對指定的hostname和port,建立Socket連線,並構建一個BufferedReader,用來從Socket中讀取資料 */
currentSocket = socket;
LOG.info("Connecting to server socket " + hostname + ':' + port);
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] cbuf = new char[8192];
int bytesRead;
/** 這裡是第二層迴圈,對執行狀態進行了雙重校驗,同時對從Socket中讀取的位元組數進行判斷 */
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
buffer.append(cbuf, 0, bytesRead);
int delimPos;
/** 這裡是第三層迴圈,就是對從Socket中讀取到的資料,按行分隔符進行分割,並將每行資料作為一個整體字串向下遊轉發 */
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
String record = buffer.substring(0, delimPos);
if (delimiter.equals("\n") && record.endsWith("\r")) {
record = record.substring(0, record.length() - 1);
}
/** 用入參ctx,進行資料的轉發 */
ctx.collect(record);
buffer.delete(0, delimPos + delimiter.length());
}
}
}
/** 如果由於遇到EOF字元,導致從迴圈中退出,則根據執行狀態,以及設定的最大重試嘗試次數,決定是否進行 sleep and retry,或者直接退出迴圈 */
if (isRunning) {
attempt++;
if (maxNumRetries == -1 || attempt < maxNumRetries) {
LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
Thread.sleep(delayBetweenRetries);
}
else {
break;
}
}
}
/** 在最外層的迴圈都退出後,最後檢查下快取中是否還有資料,如果有,則向下遊轉發 */
if (buffer.length() > 0) {
ctx.collect(buffer.toString());
}
}
run方法的邏輯如上,邏輯很清晰,就是從指定的hostname和port持續不斷的讀取資料,按行分隔符劃分成一個個字串,然後轉發到下游。
cancel方法的實現如下,就是將執行狀態的標識isRunning屬性設定為false,並根據需要關閉當前socket。
public void cancel() {
isRunning = false;
Socket theSocket = this.currentSocket;
/** 如果當前socket不為null,則進行關閉操作 */
if (theSocket != null) {
IOUtils.closeSocket(theSocket);
}
}
對SocketTextStreamFunction的實現清楚之後,回到StreamExecutionEnvironment中,看addSource方法。
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
return addSource(function, sourceName, null);
}
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
/** 如果傳入的輸出資料型別資訊為null,則嘗試提取輸出資料的型別資訊 */
if (typeInfo == null) {
if (function instanceof ResultTypeQueryable) {
/** 如果傳入的function實現了ResultTypeQueryable介面, 則直接通過介面獲取 */
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
try {
/** 通過反射機制來提取型別資訊 */
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
/** 提取失敗, 則返回一個MissingTypeInfo例項 */
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
}
/** 根據function是否是ParallelSourceFunction的子類例項來判斷是否是一個並行資料來源節點 */
boolean isParallel = function instanceof ParallelSourceFunction;
/** 閉包清理, 可減少序列化內容, 以及防止序列化出錯 */
clean(function);
StreamSource<OUT, ?> sourceOperator;
/** 根據function是否是StoppableFunction的子類例項, 來決定構建不同的StreamOperator */
if (function instanceof StoppableFunction) {
sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
} else {
sourceOperator = new StreamSource<>(function);
}
/** 返回一個新構建的DataStreamSource例項 */
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
通過對addSource過載方法的依次呼叫,最後得到了一個DataStreamSource的例項。具體實現過程如上述程式碼所示,各步驟詳見註解。
其中TypeInformation是Flink的型別系統中的核心類,用作函式輸入和輸出的型別都需要通過TypeInformation來表示,TypeInformation可以看做是資料型別的一個工具,可以通過它獲取對應資料型別的序列化器和比較器等。
由於SocketTextStreamFunction不是繼承自ParallelSourceFunction,且實現stoppableFunction介面,isParallel的值為false,以及sourceOperator變數對應的是一個StreamSource例項。
StreamSource的類繼承圖如下所示。
上圖可以看出StreamSource是StreamOperator介面的一個具體實現類,其建構函式的入參就是一個SourceFunction的子類例項,這裡就是前面介紹過的SocketTextStreamFunciton的例項,構造過程如下:
public StreamSource(SRC sourceFunction) {
super(sourceFunction);
this.chainingStrategy = ChainingStrategy.HEAD;
}
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
private void checkUdfCheckpointingPreconditions() {
if (userFunction instanceof CheckpointedFunction && userFunction instanceof ListCheckpointed) {
throw new IllegalStateException("User functions are not allowed to implement AND ListCheckpointed.");
}
}
構造過程的邏輯很明瞭,把傳入的userFunction賦值給自己的屬性變數,並對傳入的userFunction做了校驗工作,然後將連結策略設定為HEAD。
Flink中為了優化執行效率,會對資料處理鏈中的相鄰節點會進行合併處理,連結策略有三種:
ALWAYS —— 儘可能的與前後節點進行連結;
NEVER —— 不與前後節點進行連結;
HEAD —— 只能與後面的節點連結,不能與前面的節點連結。
作為資料來源的源頭,是最頂端的節點了,所以只能採用HEAD或者NEVER,對於StreamSource,採用的是HEAD策略。
StreamOperator是Flink中流操作符的基礎介面,其抽象子類AbstractStreamOperator實現了一些公共方法,使用者自定義的資料處理邏輯會被封裝在StreamOperator的具體實現子類中。
在sourceOperator變數被賦值後,即開始進行DataStreamSource的例項構建,並作為資料來源構造呼叫的返回結果。
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
1
DataStreamSource的類繼承圖如下所示,是具有一個預定義輸出型別的DataStream。
在Flink中,DataStream描述了一個具有相同資料型別的資料流,其提供了資料操作的各種API,如map、reduce等,通過這些API,可以對資料流中的資料進行各種操作,DataStreamSource的構建過程如下:
public DataStreamSource(StreamExecutionEnvironment environment,
TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
boolean isParallel, String sourceName) {
super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
this.isParallel = isParallel;
if (!isParallel) {
setParallelism(1);
}
}
protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
super(environment, transformation);
}
public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
}
可見構建過程就是初始化了DataStream中的environment和transformation這兩個屬性。
其中transformation賦值的是SourceTranformation的一個例項,SourceTransformation是StreamTransformation的子類,而StreamTransformation則描述了建立一個DataStream的操作。對於每個DataStream,其底層都是有一個StreamTransformation的具體例項的,所以在DataStream在構造初始時會為其屬性transformation設定一個具體的例項。並且DataStream的很多介面的呼叫都是直接呼叫的StreamTransformation的相應介面,如並行度、id、輸出資料型別資訊、資源描述等。
通過上述過程,根據指定的hostname和port進行資料產生的資料來源就構造完成了,獲得的是一個DataStreamSource的例項,描述的是一個輸出資料型別是String的資料流的源。
在上述的資料來源的構建過程中,出現Function(SourceFunction)、StreamOperator、StreamTransformation、DataStream這四個介面。
Function介面:使用者通過繼承該介面的不同子類來實現使用者自己的資料處理邏輯,如上述中實現了SourceFunction這個子類,來實現從指定hostname和port來接收資料,並轉發字串的邏輯;
StreamOperator介面:資料流操作符的基礎介面,該介面的具體實現子類中,會有儲存使用者自定義資料處理邏輯的函式的屬性,負責對userFunction的呼叫,以及呼叫時傳入所需引數,比如在StreamSource這個類中,在呼叫SourceFunction的run方法時,會構建一個SourceContext的具體例項,作為入參,用於run方法中,進行資料的轉發;
StreamTransformation介面:該介面描述了構建一個DataStream的操作,以及該操作的並行度、輸出資料型別等資訊,並有一個屬性,用來持有StreamOperator的一個具體例項;
DataStream:描述的是一個具有相同資料型別的資料流,底層是通過具體的StreamTransformation來實現,其負責提供各種對流上的資料進行操作轉換的API介面。
通過上述的關係,終端使用者自定義資料處理邏輯的函式,以及並行度、輸出資料型別等就都包含在了DataStream中,而DataStream也就可以很好的描述一個具體的資料流了。
上述四個介面的包含關係是這樣的:Function –> StreamOperator –> StreamTransformation –> DataStream。
通過資料來源的構造,理清Flink資料流中的幾個介面的關係後,接下來再來看如何在資料來源上進行各種操作,達到最終的資料統計分析的目的。
2、操作資料流
對上述獲取到的DataStreamSource,進行具體的轉換操作,具體操作就是這段邏輯。
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
這段邏輯中,對資料流做了四次操作,分別是flatMap、keyBy、timeWindow、reduce,接下來分別介紹每個轉換都做了些什麼操作。
2.1、flatMap轉換
flatMap的入參是一個FlatMapFunction的具體實現,功能就是將接收到的字串,按空格切割成不同單詞,然後每個單詞構建一個WordWithCount例項,然後向下遊轉發,用於後續的資料統計。然後呼叫DataStream的flatMap方法,進行資料流的轉換,如下:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
/** 根據傳入的flatMapper這個Function,構建StreamFlatMap這個StreamOperator的具體子類例項 */
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
/** 讀取輸入轉換的輸出型別, 如果是MissingTypeInfo, 則及時丟擲異常, 終止操作 */
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
整個構建過程,與構建資料來源的過程相似。
a、先根據傳入的flatMapper這個Function構建一個StreamOperator的具體子類StreamFlatMap的例項;
b、根據a中構建的StreamFlatMap的例項,構建出OneInputTransFormation這個StreamTransformation的子類的例項;
c、再構建出DataStream的子類SingleOutputStreamOperator的例項。
上述邏輯中,除了構建出了SingleOutputStreamOperator這個例項為並返回外,還有一句程式碼:
getExecutionEnvironment().addOperator(resultTransform);
1
這句的最終實現如下:
public void addOperator(StreamTransformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
就是將上述構建的OneInputTransFormation的例項,新增到了StreamExecutionEnvironment的屬性transformations這個型別為List
2.2、keyBy轉換
這裡的keyBy轉換,入參是一個字串”word”,意思是按照WordWithCount中的word欄位進行分割槽操作。
public KeyedStream<T, Tuple> keyBy(String... fields) {
return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
}
先根據傳入的欄位名陣列,以及資料流的輸出資料型別資訊,構建出用來描述key的ExpressionKeys的例項,ExpressionKeys有兩個屬性:
/** key欄位的列表, FlatFieldDescriptor 描述了每個key, 在所在型別中的位置以及key自身的資料類資訊 */
private List<FlatFieldDescriptor> keyFields;
/** 包含key的資料型別的型別資訊, 與建構函式入參中的欄位順序一一對應 */
private TypeInformation<?>[] originalKeyTypes;
在獲取key的描述之後,繼續呼叫keyBy的過載方法:
private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), getExecutionConfig())));
}
這裡首先構建了一個KeySelector的子類ComparableKeySelector的例項,作用就是從具體的輸入例項中,提取出key欄位對應的值(可能是多個key欄位)組成的元組(Tuple)。
對於這裡的例子,就是從每個WordWithCount例項中,提取出word欄位的值。
然後構建一個KeyedStream的例項,KeyedStream也是DataStream的子類。構建過程如下:
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
}
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
super(
dataStream.getExecutionEnvironment(),
new PartitionTransformation<>(
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
this.keyType = validateKeyType(keyType);
}
在進行父類建構函式呼叫之前,先基於keySelector構造了一個KeyGroupStreamPartitioner的例項,再進一步構造了一個PartitionTransformation例項。
這裡與flatMap的轉換略有不同:
a、flatMap中,根據傳入的flatMapper這個Function構建的是StreamOperator這個介面的子類的例項,而keyBy中,則是根據keySelector構建了ChannelSelector介面的子類例項;
b、keyBy中構建的StreamTransformation例項,並沒有新增到StreamExecutionEnvironment的屬性transformations這個列表中。
ChannelSelector只有一個介面,根據傳入的資料流中的具體資料記錄,以及下個節點的並行度來決定該條記錄需要轉發到哪個通道。
public interface ChannelSelector<T extends IOReadableWritable> {
int[] selectChannels(T record, int numChannels);
}
KeyGroupStreamPartitioner中該方法的實現如下:
public int[] selectChannels(
SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
K key;
try {
/** 通過keySelector從傳入的record中提取出對應的key */
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
/** 根據提取的key,最大並行度,以及輸出通道數,決定出record要轉發到的通道編號 */
returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
return returnArray;
}
a、先通過key的hashCode,算出maxParallelism的餘數,也就是可以得到一個[0, maxParallelism)的整數;
b、在通過公式 keyGroupId * parallelism / maxParallelism ,計算出一個[0, parallelism)區間的整數,從而實現分割槽功能。
2.3、timeWindow轉換
這裡timeWindow轉換的入參是兩個時間,第一個引數表示視窗長度,第二個引數表示視窗滑動的時間間隔。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
根據環境配置的資料流處理時間特徵構建不同的WindowAssigner的具體例項。WindowAssigner的功能就是對於給定的資料流中的記錄,決定出該記錄應該放入哪些視窗中,並提供觸發器等供。預設的時間特徵是ProcessingTime,所以這裡會構建一個SlidingProcessingTimeWindow例項,來看下SlidingProcessingTimeWindow類的assignWindows方法的實現邏輯。
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
/** 根據傳入的WindowAssignerContext獲取當前處理時間 */
timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
/** 獲取最近一次的視窗的開始時間 */
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
/** 迴圈找出滿足條件的所有視窗 */
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
看一下根據給定時間戳獲取最近一次的視窗的開始時間的實現邏輯。
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
通過一個例子來解釋上述程式碼的邏輯。比如:
a、timestamp = 1520406257000 // 2018-03-07 15:04:17
b、offset = 0
c、windowSize = 60000
d、(timestamp - offset + windowSize) % windowSize = 17000
e、說明在時間戳 1520406257000 之前最近的視窗是在 17000 毫秒的地方
f、timestamp - (timestamp - offset + windowSize) % windowSize = 1520406240000 // 2018-03-07 15:04:00
g、這樣就可以保證每個時間視窗都是從整點開始, 而offset則是由於時區等原因需要時間調整而設定。
通過上述獲取WindowAssigner的子類例項後,呼叫window方法:
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
比keyBy轉換的邏輯還簡單,就是構建了一個WindowedStream例項,然後返回,就結束了。而WindowedStream是一個新的資料流,不是DataStream的子類。
WindowedStream描述一個數據流中的元素會基於key進行分組,並且對於每個key,對應的元素會被劃分到多個時間視窗內。然後視窗會基於觸發器,將對應視窗中的資料轉發到下游節點。
2.4、reduce轉換
reduce轉換的入參是一個ReduceFunction的具體實現,這裡的邏輯就是對收到的WordWithCount例項集合,將其中word欄位相同的實際的count值累加。
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
if (function instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction. " +
"Please use reduce(ReduceFunction, WindowFunction) instead.");
}
/** 閉包清理 */
function = input.getExecutionEnvironment().clean(function);
return reduce(function, new PassThroughWindowFunction<K, W, T>());
}
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,
WindowFunction<T, R, K, W> function) {
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
return reduce(reduceFunction, function, resultType);
}
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
if (reduceFunction instanceof RichFunction) {
throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
}
function = input.getExecutionEnvironment().clean(function);
reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
String callLocation = Utils.getCallLocationName();
String udfName = "WindowedStream." + callLocation;
String opName;
KeySelector<T, K> keySel = input.getKeySelector();
OneInputStreamOperator<T, R> operator;
if (evictor != null) {
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
allowedLateness,
lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
reduceFunction,
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
new InternalSingleValueProcessWindowFunction<>(function),
trigger,
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
}
通過對reduce過載方法的逐步呼叫,會走到上述程式碼的else邏輯中,這裡也是先構建了StreamOperator的具體子類例項。
public <R> SingleOutputStreamOperator<R> transform(String operatorName,
TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);
OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
transform.setStateKeySelector(keySelector);
transform.setStateKeyType(keyType);
return returnStream;
}
父類的transform中的邏輯如下:
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
/** 讀取輸入轉換的輸出型別, 如果是MissingTypeInfo, 則及時丟擲異常, 終止操作 */
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
邏輯與flatMap相似,也是基於StreamOperator構建了一個StreamTransformation的子類OneInputTransformation的例項,然後構建了DataStream的子類SingleOutputStreamOperator的例項,最後也將構建的StreamTransformation的子類例項新增到了StreamExecutionEnvironment的屬性transformations這個列表中。
經過上述操作,對資料流中的資料進行分組聚合的操作就完成了。
3、輸出統計結果
統計結果的輸出如下:
windowCounts.print();
1
print方法就是在資料流的最後添加了一個Sink,用於承接統計結果。
public DataStreamSink<T> print() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
return addSink(printFunction);
}
其中PrintSinkFunction的類繼承圖如下所示:
作為一個SinkFunction介面的實現,看下其對invoke方法的實現:
public void invoke(IN record) {
if (prefix != null) {
stream.println(prefix + record.toString());
}
else {
stream.println(record.toString());
}
}
實現邏輯很清晰,就是將記錄輸出列印。繼續看addSink方法:
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
transformation.getOutputType();
if (sinkFunction instanceof InputTypeConfigurable) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}
StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
getExecutionEnvironment().addOperator(sink.getTransformation());
return sink;
}
實現邏輯與資料來源是相似的,先構建StreamOperator,再構建DataStreamSink,在DataStreamSink的構建中,會構造出StreamTransformation例項,最後會將這個StreamTransformation例項新增到StreamExecutionEnvironment的屬性transformations這個列表中。
經過上述步驟,就完成了資料流的源構造、資料流的轉換操作、資料流的Sink構造,在這個過程中,每次轉換都會產生一個新的資料流,而每個資料流下幾乎都有一個StreamTransformation的子類例項,對於像flatMap、reduce這些轉換得到的資料流裡的StreamTransformation會被新增到StreamExecutionEnvironment的屬性transformations這個列表中,這個屬性在後續構建StreamGraph時會使用到。
另外在這個資料流的構建與轉換過程中,每個DataStream中的StreamTransformation的具體子類中都有一個input屬性,該屬性會記錄該StreamTransformation的上游的DataStream的StreamTransformation引用,從而使得整個資料流中的StreamTransformation構成了一個隱式的連結串列,由於一個數據流可能會轉換成多個輸出資料流,以及多個輸入資料流又有可能會合併成一個輸出資料流,確定的說,不是隱式列表,而是一張隱式的圖。
上述資料轉換完成後,就會進行任務的執行,就是執行如下程式碼:
env.execute("Socket Window WordCount");
這裡就會根據上述的轉換過程,先生成StreamGraph,再根據StreamGraph生成JobGraph,然後通過客戶端提交到叢集進行排程執行。
---------------------
作者:混混fly
來源:CSDN
原文:https://blog.csdn.net/qq_21653785/article/details/79488249
版權宣告:本文為博主原創文章,轉載請附上博文連結!