1. 程式人生 > 其它 >Flink得Aggregate和minBy、maxBy的區別

Flink得Aggregate和minBy、maxBy的區別

技術標籤:Transformatiomflink

Flink得Aggregate和minBy、maxBy的區別

直接程式碼結果分析說明
public class AggregateDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource dataSource = env.readTextFile(“data/apache.log”);
MapOperator<String, Tuple2<String, Integer>> map = dataSource.map(new MapFunction<String, Tuple2<String, Integer>>() {

@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value.split(" ")[0], 1);
}
});

    GroupReduceOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> reduceOperator = map.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
            String key = null;
            int count = 0;
            for (Tuple2<String, Integer> value : values) {
                key = value.f0;
                count += value.f1;
            }
            out.collect(Tuple2.of(key, count));
        }
    });

    reduceOperator.print();

    System.out.println("--------------------");
    reduceOperator.aggregate(Aggregations.MAX,1).print();
    reduceOperator.aggregate(Aggregations.MIN,1).print();
    reduceOperator.aggregate(Aggregations.SUM,1).print();
}

}

這個程式碼執行結果如下:
.csdnimg.cn/20210121143204699.png)

根據結果可以看出,經過Aggregate.MAX和Aggregations.MIN和Aggregations.SUM處理的結果key和value是不對應的,比如原有資料value最大的應該是(10.0.0.1,7),但是Aggregate.MAX得出的資料是(83.149.9.216,7)。

然後再看maxBy和minBy的程式和結果:
reduceOperator.minBy(1).print();
reduceOperator.maxBy(1).print();
結果如圖
在這裡插入圖片描述
key和value是對應的;

所以兩種都是求最值,但是內部計算邏輯不一樣:
Min在計算的過程中,會記錄最小值,對於其它的列,會取最後一次出現的,然後和最小值組合形成結果返回;

minBy在計算的過程中,當遇到最小值後,將第一次出現的最小值所在的整個元素返回。