1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_combineByKey

第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_combineByKey

1. 定義

  /*
  * 1. 定義
  *   def combineByKey[C](createCombiner: V => C,
  *                       mergeValue: (C, V) => C,
  *                       mergeCombiners: (C, C) => C,
  *                       numPartitions: Int): RDD[(K, C)]
  *
  *   def combineByKey[C](createCombiner: V => C,
  *                       mergeValue: (C, V) => C,
  *                       mergeCombiners: (C, C) => C,
  *                       partitioner: Partitioner,
  *                       serializer: Serializer = null): RDD[(K, C)]
  *
  *   def combineByKey[C](createCombiner: V => C,
  *                       mergeValue: (C, V) => C,
  *                       mergeCombiners: (C, C) => C): RDD[(K, C)]
  *
  *   createCombiner : 將相同key的第一個value,進行資料型別轉換
  *   mergeValue : 分割槽內 相同key 聚合規則
  *   mergeCombiners : 分割槽間 相同key 聚合規則
  *
  * 2. 功能
  *     1. 對 key-value型rdd 按相同的key 對value進行聚合
  *     2. 可以將 Rdd[(K,V)]  轉換為 Rdd[(K,C)]
  *
  * 3. 操作流程
  *     1. 分割槽內 對相同的key 進行分組
  *         key iter(value1,value2,value3)
  *
  *     2. 分割槽內 對相同key 的value進行聚合
  *         1. 將 value1 轉換為 createCombiner(value1)=資料型別C
  *         2. 按照指定的規則對 value進行聚合
  *            mergeValue(createCombiner(value1),value2) => 資料型別C
  *            note : createCombiner(value1),value2型別可能不一致
  *         3. 合併完結果為
  *            key,資料型別C
  *
  *     3. 分割槽間 對相同key 的value進行聚合
  *         1.按照指定的規則對 value進行聚合
  *            mergeCombiners: (資料型別C, 資料型別C) => 資料型別C
  *
  * 
*/

2. 示例

  object combineByKeyPakTest extends App {

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((2, "x1"), (2, "x2"), (2, "x3"), (4, "x4"), (5, "x5"), (5, "x6"), (6, "x7")), 2)

    
private val rdd2 = rdd.combineByKey( //轉換 value型別為List (tp) => { println(s"轉換:${tp}") List[String](tp) }, (list: List[String], value1: String) => { list :+ value1 } , (list1: List[String], list2: List[String]) => { list1.union(list2) } ) println(s
"${rdd2.collect().mkString(",")}") sc.stop() }