Flink Operators by Example: keyBy, aggregation, and reduce
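This article introduces the keyBy, aggregation, and reduce operators among Flink's transformations and shows how to use each one through examples.
I. The keyBy operator
keyBy groups a stream by a key, which can be an attribute of the event or a field of the record. Elements with the same key are routed to the same group, so the downstream operator processes them together.
Example:
Goal: group the readings by sensor id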
// keyBy returns a KeyedStream, not a SingleOutputStreamOperator
KeyedStream<SensorReading, String> keyedStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        // Use the sensor id as the grouping key
        return sensorReading.sensorId;
    }
});
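To make the grouping behaviour concrete, here is a minimal plain-Java sketch of the idea behind keyBy, with no Flink dependency. It assumes that a key is routed by a simple modulo hash, which is only a stand-in for Flink's real key-group assignment. The class KeyBySketch and the helpers partitionFor and groupByKey are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model of keyBy; not Flink code, all names are hypothetical.
class KeyBySketch {

    // Route a key to one of `parallelism` partitions by hashing it.
    // Assumption: plain modulo hashing stands in for Flink's key-group logic.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Group elements by the key that keySelector extracts, preserving arrival order.
    static <T, K> Map<K, List<T>> groupByKey(List<T> elements, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : elements) {
            groups.computeIfAbsent(keySelector.apply(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "sensor_6,15.4", "sensor_1,35.8", "sensor_7,6.7", "sensor_1,32.0");
        // The key is the text before the first comma, i.e. the sensor id.
        Map<String, List<String>> groups = groupByKey(lines, line -> line.split(",")[0]);
        groups.forEach((key, values) ->
                System.out.println(key + " -> partition " + partitionFor(key, 4) + " " + values));
    }
}
```

Equal keys always hash to the same partition, which is why every reading of one sensor can be aggregated in one place.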
II. Aggregation operators
The commonly used aggregation operators are sum, min, and max. Each one takes a single argument that names the field to aggregate.
Sample data:
sensor_6,1608112830l,15.4
sensor_7,1608112837l,6.7
sensor_10,1608112842l,38.1
sensor_1,1608112731l,35.8
sensor_1,1608112851l,32
sensor_1,1608112731l,36.2
sensor_1,1608112837l,30.9
sensor_1,1608112842l,29.7
Example:
1. Goal: use min to group by id and output the minimum temperature of each sensor
SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).min("temperature");
aggStream.print("aggStream");
Output:
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
min computes the minimum of the temperature field and stores the result in the temperature field. For the other fields (such as timeStamp), the operation gives no guarantee about which value they carry.
2. Goal: use minBy to group by id and output the reading with the minimum temperature for each sensor
SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).minBy("temperature");
aggStream.print("aggStream");
Output:
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112851l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112851l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112837l, temperature=29.7}
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112837l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
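The difference between min and minBy is that min only computes the minimum of the temperature field, whereas minBy returns the whole element that holds the minimum, so the other fields (such as timeStamp) keep that element's values.
max and maxBy work the same way as min and minBy but select the maximum, so their examples are omitted here.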
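The contrast between min and minBy can be modelled in plain Java without Flink. The sketch below is an illustration under stated assumptions, not Flink's implementation: it assumes min leaves the non-aggregated fields at the values of the first element seen for a key (the text above only says those fields are not guaranteed), and that timeStamp is a long. The class MinVsMinBySketch and its Reading type are hypothetical stand-ins for SensorReading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of rolling min vs. minBy; not Flink code, all names are hypothetical.
class MinVsMinBySketch {

    // Stand-in for the article's SensorReading (timeStamp assumed to be a long).
    static final class Reading {
        final String sensorId;
        final long timeStamp;
        final double temperature;

        Reading(String sensorId, long timeStamp, double temperature) {
            this.sensorId = sensorId;
            this.timeStamp = timeStamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return sensorId + "@" + timeStamp + " " + temperature;
        }
    }

    // min: only temperature is updated; other fields stay as first seen (assumption).
    static List<Reading> rollingMin(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading prev = state.get(r.sensorId);
            Reading next = (prev == null)
                    ? r
                    : new Reading(prev.sensorId, prev.timeStamp, Math.min(prev.temperature, r.temperature));
            state.put(r.sensorId, next);
            out.add(next); // one result is emitted per input element
        }
        return out;
    }

    // minBy: the whole element holding the minimum is kept; ties keep the earlier element.
    static List<Reading> rollingMinBy(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading prev = state.get(r.sensorId);
            Reading next = (prev == null || r.temperature < prev.temperature) ? r : prev;
            state.put(r.sensorId, next);
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> input = Arrays.asList(
                new Reading("sensor_1", 1608112731L, 35.8),
                new Reading("sensor_1", 1608112851L, 32.0),
                new Reading("sensor_1", 1608112842L, 29.7));
        System.out.println("min:   " + rollingMin(input));
        System.out.println("minBy: " + rollingMinBy(input));
    }
}
```

In this model both variants end with a temperature of 29.7, but only minBy reports the timestamp that was recorded with that temperature.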
3. Goal: use sum to add up the temperature field and store the result in the temperature field
SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).sum("temperature");
aggStream.print("aggStream");
Output:
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=35.8}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=67.8}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=98.69999999999999}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=128.39999999999998}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=164.59999999999997}
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
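The running totals in the sum output can be reproduced with a small plain-Java sketch that keeps one accumulator per key and emits the total after every element. It is a simplified model rather than Flink code, and RollingSumSketch and rollingSum are hypothetical names. Because the totals are accumulated in double precision, values such as 98.69999999999999 are ordinary binary floating-point rounding, not an aggregation error.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a keyed rolling sum; not Flink code, all names are hypothetical.
class RollingSumSketch {

    // Return the running per-key total after each input value.
    static double[] rollingSum(String[] keys, double[] values) {
        Map<String, Double> totals = new HashMap<>();
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            // merge starts the total for a new key or adds to the existing one.
            out[i] = totals.merge(keys[i], values[i], Double::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"sensor_1", "sensor_1", "sensor_7", "sensor_1"};
        double[] temperatures = {35.8, 32.0, 6.7, 30.9};
        // Each position holds the total for that element's sensor so far.
        System.out.println(Arrays.toString(rollingSum(keys, temperatures)));
    }
}
```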
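III. The reduce operator
reduce runs on a stream that has been grouped by key. It takes two inputs of the same type, the previous aggregated result and the current element, and merges them into one new element of that type. The returned stream contains the result of every merge step, not just the final result.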
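The behaviour just described can be sketched in plain Java: keep the last result per key, merge each new element into it, and emit every intermediate result. This is a simplified model under the assumption that state is a simple in-memory map; RollingReduceSketch and rollingReduce are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of a keyed rolling reduce; not Flink code, all names are hypothetical.
class RollingReduceSketch {

    static <T, K> List<T> rollingReduce(List<T> input,
                                        Function<T, K> keySelector,
                                        BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();
        List<T> out = new ArrayList<>();
        for (T element : input) {
            K key = keySelector.apply(element);
            T previous = state.get(key);
            // The first element of a key is emitted as is; later ones are merged.
            T merged = (previous == null) ? element : reducer.apply(previous, element);
            state.put(key, merged);
            out.add(merged); // every intermediate result is part of the output
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(3, 10, 5, 12, 7);
        // Key by parity and reduce by addition.
        List<Integer> result = rollingReduce(input, n -> n % 2 == 0 ? "even" : "odd", Integer::sum);
        System.out.println(result); // prints [3, 10, 8, 22, 15]
    }
}
```

The output has one entry per input element, which mirrors how the returned stream carries every merge step.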
Example:
1. Goal: group by id and output each sensor's minimum temperature together with the timestamp of that reading
DataStream<SensorReading> resultStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
    @Override
    public String getKey(SensorReading sensorReading) throws Exception {
        return sensorReading.sensorId;
    }
}).reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading t0, SensorReading t1) throws Exception {
        // t0 is the previous result, t1 the new element.
        // compareTo only guarantees the sign of its result, so test > 0 rather than == 1.
        if (t0.temperature.compareTo(t1.temperature) > 0) {
            return new SensorReading(t1.sensorId, t1.timeStamp, t1.temperature);
        }
        return t0;
    }
});
resultStream.print("resultReduceStream");
Output:
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
resultReduceStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
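The reduce function above returns one of its two inputs whole, so the emitted timestamp is always the one recorded with the minimum temperature. If the intent is instead to pair the minimum temperature with the most recent timestamp, the merge step could combine fields from both inputs. The plain-Java sketch below shows that variant under the assumption that timeStamp is a long; MinTempLatestTimeSketch, Reading, and MIN_TEMP_LATEST_TIME are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Variant merge step: lowest temperature plus newest timestamp. Names are hypothetical.
class MinTempLatestTimeSketch {

    // Stand-in for the article's SensorReading (timeStamp assumed to be a long).
    static final class Reading {
        final String sensorId;
        final long timeStamp;
        final double temperature;

        Reading(String sensorId, long timeStamp, double temperature) {
            this.sensorId = sensorId;
            this.timeStamp = timeStamp;
            this.temperature = temperature;
        }
    }

    // Merge two readings of the same sensor field by field.
    static final BinaryOperator<Reading> MIN_TEMP_LATEST_TIME = (previous, current) -> new Reading(
            previous.sensorId,
            Math.max(previous.timeStamp, current.timeStamp),
            Math.min(previous.temperature, current.temperature));

    public static void main(String[] args) {
        List<Reading> sensor1 = Arrays.asList(
                new Reading("sensor_1", 1608112731L, 35.8),
                new Reading("sensor_1", 1608112851L, 32.0),
                new Reading("sensor_1", 1608112731L, 36.2),
                new Reading("sensor_1", 1608112837L, 30.9),
                new Reading("sensor_1", 1608112842L, 29.7));
        // Stream.reduce yields only the final value; a keyed Flink reduce would emit each step.
        Reading result = sensor1.stream().reduce(MIN_TEMP_LATEST_TIME).get();
        System.out.println(result.sensorId + " " + result.timeStamp + " " + result.temperature);
        // prints: sensor_1 1608112851 29.7
    }
}
```

The same expression could serve as the body of a ReduceFunction's reduce method, with the SensorReading constructor in place of Reading.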
That concludes the walkthrough of the keyBy, aggregation, and reduce operators.
More original articles on big data, artificial intelligence, and related technology are shared on the official account 大資料技術天涯.