The difference between Flink's Aggregate and minBy/maxBy
阿新 • Published: 2021-02-02
Tags: Transformation, flink
Let's go straight to the code and explain the difference by analyzing its output.
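import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class AggregateDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource = env.readTextFile("data/apache.log");

        // Map each log line to (first space-separated field, 1).
        MapOperator<String, Tuple2<String, Integer>> map = dataSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value.split(" ")[0], 1);
            }
        });

        // Group by the key (field 0) and sum the counts within each group.
        GroupReduceOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> reduceOperator = map.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
                String key = null;
                int count = 0;
                for (Tuple2<String, Integer> value : values) {
                    key = value.f0;
                    count += value.f1;
                }
                out.collect(Tuple2.of(key, count));
            }
        });

        reduceOperator.print();
        System.out.println("--------------------");
        // Aggregate field 1 (the count) over the whole data set.
        reduceOperator.aggregate(Aggregations.MAX, 1).print();
        reduceOperator.aggregate(Aggregations.MIN, 1).print();
        reduceOperator.aggregate(Aggregations.SUM, 1).print();
    }
}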
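Running this code produces the following result:
The result shows that the key and value in the output of Aggregations.MAX, Aggregations.MIN, and Aggregations.SUM do not correspond to each other. For example, the record with the largest value in the original data should be (10.0.0.1,7), but Aggregations.MAX returns (83.149.9.216,7).
Now look at the minBy and maxBy code and its result: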
reduceOperator.minBy(1).print();
reduceOperator.maxBy(1).print();
The result is shown in the figure:
Here the key and value do correspond.
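To make "the key and value correspond" concrete, here is a minimal plain-Java sketch of whole-record selection. It does not use Flink and is not Flink's implementation. The class `SelectBySketch`, the `Pair` type (a stand-in for `Tuple2<String, Integer>`), and the sample counts other than (10.0.0.1,7) are hypothetical. Keeping the earlier record on a tie is an assumption made for this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; Pair stands in for Tuple2<String, Integer>.
class SelectBySketch {
    static final class Pair {
        final String f0;
        final int f1;
        Pair(String f0, int f1) { this.f0 = f0; this.f1 = f1; }
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Return the whole record whose f1 is largest, so f0 and f1 come from the same record.
    // Assumption for this sketch: on a tie, the earlier record is kept.
    static Pair maxBy(List<Pair> records) {
        Pair best = null;
        for (Pair r : records) {
            if (best == null || r.f1 > best.f1) {
                best = r;
            }
        }
        return best;
    }

    // Same idea for the smallest f1.
    static Pair minBy(List<Pair> records) {
        Pair best = null;
        for (Pair r : records) {
            if (best == null || r.f1 < best.f1) {
                best = r;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Hypothetical sample data; only (10.0.0.1,7) is taken from the text.
        List<Pair> records = Arrays.asList(
                new Pair("10.0.0.1", 7),
                new Pair("10.0.0.2", 1),
                new Pair("83.149.9.216", 3));
        System.out.println(maxBy(records)); // (10.0.0.1,7)
        System.out.println(minBy(records)); // (10.0.0.2,1)
    }
}
```

Because the selected record is returned unchanged, its key is always the key that actually carried that value.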
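So both approaches compute extreme values, but their internal logic differs:
While computing, MIN keeps track of the minimum value of the aggregated column only; for the other columns it takes the value from the last record it saw, and then combines that with the minimum to form the returned result.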
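The behavior described for MIN can be mimicked in a few lines of plain Java. This is only a sketch of the described semantics, not Flink's code. The class `FieldWiseMinSketch`, the `Pair` type (a stand-in for `Tuple2<String, Integer>`), and the sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; Pair stands in for Tuple2<String, Integer>.
class FieldWiseMinSketch {
    static final class Pair {
        final String f0;
        final int f1;
        Pair(String f0, int f1) { this.f0 = f0; this.f1 = f1; }
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Track the minimum of f1 only; f0 is simply overwritten by each record,
    // so the result carries the key of the last record seen.
    static Pair aggregateMin(List<Pair> records) {
        int min = Integer.MAX_VALUE;
        String lastKey = null;
        for (Pair r : records) {
            min = Math.min(min, r.f1);
            lastKey = r.f0;
        }
        return new Pair(lastKey, min);
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        List<Pair> records = Arrays.asList(
                new Pair("10.0.0.1", 7),
                new Pair("10.0.0.2", 1),
                new Pair("83.149.9.216", 3));
        System.out.println(aggregateMin(records)); // (83.149.9.216,1)
    }
}
```

The minimum 1 belongs to 10.0.0.2, yet the result is labeled with 83.149.9.216 because that record happened to come last. This is the same kind of mismatch seen in the Aggregations output above.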