spark - aggregation operator aggregateByKey

Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

In other words: the values of each key are aggregated using the given combine functions and a neutral "zero value". The operator can return a result type U that differs from the value type V of this RDD, so two operations are required: the former (seqOp) merges a V into a U within a partition, and the latter (combOp) merges two U's between partitions. To avoid memory allocation, both functions are allowed to modify and return their first argument instead of creating a new U.
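As a quick illustration (a minimal sketch, not from the original post; the object name and data are made up), the classic use case is a per-key average, where the intermediate type U = (sum, count) differs from the input value type V = Int:

import org.apache.spark.{SparkConf, SparkContext}

object AggAvgSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aggAvg").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 4)))

    // zero value (0, 0) is the neutral (sum, count) accumulator
    val sumCount = pairs.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: merge a V into a U within a partition
      (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge two U's between partitions
    )
    sumCount.mapValues { case (s, c) => s.toDouble / c }.collect().foreach(println)
    sc.stop()
  }
}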

 

  def aggregateByKey[U: ClassTag](zeroValue: U)(
        seqOp: (U, V) => U,
        combOp: (U, U) => U
        ): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
        seqOp: (U, V) => U,
        combOp: (U, U) => U
        ): RDD[(K, U)] = self.withScope {

    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }
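Note how the overload above bottoms out in combineByKeyWithClassTag: the zero value is serialized once and deserialized per key to build the initial combiner, and seqOp doubles as the merge-value function. A small sketch (hypothetical object name, made-up data) checking that the delegation behaves like a hand-written combineByKey:

import org.apache.spark.{SparkConf, SparkContext}

object AggCombineEquivalence {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("equiv").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), 2)

    val zero = 0
    val seqOp = (u: Int, v: Int) => u + v
    val combOp = (a: Int, b: Int) => a + b

    val viaAggregate = pairs.aggregateByKey(zero)(seqOp, combOp)
    // createCombiner folds the first value into the zero, exactly as in the source above
    val viaCombine = pairs.combineByKey((v: Int) => seqOp(zero, v), seqOp, combOp)

    println(viaAggregate.collect().toMap == viaCombine.collect().toMap)  // true
    sc.stop()
  }
}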

  

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
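Because createCombiner is the identity and func serves as both merge functions, reduceByKey cannot change the value type. A minimal sketch (made-up data) of the familiar word count:

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rbk").setMaster("local[2]"))
    // V stays Int on both sides of the aggregation: (V, V) => V
    val counts = sc.parallelize(Seq(("hello", 1), ("world", 1), ("hello", 1)))
      .reduceByKey(_ + _)
    counts.collect().foreach(println)  // (hello,2), (world,1)
    sc.stop()
  }
}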

  

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  ...
}
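The public combineByKey wrapper supplies the ClassTag implicitly and exposes the same three functions. A minimal sketch (made-up data) that collects each key's values into a List:

import org.apache.spark.{SparkConf, SparkContext}

object CombineByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cbk").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), 2)

    val grouped = pairs.combineByKey(
      (v: Int) => List(v),                         // createCombiner: start a combiner from the first value
      (c: List[Int], v: Int) => v :: c,            // mergeValue: fold a value into a partition-local combiner
      (c1: List[Int], c2: List[Int]) => c1 ::: c2  // mergeCombiners: merge combiners across partitions
    )
    grouped.collect().foreach(println)
    sc.stop()
  }
}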

  

import org.apache.spark.{SparkConf, SparkContext}

/**
  * aggregateByKey demo: tag every word with the index of the partition
  * it came from, then aggregate the tags per key.
  */
object AggregateByKeyDemo {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("wcDemo")
        conf.setMaster("local[4]")
        val sc = new SparkContext(conf)
        val rdd1 = sc.textFile("file:///e:/wc/1.txt", 3)
        // Tag each word with its partition index, e.g. ("hello", "hello_0").
        val rdd2 = rdd1.flatMap(_.split(" ")).mapPartitionsWithIndex((idx, it) => {
            var list: List[(String, String)] = Nil
            for (e <- it) {
                list = (e, e + "_" + idx) :: list
            }
            list.iterator
        })
        rdd2.collect().foreach(println)
        println("=======================")
        // The zero value "[]" marks the start of each partition-local aggregation.
        val zeroU: String = "[]"
        // seqOp: append a value to the accumulator within a partition.
        def seqOp(a: String, b: String) = {
            a + b + " ,"
        }
        // comOp: join the per-partition results across partitions.
        def comOp(a: String, b: String) = {
            a + "$" + b
        }

        val rdd3 = rdd2.aggregateByKey(zeroU)(seqOp, comOp)
        rdd3.collect().foreach(println)
    }
}

 

	  
For the key "hello", with 3 partitions, the aggregation proceeds as follows:

Partition 0: (hello,hello_0) x4  --seqOp-->  []hello_0 ,hello_0 ,hello_0 ,hello_0 ,
Partition 1: (hello,hello_1) x3  --seqOp-->  []hello_1 ,hello_1 ,hello_1 ,
Partition 2: (hello,hello_2) x3  --seqOp-->  []hello_2 ,hello_2 ,hello_2 ,

comOp then joins the three per-partition results with "$", giving the final rdd3 entry:

(hello,[]hello_0 ,hello_0 ,hello_0 ,hello_0 ,$[]hello_1 ,hello_1 ,hello_1 ,$[]hello_2 ,hello_2 ,hello_2 ,)

The remaining rdd2 pairs (the other keys, tagged word_partitionIndex) print as:

(tom2,tom2_0)
(world,world_0)
(tom1,tom1_0)
(world,world_0)
(tom7,tom7_1)
(world,world_1)
(tom6,tom6_1)
(world,world_1)
(tom5,tom5_1)
(world,world_1)
(tom10,tom10_2)
(world,world_2)
(tom9,tom9_2)
(world,world_2)
(tom8,tom8_2)
(world,world_2)

 

spark PairRDDFunctions aggregation operators
------------------------------
1. reduceByKey
V keeps its type; map-side combining is performed.
2. groupByKey
Groups values by key into a collection; map-side combining is disabled.
3. aggregateByKey
Can change the value type from V to U; map-side combining is still performed.
4. combineByKeyWithClassTag
Combines by key with arbitrary createCombiner, mergeValue, and mergeCombiners functions, and lets you choose whether to do map-side combining (the three are contrasted in the sketch after this list).
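To make the contrast concrete, a minimal sketch (hypothetical object name, made-up data) running the first three operators on the same pairs:

import org.apache.spark.{SparkConf, SparkContext}

object PairAggContrast {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("contrast").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)), 2)

    // 1. reduceByKey: V stays Int; map-side combine happens automatically.
    pairs.reduceByKey(_ + _).collect().foreach(println)    // (a,3), (b,3)

    // 2. groupByKey: values become a collection; no map-side combine.
    pairs.groupByKey().collect().foreach(println)          // e.g. (a,CompactBuffer(1, 2))

    // 3. aggregateByKey: V (Int) becomes U (String); map-side combine still happens.
    pairs.aggregateByKey("")((u, v) => u + v, (a, b) => a + b)
      .collect().foreach(println)                          // e.g. (a,12), (b,3)

    sc.stop()
  }
}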