Analyzing combineByKey, aggregateByKey, foldByKey, and reduceByKey with the Spark Source Code

combineByKey

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
  • createCombiner: the first time combineByKey encounters a key k, it calls createCombiner to turn that key's value v into a combiner c

  • mergeValue: when combineByKey encounters a key k it has already seen, it calls mergeValue to fold v into the existing c

  • mergeCombiners: merges two combiners c into one

// Example: collect the values for each key into a List
SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf); 

List<Tuple2<Integer, String>> list = new ArrayList<>();

list.add(new Tuple2<>(1, "www"));
list.add(new Tuple2<>(1, "iteblog"));
list.add(new Tuple2<>(1, "com"));
list.add(new Tuple2<>(2, "bbs"));
list.add(new Tuple2<>(2, "iteblog"));
list.add(new Tuple2<>(2, "com"));
list.add(new Tuple2<>(3, "good"));

JavaPairRDD<Integer, String> data = sc.parallelizePairs(list);

JavaPairRDD<Integer, List<String>> result = data.combineByKey(v -> {
    ArrayList<String> strings = new ArrayList<>();
    strings.add(v);
    return strings;
}, (c, v) -> {
    c.add(v);
    return c;
}, (c1, c2) -> {
    c1.addAll(c2);
    return c1;
});

result.collect().forEach(System.out::println);
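
For comparison, here is a minimal Scala sketch of the same grouping (the variable names are illustrative, not from the original example). It spells out the three functions and shows the value type changing from String to List[String]:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

val data = sc.parallelize(Seq((1, "www"), (1, "iteblog"), (1, "com"),
  (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))

val result = data.combineByKey(
  (v: String) => List(v),                            // createCombiner: the first value of a key becomes a one-element list
  (c: List[String], v: String) => v :: c,            // mergeValue: fold later values of the same key into the list (within a partition)
  (c1: List[String], c2: List[String]) => c1 ::: c2) // mergeCombiners: concatenate the lists built in different partitions

result.collect().foreach(println)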

aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {

  // intermediate code omitted; the important part is the final call to combineByKeyWithClassTag

  val cleanedSeqOp = self.context.clean(seqOp)
  // seqOp serves as both createCombiner and mergeValue, while combOp is mergeCombiners
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
  • createCombiner: cleanedSeqOp(createZero(), v) is the createCombiner, i.e. the seqOp you pass in, with one of its arguments fixed to the supplied zeroValue

  • mergeValue: the same seqOp also acts as mergeValue; having createCombiner and mergeValue be the same function is the essence of aggregateByKey

  • mergeCombiners: the combOp function

So when createCombiner and mergeValue would perform the same operation, aggregateByKey is the better fit.

// Same example as combineByKey, implemented with aggregateByKey instead
SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

List<Tuple2<Integer, String>> list = new ArrayList<>();

list.add(new Tuple2<>(1, "www"));
list.add(new Tuple2<>(1, "iteblog"));
list.add(new Tuple2<>(1, "com"));
list.add(new Tuple2<>(2, "bbs"));
list.add(new Tuple2<>(2, "iteblog"));
list.add(new Tuple2<>(2, "com"));
list.add(new Tuple2<>(3, "good"));

JavaPairRDD<Integer, String> data = sc.parallelizePairs(list);

JavaPairRDD<Integer, List<String>> result = data.aggregateByKey(new ArrayList<String>(), (c, v) -> {
    c.add(v);
    return c;
}, (Function2<List<String>, List<String>, List<String>>) (c1, c2) -> {
    c1.addAll(c2);
    return c1;
});

result.collect().forEach(System.out::println);
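
The same grouping again as a minimal Scala sketch (names illustrative): the empty list is the zeroValue, seqOp plays the combined createCombiner/mergeValue role, and combOp is the mergeCombiners:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

val data = sc.parallelize(Seq((1, "www"), (1, "iteblog"), (1, "com"),
  (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))

val result = data.aggregateByKey(List.empty[String])(
  (c, v) => v :: c,       // seqOp: acts as createCombiner (with the zero value) and as mergeValue
  (c1, c2) => c1 ::: c2)  // combOp: acts as mergeCombiners

result.collect().foreach(println)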

foldByKey

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {

  // intermediate code omitted; the important part is the final call to combineByKeyWithClassTag

  val cleanedFunc = self.context.clean(func)
  // the supplied func serves as createCombiner, mergeValue, and mergeCombiners at once
  // createCombiner is given the zero value: the first time a key is seen, its combiner is initialized from zeroValue
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
  • createCombiner: cleanedFunc(createZero(), v) is the createCombiner, i.e. the supplied func, with one of its arguments fixed to the supplied zeroValue

  • mergeValue, mergeCombiners: func is also both mergeValue and mergeCombiners

foldByKey fits when createCombiner, mergeValue, and mergeCombiners would all perform the same operation and the only extra thing you need is a zeroValue.

// Sum the values for each key, but start each sum at 100 instead of 0
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)

val rdd = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))

rdd.foldByKey(100)(_+_).collect.foreach(println)
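
One detail worth noting from the source above: createZero() is applied to the first value of a key in each partition, so the zero value is folded in once per partition that contains the key, not once per key overall. A small sketch of the effect (partition counts chosen purely for illustration):

// one partition: each key picks up the 100 exactly once, e.g. (A,102), (B,103), (C,101)
sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)), 1)
  .foldByKey(100)(_ + _).collect.foreach(println)

// five partitions: the two "A" records land in different partitions,
// so "A" picks up the 100 twice and sums to 202
sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)), 5)
  .foldByKey(100)(_ + _).collect.foreach(println)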

reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
  • createCombiner: compared with foldByKey, reduceByKey has no initial value; createCombiner does not call func at all, it simply returns its argument unchanged

  • mergeValue, mergeCombiners: func serves as both mergeValue and mergeCombiners

reduceByKey fits when no special createCombiner is needed and mergeValue and mergeCombiners would perform the same operation.

val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)

val rdd = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))

rdd.reduceByKey(_ + _).collect.foreach(println)
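
Since createCombiner here is just the identity, reduceByKey behaves like foldByKey with a neutral zero value. A quick sketch of that equivalence on the same data (key order in the output may vary):

// both print (A,2), (B,3), (C,1)
rdd.reduceByKey(_ + _).collect.foreach(println)
rdd.foldByKey(0)(_ + _).collect.foreach(println)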

Summary

The key to these operators is understanding combineByKey; the other three are all implemented by calling it. The analysis above has therefore focused on the three functions passed to combineByKey.

In practice, the first thing to consider is the types: combineByKey and aggregateByKey allow the output value type to differ from the input value type, whereas foldByKey and reduceByKey do not. Once the types are settled, pick the most concise operator for your use case.
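
A minimal sketch of that type point, assuming the standard RDD API (names illustrative): aggregateByKey, like combineByKey, can change the value type, while reduceByKey and foldByKey must keep it:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val rdd = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1)))

// value type changes from Int to String (the Ints are concatenated as text)
val agg: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _)

// value type must stay Int
val red: RDD[(String, Int)] = rdd.reduceByKey(_ + _)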