Flink Operators in Practice: keyBy, Aggregation, and reduce

Common Flink operators: usage examples of map, filter, and flatMap

Data types supported by Flink computations

Implementing UDFs in Flink: FilterFunction and MapFunction

This article introduces the keyBy, aggregation, and reduce operators from Flink's Transformation API and shows how to use each of them with examples.

I. The keyBy operator

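The keyBy operator groups a stream by a key taken from some property of the event or some field of the data. Elements with the same key are grouped together (they are all routed to the same parallel subtask), so downstream operators can process them as a unit. The result of keyBy is a KeyedStream, on which the aggregation and reduce operators below are applied.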

Example:

Description: group the readings by sensor id.

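 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.KeyedStream;

 // keyBy returns a KeyedStream, not a SingleOutputStreamOperator
 KeyedStream<SensorReading, String> keyedStream =
        stream1.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading sensorReading) throws Exception {
                // the sensor id is the grouping key
                return sensorReading.sensorId;
            }
        });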
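To make the grouping idea concrete without a Flink cluster, here is a plain-Java sketch of keyed routing: every record with the same key ends up on the same parallel subtask. The hash-modulo formula in `subtaskFor` is a simplification and an assumption; Flink actually assigns keys to key groups using a murmur hash before mapping key groups to subtasks. `KeyByPartitionSketch`, `subtaskFor`, and `route` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyByPartitionSketch {
    // Simplified key -> subtask mapping (ASSUMPTION: not Flink's real formula,
    // which goes through murmur-hashed key groups).
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Collects the keys of incoming records by the subtask they would be sent to.
    static Map<Integer, List<String>> route(List<String> sensorIds, int parallelism) {
        Map<Integer, List<String>> bySubtask = new LinkedHashMap<>();
        for (String id : sensorIds) {
            bySubtask.computeIfAbsent(subtaskFor(id, parallelism), k -> new ArrayList<>()).add(id);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("sensor_6", "sensor_7", "sensor_10", "sensor_1", "sensor_1");
        // Both sensor_1 records always land in the same list.
        System.out.println(route(ids, 8));
    }
}
```

The same property shows up in the outputs later in this article, where every sensor_1 record is printed by the same subtask (prefix `5>`).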

II. Aggregation operators

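Commonly used aggregation operators are sum, min, and max (plus minBy and maxBy). They are called on a KeyedStream, and each takes a single argument that specifies which field to aggregate on.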

Sample input data, one reading per line (sensor id, timestamp, temperature):

sensor_6,1608112830l,15.4
sensor_7,1608112837l,6.7
sensor_10,1608112842l,38.1
sensor_1,1608112731l,35.8
sensor_1,1608112851l,32
sensor_1,1608112731l,36.2
sensor_1,1608112837l,30.9
sensor_1,1608112842l,29.7
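The article does not show the `SensorReading` class. A minimal sketch that fits the field names used in the code (`sensorId`, `timeStamp`, `temperature`) could look like the following; the field types and the `parse` helper are assumptions. Field expressions such as `"temperature"` in `min` or `sum` rely on Flink treating the class as a POJO (public class, public no-arg constructor, public fields or getters/setters). Because the sample timestamps end in a literal `l`, the sketch strips it before calling `Long.parseLong`, which would otherwise throw a `NumberFormatException`.

```java
// Hypothetical SensorReading POJO (not shown in the article).
// In a real project, declare it public in its own SensorReading.java file,
// since Flink's POJO rules require a public class.
class SensorReading {
    public String sensorId;
    public Long timeStamp;
    public Double temperature;

    public SensorReading() {}

    public SensorReading(String sensorId, Long timeStamp, Double temperature) {
        this.sensorId = sensorId;
        this.timeStamp = timeStamp;
        this.temperature = temperature;
    }

    // Parses a line such as "sensor_6,1608112830l,15.4".
    // ASSUMPTION: a trailing 'l'/'L' on the timestamp is stripped first.
    public static SensorReading parse(String line) {
        String[] f = line.split(",");
        String ts = f[1].trim();
        if (ts.endsWith("l") || ts.endsWith("L")) {
            ts = ts.substring(0, ts.length() - 1);
        }
        return new SensorReading(f[0].trim(), Long.parseLong(ts), Double.parseDouble(f[2].trim()));
    }

    @Override
    public String toString() {
        return "SensorReading{sensorId='" + sensorId + "', timeStamp=" + timeStamp
                + ", temperature=" + temperature + "}";
    }
}
```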

Examples:

1. Description: use the min operator to group by id and output the minimum temperature of each sensor.

       SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading sensorReading) throws Exception {
                return sensorReading.sensorId;
            }
        }).min("temperature");

        aggStream.print("aggStream");

Output:

aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}

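The min operator computes the minimum of the temperature field and stores the result in the temperature field. It gives no guarantee about the values of the other fields (such as timeStamp).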
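The following plain-Java sketch (no Flink dependency) models the min semantics described above for a single key: only `temperature` is updated. That the other fields stay as they were in the first reading of the key is an assumption of this sketch, not a documented guarantee. The input uses the file order for sensor_1; in a real job the arrival order depends on the source, so the printed timestamps can differ. `RollingMinSketch` and `Reading` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class RollingMinSketch {
    // Minimal stand-in for one reading of a single key (hypothetical type).
    static final class Reading {
        final long timeStamp;
        final double temperature;

        Reading(long timeStamp, double temperature) {
            this.timeStamp = timeStamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "(" + timeStamp + ", " + temperature + ")";
        }
    }

    // min-style rolling aggregation for one key: only temperature changes.
    // ASSUMPTION: timeStamp is carried over from the first reading of the key.
    static List<Reading> rollingMin(List<Reading> input) {
        List<Reading> out = new ArrayList<>();
        Reading state = null;
        for (Reading r : input) {
            state = (state == null)
                    ? r
                    : new Reading(state.timeStamp, Math.min(state.temperature, r.temperature));
            out.add(state); // one result per input element
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> sensor1 = List.of(
                new Reading(1608112731L, 35.8),
                new Reading(1608112851L, 32),
                new Reading(1608112731L, 36.2),
                new Reading(1608112837L, 30.9),
                new Reading(1608112842L, 29.7));
        System.out.println(rollingMin(sensor1));
    }
}
```

With this input, every result keeps timeStamp 1608112731 while the temperature drops from 35.8 to 29.7.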

2. Description: use the minBy operator to group by id and output, for each sensor, the reading with the minimum temperature.

       SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading sensorReading) throws Exception {
                return sensorReading.sensorId;
            }
        }).minBy("temperature");

        aggStream.print("aggStream");

Output:

aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112851l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112851l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112837l, temperature=29.7}
aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112837l, temperature=29.7}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}

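The difference between minBy and min: min only tracks the minimum of the temperature field, whereas minBy returns the element that has the minimum value, so the other fields (such as timeStamp) keep the values of that same element.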
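For comparison, a plain-Java sketch of minBy for one key: the whole reading with the lowest temperature is kept, so `timeStamp` always belongs to the reading that holds the minimum. On ties the earlier reading wins, which matches Flink's documented default for `minBy`. `RollingMinBySketch` and `Reading` are hypothetical names, and the input again uses the file order for sensor_1.

```java
import java.util.ArrayList;
import java.util.List;

class RollingMinBySketch {
    // Minimal stand-in for one reading of a single key (hypothetical type).
    static final class Reading {
        final long timeStamp;
        final double temperature;

        Reading(long timeStamp, double temperature) {
            this.timeStamp = timeStamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return "(" + timeStamp + ", " + temperature + ")";
        }
    }

    // minBy-style rolling aggregation: keep the whole reading with the lowest
    // temperature; strict '<' keeps the earlier reading on ties.
    static List<Reading> rollingMinBy(List<Reading> input) {
        List<Reading> out = new ArrayList<>();
        Reading state = null;
        for (Reading r : input) {
            if (state == null || r.temperature < state.temperature) {
                state = r;
            }
            out.add(state); // one result per input element
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> sensor1 = List.of(
                new Reading(1608112731L, 35.8),
                new Reading(1608112851L, 32),
                new Reading(1608112731L, 36.2),
                new Reading(1608112837L, 30.9),
                new Reading(1608112842L, 29.7));
        System.out.println(rollingMinBy(sensor1));
    }
}
```

With this input the timestamps follow the minimum: 1608112731, 1608112851, 1608112851, 1608112837, 1608112842.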

max and maxBy work like min and minBy but compute the maximum instead; examples are omitted here.

3. Description: use the sum operator to add up the temperature field per sensor and store the running total in the temperature field.

       SingleOutputStreamOperator<SensorReading> aggStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading sensorReading) throws Exception {
                return sensorReading.sensorId;
            }
        }).sum("temperature");

        aggStream.print("aggStream");

Output:

aggStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=35.8}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=67.8}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=98.69999999999999}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=128.39999999999998}
aggStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112731l, temperature=164.59999999999997}
aggStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
aggStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
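The long decimals such as `98.69999999999999` come from ordinary `double` rounding, not from Flink. A plain-Java running sum over the sensor_1 temperatures, in the arrival order implied by the output above (35.8, 32, 30.9, 29.7, 36.2), shows the same kind of artifact; formatting the value when printing is one simple way to hide it. As with min, only the summed field changes: the sensor_1 timeStamp in the output stays at 1608112731. `RollingSumSketch` and `runningSum` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class RollingSumSketch {
    // Running total for one key: one output per input, like a keyed sum.
    static List<Double> runningSum(double... temperatures) {
        List<Double> out = new ArrayList<>();
        double total = 0.0;
        for (double t : temperatures) {
            total += t; // plain double addition, so binary rounding can show up
            out.add(total);
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrival order implied by the article's sum output for sensor_1.
        for (double v : runningSum(35.8, 32, 30.9, 29.7, 36.2)) {
            // raw double value next to a one-decimal display form
            System.out.println(v + "  ->  " + String.format(Locale.ROOT, "%.1f", v));
        }
    }
}
```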

III. The reduce operator

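The reduce operator works on a keyed stream (a stream grouped by a field). Its function takes two inputs of the same type and produces one new element of that type: it merges the current element with the result of the previous aggregation. The returned stream contains the result of every aggregation step, not only the final result.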
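The rolling behavior of reduce can be modeled in plain Java: keep one state value per key, emit the first element of a key unchanged, and for every later element emit `reducer(previousResult, newElement)`. This is a simplified model for illustration, not Flink's operator; `KeyedReduceSketch` and `keyedReduce` are hypothetical names, and `Map.Entry` (Java 9+ `Map.entry`) stands in for `SensorReading`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class KeyedReduceSketch {
    // One state value per key; every intermediate result is emitted.
    static <T, K> List<T> keyedReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();   // latest result per key
        List<T> emitted = new ArrayList<>(); // all results, in arrival order
        for (T value : input) {
            K key = keyOf.apply(value);
            T previous = state.get(key);
            T result = (previous == null) ? value : reducer.apply(previous, value);
            state.put(key, result);
            emitted.add(result);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> input = List.of(
                Map.entry("sensor_1", 35.8),
                Map.entry("sensor_7", 6.7),
                Map.entry("sensor_1", 32.0),
                Map.entry("sensor_1", 36.2));
        // Reducer keeps the lower temperature for each key.
        List<Map.Entry<String, Double>> out = keyedReduce(input, Map.Entry::getKey,
                (acc, next) -> Map.entry(acc.getKey(), Math.min(acc.getValue(), next.getValue())));
        System.out.println(out); // [sensor_1=35.8, sensor_7=6.7, sensor_1=32.0, sensor_1=32.0]
    }
}
```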

Example:

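1. Description: group by id and output, for each sensor, the reading with the lowest temperature seen so far (the timestamp is that of the same reading).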

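        import org.apache.flink.api.common.functions.ReduceFunction;
        import org.apache.flink.api.java.functions.KeySelector;
        import org.apache.flink.streaming.api.datastream.DataStream;

        DataStream<SensorReading> resultStream = stream1.keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading sensorReading) throws Exception {
                return sensorReading.sensorId;
            }
        }).reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading t0, SensorReading t1) throws Exception {
                // t0: previous result for this key; t1: newly arrived reading.
                // Keep whichever reading has the lower temperature.
                if (t0.temperature.compareTo(t1.temperature) > 0) {
                    return new SensorReading(t1.sensorId, t1.timeStamp, t1.temperature);
                }
                return t0;
            }
        });

        resultStream.print("resultReduceStream");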

Output:

resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:6> SensorReading{sensorId='sensor_6', timeStamp=1608112830l, temperature=15.4}
resultReduceStream:7> SensorReading{sensorId='sensor_7', timeStamp=1608112837l, temperature=6.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:4> SensorReading{sensorId='sensor_10', timeStamp=1608112842l, temperature=38.1}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
resultReduceStream:5> SensorReading{sensorId='sensor_1', timeStamp=1608112842l, temperature=29.7}
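A reduce function can also combine fields from both inputs instead of returning one of them. The sketch below keeps the lowest temperature together with the newest timestamp; its `reduce(t0, t1)` method has the same shape as `ReduceFunction<SensorReading>.reduce`, so the body could be moved into the anonymous class above. The simplified `Reading` class and the use of `Math.max` on timestamps (so that a late, older reading cannot move the timestamp backwards) are assumptions of this sketch; `MinTempLatestTsSketch` is a hypothetical name.

```java
class MinTempLatestTsSketch {
    // Simplified stand-in for SensorReading (hypothetical, primitive fields).
    static final class Reading {
        final String sensorId;
        final long timeStamp;
        final double temperature;

        Reading(String sensorId, long timeStamp, double temperature) {
            this.sensorId = sensorId;
            this.timeStamp = timeStamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return sensorId + "(" + timeStamp + ", " + temperature + ")";
        }
    }

    // t0: previous result for the key; t1: newly arrived reading.
    // Lowest temperature and newest timestamp, taken from either input.
    static Reading reduce(Reading t0, Reading t1) {
        return new Reading(t1.sensorId,
                Math.max(t0.timeStamp, t1.timeStamp),
                Math.min(t0.temperature, t1.temperature));
    }

    public static void main(String[] args) {
        Reading[] sensor1 = {
            new Reading("sensor_1", 1608112731L, 35.8),
            new Reading("sensor_1", 1608112851L, 32),
            new Reading("sensor_1", 1608112731L, 36.2),
            new Reading("sensor_1", 1608112837L, 30.9),
            new Reading("sensor_1", 1608112842L, 29.7)
        };
        Reading acc = sensor1[0];
        System.out.println(acc); // the first reading of a key is emitted unchanged
        for (int i = 1; i < sensor1.length; i++) {
            acc = reduce(acc, sensor1[i]);
            System.out.println(acc);
        }
    }
}
```

Whatever the arrival order, the last result for these readings is sensor_1 with timeStamp 1608112851 and temperature 29.7.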

This concludes the examples for the keyBy, aggregation, and reduce operators.

The author continues to share original articles on big data, AI, and related technologies (WeChat official account: 大資料技術天涯).