RDD Operator Source Code (Part 2): Operators Based on combineByKey()
Transformation operators on key-value pair RDDs
combineByKey()
//Parameters:
//createCombiner: if no combiner exists yet for a key K, the V is turned into a new combined value; if one already exists, mergeValue is called instead
//mergeValue: applied to the data within a partition, merging the V's that share the same K
//mergeCombiners: applied across partitions, merging the per-partition results for the same K
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) : RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
//numSplits: the number of partitions the RDD produced by the aggregation will have
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numSplits: Int): RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
}
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner): RDD[(K, C)] = {
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
new ShuffledRDD(self, aggregator, partitioner)
}
As you can see, combineByKey() has three overloads; all of them eventually call the third one, which creates a ShuffledRDD object.
The Partitioner defaults to a HashPartitioner; for the partitioning source code, see Spark Source Code (Part 1): RDD.
The mergeValue and mergeCombiners functions are quite similar to the two functions passed to aggregate: one processes the data within a single partition, the other merges the per-partition results (for aggregate, see RDD Operator Source Code (Part 1): Actions That Return a Result). Most of Spark's aggregation operators are built on combineByKey(), so let's first look at how other operators define createCombiner, mergeValue, and mergeCombiners; that makes combineByKey() easier to understand.
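Before looking at those operators, here is a minimal local sketch (plain Scala collections, no Spark; the helper name simulateCombineByKey and the partition split are assumptions made purely for illustration) of how the three functions compose:
// Local sketch of combineByKey semantics over a handful of hypothetical "partitions".
def simulateCombineByKey[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Step 1: within each partition, build one combiner per key.
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case None    => acc + (k -> createCombiner(v)) // first value seen for this key
        case Some(c) => acc + (k -> mergeValue(c, v))  // key already has a combiner
      }
    }
  }
  // Step 2: merge the per-partition combiners for the same key.
  perPartition.flatten.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
    acc.get(k) match {
      case None     => acc + (k -> c)
      case Some(c0) => acc + (k -> mergeCombiners(c0, c))
    }
  }
}
// Example: multiplying the values for each key, as reduceByKey(_ * _) would:
// simulateCombineByKey(Seq(Seq(("tom",1),("tom",3)), Seq(("jerry",1),("jerry",7),("tom",5))),
//   (v: Int) => v, (c: Int, v: Int) => c * v, (c1: Int, c2: Int) => c1 * c2)
// => Map(tom -> 15, jerry -> 7)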
reduceByKey()
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
//the new RDD's partition count will change
reduceByKey(new HashPartitioner(numSplits), func)
}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
As you can see, reduceByKey() also has three overloads: one takes just a function, one takes a function plus the new RDD's partition count, and one takes a Partitioner plus a function. All of them end up calling
combineByKey(), with createCombiner being the identity (v: V) => v and both mergeValue and mergeCombiners set to the passed-in function.
val w=sc.parallelize(List(("tom",1),("tom",3),("jerry",1),("jerry",7),("tom",5)),3)
val s=w.reduceByKey(_*_)//multiply the values that share the same key
println(s.partitions.length)//print the new RDD's partition count
s.foreach(println)//print the new RDD's elements
The output is: 3 (jerry,7) (tom,15)
The computation works as follows. Suppose the 3 partitions hold:
Partition 1: ("tom",1),("tom",3). A combiner (tom,1) is created for tom; since tom already has a combiner when the next element arrives, it is merged directly as (tom,1*3). The partition's result is (tom,3).
Partition 2: ("jerry",1),("jerry",7). Same as above; the partition's result is (jerry,7).
Partition 3: ("tom",5). The partition's result is (tom,5).
Finally the per-partition results (tom,3),(jerry,7),(tom,5) are merged, giving the final result: (tom,3*5) and (jerry,7).
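The partition layout above is only an assumption; if you want to see how parallelize actually split the list, one quick way (using the same w as above, and assuming a Spark version that provides glom(), which recent versions do) is:
// glom() turns each partition into an array, so each printed line shows one partition's contents
w.glom().collect().zipWithIndex.foreach { case (part, i) =>
  println("partition " + i + ": " + part.mkString(", "))
}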
Next, let's implement the same thing with combineByKey():
val w=sc.parallelize(List(("tom",1),("tom",3),("jerry",1),("jerry",7),("tom",5)),3)
w.combineByKey(
x => (1, x),//createCombiner: turn the first value into a (count, value) pair
(c1: (Int,Int), y) => (c1._1 + 1, c1._2*y),//how values are merged within a partition
(c1: (Int,Int), c2: (Int,Int)) => (c1._1 + c2._1, c1._2 * c2._2)//how per-partition results are merged
).foreach(println)
The output is: (jerry,(2,7)) (tom,(3,15)). For each key, the first element of the pair is the number of values and the second is their product.
The difference between combineByKey() and reduceByKey() is similar to the difference between fold() and aggregate(): fold() and reduceByKey() take a single function that is applied both to the data within each partition and to the per-partition results, while aggregate() and combineByKey() take separate functions, one for the data within a partition and one for merging the per-partition results (combineByKey() additionally takes createCombiner to build the initial combined value).
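A common illustration (an addition of mine, not from the example above) of why the separate functions matter is a per-key average: the combined type (sum, count) differs from the value type, so it cannot be expressed with reduceByKey alone but falls out naturally from combineByKey:
val scores = sc.parallelize(List(("tom",80),("jerry",90),("tom",60),("jerry",95),("tom",100)), 3)
val avg = scores.combineByKey(
(v: Int) => (v, 1),//createCombiner: the first value becomes a (sum, count) pair
(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),//mergeValue: fold a value into the partition-local (sum, count)
(a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)//mergeCombiners: add up (sum, count) pairs across partitions
).mapValues { case (sum, count) => sum.toDouble / count }
avg.foreach(println)//e.g. (tom,80.0) (jerry,92.5)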
groupBy() && groupByKey()
groupBy()
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
//parameters: a function plus the new RDD's partition count
def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(numSplits)
}
As you can see, groupBy() applies the function to each element, uses the result as the key and the element itself as the value, and then calls groupByKey().
val w=sc.parallelize(0 to 9,3)
w.groupBy(x=>x%3).foreach(println)
The output is: (0,CompactBuffer(0, 3, 6, 9)) (1,CompactBuffer(1, 4, 7)) (2,CompactBuffer(2, 5, 8))
Elements with the same remainder when divided by 3 end up in the same collection.
groupByKey()
def groupByKey(): RDD[(K, Seq[V])] = {
groupByKey(defaultPartitioner(self))
}
def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
groupByKey(new HashPartitioner(numSplits))
}
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v//append the value to the growable array
def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2//concatenate the two arrays
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
As you can see, this too ends up calling combineByKey(): within each partition the values for the same key are first collected into an ArrayBuffer, and finally the buffers for the same key from different partitions are concatenated into one array.
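Plugging these three functions into the simulateCombineByKey sketch from earlier (the two-partition split is again just an assumption for illustration) shows the same behaviour on plain collections:
import scala.collection.mutable.ArrayBuffer
val parts = Seq(Seq(("tom",1),("tom",3)), Seq(("jerry",1),("jerry",7),("tom",5)))
val grouped = simulateCombineByKey[String, Int, ArrayBuffer[Int]](
  parts,
  v => ArrayBuffer(v),   //createCombiner: start a buffer with the first value
  (buf, v) => buf += v,  //mergeValue: append a value to the partition-local buffer
  (b1, b2) => b1 ++= b2) //mergeCombiners: concatenate buffers from different partitions
println(grouped)//Map(tom -> ArrayBuffer(1, 3, 5), jerry -> ArrayBuffer(1, 7))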
val w=sc.parallelize(List(("tom",1),("tom",3),("jerry",1),("jerry",7),("tom",5)),2)
w.groupByKey().foreach(println)
println(w.groupByKey(5).partitions.length)//change the partition count
The output is: (tom,CompactBuffer(1, 3, 5)) (jerry,CompactBuffer(1, 7)) and 5 (the partition count).
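A quick usage note (an illustrative addition): once the values are grouped you can post-process them with mapValues; for example, summing each group gives the same result as reduceByKey(_ + _):
val sums = w.groupByKey().mapValues(_.sum)
sums.foreach(println)//(tom,9) (jerry,8)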
That covers the operators built on combineByKey(); now let's go back and look at combineByKey() itself.
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner): RDD[(K, C)] = {
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
new ShuffledRDD(self, aggregator, partitioner)
}
It creates an Aggregator object and then a ShuffledRDD object.
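In this early version of Spark the Aggregator is essentially just a holder for the three functions; roughly (a sketch of its shape, not the exact source):
class Aggregator[K, V, C](
    val createCombiner: V => C,      //builds the initial combined value from the first V seen for a key
    val mergeValue: (C, V) => C,     //merges a further V for the key into the existing C
    val mergeCombiners: (C, C) => C) //merges two C values produced for the same key
  extends Serializable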
The ShuffledRDD class
class ShuffledRDD[K, V, C](
parent: RDD[(K, V)],
aggregator: Aggregator[K, V, C],
part : Partitioner)
extends RDD[(K, C)](parent.context) {
override val partitioner = Some(part)
@transient
val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
//an array whose length equals the number of partitions, one entry per split
override def splits = splits_
override def preferredLocations(split: Split) = Nil
//create the shuffle dependency and register a new shuffleId
val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
override val dependencies = List(dep)
override def compute(split: Split): Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]//map from each key to its combined value
def mergePair(k: K, c: C) {
val oldC = combiners.get(k)
if (oldC == null) {//check whether this key has been seen before
combiners.put(k, c)//if not, put (k, c) into the map
} else {
//otherwise, merge this combined value with the one already stored for k
combiners.put(k, aggregator.mergeCombiners(oldC, c))
}
}
val fetcher = SparkEnv.get.shuffleFetcher//fetch this reducer's shuffle data
fetcher.fetch[K, C](dep.shuffleId, split.index, mergePair)
return new Iterator[(K, C)] {
var iter = combiners.entrySet().iterator()
def hasNext(): Boolean = iter.hasNext()
def next(): (K, C) = {
val entry = iter.next()
(entry.getKey, entry.getValue)
}
}
}
}
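One thing worth noticing in compute() above: only mergeCombiners is used there. createCombiner and mergeValue have already been applied on the map side before the shuffle, so each fetched (K, C) pair is a partition-local combined value. A rough sketch of that map-side half (an approximation for illustration, not the actual ShuffleMapTask source):
def mapSideCombine[K, V, C](records: Iterator[(K, V)],
    aggregator: Aggregator[K, V, C],
    numPartitions: Int,
    partitionOf: K => Int): Array[scala.collection.mutable.HashMap[K, C]] = {
  // one bucket per reduce-side partition
  val buckets = Array.fill(numPartitions)(scala.collection.mutable.HashMap.empty[K, C])
  for ((k, v) <- records) {
    val bucket = buckets(partitionOf(k))
    bucket.get(k) match {
      case None    => bucket(k) = aggregator.createCombiner(v) //first value for this key in this task
      case Some(c) => bucket(k) = aggregator.mergeValue(c, v)  //fold further values into the combiner
    }
  }
  buckets //each bucket is written out and later fetched by the matching reducer, which runs mergeCombiners
}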