1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_foldByKey

第五章_Spark核心程式設計_Rdd_轉換運算元_keyValue型_foldByKey

1. 定義

  /*
  * 1. 定義
  *    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  *    def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
  *    def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
  *
  * 2. 功能
  *    1. 分割槽內 根據相同的key,對value分組
  *        示例 :
  *           key,iter(value1,value2,value3)
  *    2. 根據傳入的規則 reduce分割槽內的 相同key的 value
  *        示例 :
  *           func(zeroValue,value1)
  *    3. 聚合後輸出每個分割槽的結果 key,value
  *
  *    4. 拉取每個分割槽 的key,value ,並對相同key 的value做reduce操作(存在Shuffle過程)
  *          示例 :  func(zeroValue,value1)...
  *
  *    5. 對 所有分割槽的key 做完reduce操作後,按照指定的 partitioner 重新對結果分割槽
  *          不指定分割槽器時,用預設分割槽器 HashPartitoner
  *          不指定分割槽個數時,用父Rdd分割槽個數
  *
  * 3. note
  *    1. zeroValue 會參與 分割槽內和分割槽間的 reduce操作
  *    2. foldByKey 等價於 aggregateByKey(分割槽內函式=分割槽間函式)
  *
  * 
*/

2. 示例

  object aggregateTest extends App {

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((2, "x1"), (2, "x2"), (2, "x3"), (4, "x4"), (5, "x5"), (5, "x6"), (6, "x7")), 2)

    
private val rdd2 = rdd.foldByKey("")( (zeroValue, value) => zeroValue + "-" + value ) println(s"${rdd2.collect().mkString(",")}") sc.stop() }