Analyzing combineByKey, aggregateByKey, foldByKey, and reduceByKey through the Spark source code
combineByKey
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
createCombiner: the first time combineByKey encounters a given key k (within a partition), it calls createCombiner to turn that key's value v into a combiner c
mergeValue: when combineByKey encounters a key k it has already seen in that partition, it calls mergeValue to fold v into the existing c
mergeCombiners: merges two combiners c, typically produced by different partitions, into one
// Example
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> list = new ArrayList<>();
list.add(new Tuple2<>(1, "www"));
list.add(new Tuple2<>(1, "iteblog"));
list.add(new Tuple2<>(1, "com"));
list.add(new Tuple2<>(2, "bbs"));
list.add(new Tuple2<>(2, "iteblog"));
list.add(new Tuple2<>(2, "com"));
list.add(new Tuple2<>(3, "good"));
JavaPairRDD<Integer, String> data = sc.parallelizePairs(list);
JavaPairRDD<Integer, List<String>> result = data.combineByKey(v -> {
    // createCombiner: the first value seen for a key becomes a one-element list
    ArrayList<String> strings = new ArrayList<>();
    strings.add(v);
    return strings;
}, (c, v) -> {
    // mergeValue: fold further values for the same key into the existing list
    c.add(v);
    return c;
}, (c1, c2) -> {
    // mergeCombiners: concatenate the lists built by different partitions
    c1.addAll(c2);
    return c1;
});
result.collect().forEach(System.out::println);
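The same logic, sketched in Scala for comparison with the later examples (a sketch only; it assumes a Scala SparkContext named sc, and output ordering may vary):
// Group values into a per-key list with combineByKey (Scala sketch)
val pairs = sc.parallelize(Seq((1, "www"), (1, "iteblog"), (1, "com"),
                               (2, "bbs"), (2, "iteblog"), (2, "com"),
                               (3, "good")))
val grouped = pairs.combineByKey(
  (v: String) => List(v),                            // createCombiner
  (c: List[String], v: String) => c :+ v,            // mergeValue
  (c1: List[String], c2: List[String]) => c1 ++ c2)  // mergeCombiners
grouped.collect.foreach(println)                     // e.g. (1,List(www, iteblog, com)) ...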
aggregateByKey
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Intermediate code omitted; what matters is the final call to combineByKeyWithClassTag.
  // (The omitted code defines createZero(), which produces a fresh copy of zeroValue.)
  val cleanedSeqOp = self.context.clean(seqOp)
  // seqOp serves as both createCombiner and mergeValue, while combOp is mergeCombiners
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
createCombiner: cleanedSeqOp(createZero(), v) is the createCombiner; it is just the seqOp function that was passed in, with one argument fixed to (a copy of) the supplied zeroValue
mergeValue: the same seqOp also serves as mergeValue; createCombiner and mergeValue being the same function is the defining trait of aggregateByKey
mergeCombiners: the combOp function
Therefore, when createCombiner and mergeValue would perform the same operation, aggregateByKey is the more suitable choice.
// The same example as combineByKey, reimplemented with aggregateByKey
SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> list = new ArrayList<>();
list.add(new Tuple2<>(1, "www"));
list.add(new Tuple2<>(1, "iteblog"));
list.add(new Tuple2<>(1, "com"));
list.add(new Tuple2<>(2, "bbs"));
list.add(new Tuple2<>(2, "iteblog"));
list.add(new Tuple2<>(2, "com"));
list.add(new Tuple2<>(3, "good"));
JavaPairRDD<Integer, String> data = sc.parallelizePairs(list);
JavaPairRDD<Integer, List<String>> result = data.aggregateByKey(new ArrayList<String>(), (c, v) -> {
    // seqOp: acts as both createCombiner (seeded with the empty-list zero value) and mergeValue
    c.add(v);
    return c;
}, (Function2<List<String>, List<String>, List<String>>) (c1, c2) -> {
    // combOp: acts as mergeCombiners
    c1.addAll(c2);
    return c1;
});
result.collect().forEach(System.out::println);
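To make the mapping in the source concrete, here is a Scala sketch (illustrative names, assuming a Scala SparkContext sc) of an aggregateByKey call next to the combineByKey call it reduces to:
val pairs = sc.parallelize(Seq((1, "www"), (1, "iteblog"), (2, "bbs")))

// aggregateByKey: seqOp plays two roles, combOp merges combiners
val viaAggregate = pairs.aggregateByKey(List.empty[String])(
  (c, v) => c :+ v,      // seqOp  -> createCombiner (seeded with the empty-list zero) and mergeValue
  (c1, c2) => c1 ++ c2)  // combOp -> mergeCombiners

// The equivalent combineByKey call, with the zero value folded into createCombiner
val viaCombine = pairs.combineByKey(
  (v: String) => List.empty[String] :+ v,
  (c: List[String], v: String) => c :+ v,
  (c1: List[String], c2: List[String]) => c1 ++ c2)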
foldByKey
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Intermediate code omitted; what matters is the final call to combineByKeyWithClassTag.
  // (The omitted code defines createZero(), which produces a fresh copy of zeroValue.)
  val cleanedFunc = self.context.clean(func)
  // The func passed in serves as createCombiner, mergeValue and mergeCombiners all at once;
  // createCombiner is seeded with the zero value, so the first time a key is seen
  // its combiner is initialized from zeroValue
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
createCombiner: cleanedFunc(createZero(), v) is the createCombiner; it is simply func with one argument fixed to (a copy of) the supplied zeroValue
mergeValue, mergeCombiners: the same func also serves as both mergeValue and mergeCombiners
foldByKey is the right fit when createCombiner, mergeValue, and mergeCombiners all perform the same operation and the only extra thing you need is a zeroValue.
// Sum the values by key, but start from 100 instead of 0
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))
rdd.foldByKey(100)(_ + _).collect.foreach(println)
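Note that createCombiner runs once per key per partition, so the zeroValue is folded in once for every partition that contains the key, not once per key overall. A minimal sketch of that effect (reusing sc from above, with an explicit two-partition RDD):
// One element of key "A" in each of the two partitions
val twoParts = sc.parallelize(Seq(("A", 1), ("A", 2)), 2)
twoParts.foldByKey(100)(_ + _).collect.foreach(println)
// Prints (A,203): each partition builds 100 + value, then mergeCombiners adds 101 + 102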
reduceByKey
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
createCombiner: unlike foldByKey, reduceByKey has no initial value, and its createCombiner does not call func at all; it simply returns its argument unchanged (the identity function)
mergeValue, mergeCombiners: func serves as both mergeValue and mergeCombiners
reduceByKey is the right fit when no real createCombiner is needed and mergeValue and mergeCombiners perform the same operation.
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))
rdd.reduceByKey(_ + _).collect.foreach(println)
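Following the source above, the reduceByKey call can be spelled out as the equivalent combineByKey call whose createCombiner is the identity function (a sketch reusing the rdd defined above):
val viaCombine = rdd.combineByKey(
  (v: Int) => v,                    // createCombiner: identity, no zero value involved
  (c: Int, v: Int) => c + v,        // mergeValue: func
  (c1: Int, c2: Int) => c1 + c2)    // mergeCombiners: func
viaCombine.collect.foreach(println) // same result as reduceByKey(_ + _)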
Summary
The key to these operators is understanding combineByKey; the other three are all implemented by calling it, and the analysis above is framed around the three functions passed to combineByKey.
In practice, the first thing to consider is the value types: combineByKey and aggregateByKey allow the output value type to differ from the input value type, whereas foldByKey and reduceByKey do not. Once the types are settled, choose the most concise operator that fits your logic, as in the sketch below.
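As an illustration of the type point (a sketch, assuming a Scala SparkContext sc; names are illustrative): a per-key average needs a combiner type different from the input values, so it works with combineByKey or aggregateByKey but not with reduceByKey or foldByKey on the raw values.
// Per-key average: values are Int, but the combiner is (sum, count), a different type
val scores = sc.parallelize(Seq(("A", 1), ("A", 3), ("B", 4)))
val avg = scores
  .combineByKey(
    (v: Int) => (v, 1),                                                  // createCombiner
    (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),                     // mergeValue
    (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2))  // mergeCombiners
  .mapValues { case (sum, count) => sum.toDouble / count }
avg.collect.foreach(println)  // e.g. (A,2.0), (B,4.0)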