spark-聚合運算元aggregatebykey
spark-聚合運算元aggregatebykey
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
使用給定的聚合函式和中性的“零值”聚合每個鍵的值。這個函式可以返回與這個RDD V中的值型別不同的結果型別U。
前一個操作用於合併分割槽內的值,而後一個操作用於合併分割槽之間的值。為了避免記憶體分配,允許這兩個函式修改並返回它們的第一個引數,而不是建立一個新的U。
def aggregateByKey[U: ClassTag](zeroValue: U)( seqOp: (U, V) => U, combOp: (U, U) => U ): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) } def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)( seqOp: (U, V) => U, combOp: (U, U) => U ): RDD[(K, U)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) // We will clean the combiner closure later in `combineByKey` val cleanedSeqOp = self.context.clean(seqOp) combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner) }
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) }
def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]{ ... }
/** * 按key聚合Demo */ object AggregateByKeyDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("wcDemo") conf.setMaster("local[4]") val sc = new SparkContext(conf) val rdd1 = sc.textFile("file:///e:/wc/1.txt" , 3) val rdd2 = rdd1.flatMap(_.split(" ")).mapPartitionsWithIndex((idx, it) => { var list: List[(String, String)] = Nil for (e <- it) { list = (e, e + "_" + idx) :: list } list.iterator }) rdd2.collect().foreach(println) println("=======================") val zeroU:String = "[]" def seqOp(a:String,b:String) = { a + b + " ," } def comOp(a:String,b:String) = { a + "$" + b } val rdd3 = rdd2.aggregateByKey(zeroU)(seqOp,comOp) rdd3.collect().foreach(println) } }
(hello,hello_0) =>[hello_0]hello_0,hello_0,hello_0, =>[hello_0]hello_0,hello_0,hello_0,$[hello_1]hello_1,hello_1,$[hello_2]hello_2,hello_2, (hello,hello_0) (hello,hello_0) (hello,hello_0) (hello,hello_1) =>[hello_1]hello_1,hello_1, (hello,hello_1) (hello,hello_1) (hello,hello_2) =>[hello_2]hello_2,hello_2, (hello,hello_2) (hello,hello_2) (hello,[]hello_0 ,hello_0 ,hello_0 ,hello_0 ,$[]hello_1 ,hello_1 ,hello_1 ,$[]hello_2 ,hello_2 ,hello_2 ,) (tom2,tom2_0) (world,world_0) (tom1,tom1_0) (world,world_0) (tom7,tom7_1) (world,world_1) (tom6,tom6_1) (world,world_1) (tom5,tom5_1) (world,world_1) (tom10,tom10_2) (world,world_2) (tom9,tom9_2) (world,world_2) (tom8,tom8_2) (world,world_2)
spark PairRDDFunction聚合函式
------------------------------
1.reduceByKey
V型別不變,有map端合成。
2.groupByKey
按照key分組,生成的v是集合,map端不能合成。
3.aggregateByKey
可以改變v的型別,map端還可以合成。
4.combineByKeyWithClassTag
按照key合成,可以指定是否進行map端合成、任意的combiner建立函式,值合併函式以及合成器合併函式。