Spark aggregate and combineByKey aggregation operations
By 阿新 · Published 2019-02-12
aggregate signature:
    aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
Example, summing and counting the elements of listRDD (note that aggregate returns a plain value, not an RDD):
    val sumCount = listRDD.aggregate(zeroValue)((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

combineByKey signature:
    combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
Example, per-key sum and count over a key-value pairRDD:
    val result = pairRDD.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
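To make the two-function contract concrete without a Spark cluster, here is a minimal pure-Python sketch of the same semantics: seqOp/mergeValue folds values within a partition, combOp/mergeCombiners merges per-partition results. The function names and the single-partition layout are illustrative assumptions, not Spark's implementation.

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    # Fold each partition with seq_op starting from the zero value,
    # then merge the per-partition results with comb_op, again
    # starting from the zero value (mirroring Spark's semantics).
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)

def combine_by_key(pairs, create_combiner, merge_value, merge_combiners):
    # Single-partition sketch: merge_combiners would only fire when
    # merging combiners from different partitions, so it is unused here.
    combiners = {}
    for k, v in pairs:
        if k not in combiners:
            combiners[k] = create_combiner(v)
        else:
            combiners[k] = merge_value(combiners[k], v)
    return combiners

# Sum and count over one partition [1, 2, 3, 4] with zeroValue (0, 0):
total = aggregate(
    [[1, 2, 3, 4]],
    (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # seqOp
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # combOp
)
print(total)  # (10, 4): sum 10, count 4
```

With the neutral zero value (0, 0), the extra application of the zero in combOp changes nothing; the worked example below deliberately uses a non-zero value (6, 5) to expose it.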
Example:
For the RDD {1, 2, 3, 4}, compute the sum and the element count:
    aggregate((6, 5))((acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
zeroValue is both the initial value of seqOp's acc (within each partition) and the initial value of combOp's acc1 (when per-partition results are merged).
acc2 takes the per-partition results computed by seqOp.
1. Run seqOp (here the RDD has a single partition):
acc = (6, 5) value = 1 => (7, 6)
acc = (7, 6) value = 2 => (9, 7)
acc = (9, 7) value = 3 => (12, 8)
acc = (12, 8) value = 4 => (16, 9)
2. Run combOp:
acc1 = (6, 5) acc2 = (16, 9) => (22, 14)
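The trace above can be reproduced with a small pure-Python sketch (a simulation of Spark's folding, assuming a single partition, not Spark itself). It shows why the final result is (22, 14) rather than (16, 9): combOp starts from zeroValue, so (6, 5) is added in a second time.

```python
from functools import reduce

seq_op = lambda acc, v: (acc[0] + v, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])
zero = (6, 5)

# Step 1: seqOp folds the single partition [1, 2, 3, 4] from zeroValue.
partition_result = reduce(seq_op, [1, 2, 3, 4], zero)
print(partition_result)  # (16, 9)

# Step 2: combOp folds the per-partition results, again starting
# from zeroValue, so (6, 5) is counted one more time.
final = reduce(comb_op, [partition_result], zero)
print(final)  # (22, 14)
```

With more partitions, zeroValue would be folded in once per partition plus once in combOp, which is why Spark's documentation recommends a neutral element such as (0, 0).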