第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_combineByKey
阿新 • • 發佈:2022-03-25
1. 定義
/* * 1. 定義 * def combineByKey[C](createCombiner: V => C, * mergeValue: (C, V) => C, * mergeCombiners: (C, C) => C, * numPartitions: Int): RDD[(K, C)] * * def combineByKey[C](createCombiner: V => C, * mergeValue: (C, V) => C, * mergeCombiners: (C, C) => C, * partitioner: Partitioner, * serializer: Serializer = null): RDD[(K, C)] * * def combineByKey[C](createCombiner: V => C, * mergeValue: (C, V) => C, * mergeCombiners: (C, C) => C): RDD[(K, C)] * * createCombiner : 將相同key的第一個value,進行資料型別轉換 * mergeValue : 分割槽內 相同key 聚合規則 * mergeCombiners : 分割槽間 相同key 聚合規則 * * 2. 功能 * 1. 對 key-value型rdd 按相同的key 對value進行聚合 * 2. 可以將 Rdd[(K,V)] 轉換為 Rdd[(K,C)] * * 3. 操作流程 * 1. 分割槽內 對相同的key 進行分組 * key iter(value1,value2,value3) * * 2. 分割槽內 對相同key 的value進行聚合 * 1. 將 value1 轉換為 createCombiner(value1)=資料型別C * 2. 按照指定的規則對 value進行聚合 * mergeValue(createCombiner(value1),value2) => 資料型別C * note : createCombiner(value1),value2型別可能不一致 * 3. 合併完結果為 * key,資料型別C * * 3. 分割槽間 對相同key 的value進行聚合 * 1.按照指定的規則對 value進行聚合 * mergeCombiners: (資料型別C, 資料型別C) => 資料型別C * **/
2. 示例
object combineByKeyPakTest extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd: RDD[(Int, String)] = sc.makeRDD(List((2, "x1"), (2, "x2"), (2, "x3"), (4, "x4"), (5, "x5"), (5, "x6"), (6, "x7")), 2)private val rdd2 = rdd.combineByKey( //轉換 value型別為List (tp) => { println(s"轉換:${tp}") List[String](tp) }, (list: List[String], value1: String) => { list :+ value1 } , (list1: List[String], list2: List[String]) => { list1.union(list2) } ) println(s"${rdd2.collect().mkString(",")}") sc.stop() }