1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_aggregateByKey

第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_aggregateByKey

1. 定義

  /*
  * 1. 定義
  *     def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)
  *     (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  *
  *     def aggregateByKey[U: ClassTag](zeroValue: U)
  *     (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  *
  *     def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)
  *     (seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
  *
  *
  * 2.功能
  *    將資料根據不同的規則進行 分割槽內計算 和 分割槽間計算
  *    操作流程
* 1. 分割槽內 對相同的key 分組 * 示例 : key iter(value1,value2,value3) * * 2. 根據出入的規則 seqOp: (U, V) => U 對分割槽內相同的key 做聚合操作 * 示例 : seqOp(zeroValue,value1)... * * 3. 聚合後輸出每個分割槽的結果 key,value * * 4. 拉取每個分割槽 的key,value ,並對相同key 的value做reduce操作(存在Shuffle過程) * 示例 : combOp(zeroValue,value1)... * * 5. 對 所有分割槽的key 做完reduce操作後,按照指定的 partitioner 重新對結果分割槽 * 不指定分割槽器時,用預設分割槽器 HashPartitoner * 不指定分割槽個數時,用父Rdd分割槽個數 * * 3.note * zeroValue 會參與 分割槽內和分割槽間的 reduce操作 * *
*/

2. 示例

  object aggregateTest extends App {

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((2, "x1"), (2, "x2"), (2, "x3"), (4, "x4"), (5, "x5"), (5, "x6"), (6, "x7")), 2)

    
private val rdd2 = rdd.aggregateByKey("")( //分割槽內計算規則 (從左往右計算) //對分割槽內 相同key 的value 做reduce操作 (zeroValue: String, value1: String) => { println(s"key:${zeroValue} value:${value1}") zeroValue + "-" + value1 } //拉取各個分割槽 key-value,對相同key 的value 做reduce操作 , (zeroValue, par_value) => zeroValue + par_value ) println(s"${rdd2.collect().mkString(",")}") sc.stop() }