1. 程式人生 > >Spark核心RDD:combineByKey函數詳解

Spark核心RDD:combineByKey函數詳解

sta 3.0 vbscript map ner 初始化 partition 得到 new

https://blog.csdn.net/jiangpeng59/article/details/52538254

為什麽單獨講解combineByKey?

因為combineByKey是Spark中一個比較核心的高級函數,其他一些高階鍵值對函數底層都是用它實現的。諸如 groupByKey,reduceByKey等等

如下給出combineByKey的定義,其他的細節暫時忽略(1.6.0版的函數名更新為combineByKeyWithClassTag)

  1. def combineByKey[C](
  2. createCombiner: V => C,
  3. mergeValue: (C, V) => C,
  4. mergeCombiners: (C, C) => C,
  5. partitioner: Partitioner,
  6. mapSideCombine: Boolean = true,
  7. serializer: Serializer = null)

如下解釋下3個重要的函數參數:

  • createCombiner: V => C ,這個函數把當前的值作為參數,此時我們可以對其做些附加操作(類型轉換)並把它返回 (這一步類似於初始化操作)
  • mergeValue: (C, V) => C,該函數把元素V合並到之前的元素C(createCombiner)上 (這個操作在每個分區內進行)
  • mergeCombiners: (C, C) => C,該函數把2個元素C合並 (這個操作在不同分區間進行)
如下看一個使用combineByKey來求解平均數的例子
  1. val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
  2. val d1 = sc.parallelize(initialScores)
  3. type MVType = (Int, Double) //定義一個元組類型(科目計數器,分數)
  4. d1.combineByKey(
  5. score => (1, score),
  6. (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  7. (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
  8. ).map { case (name, (num, socre)) => (name, socre / num) }.collect
參數含義的解釋
a 、score => (1, score),我們把分數作為參數,並返回了附加的元組類型。 以"Fred"為列,當前其分數為88.0 =>(1,88.0) 1表示當前科目的計數器,此時只有一個科目

b、(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),註意這裏的c1就是createCombiner初始化得到的(1,88.0)。在一個分區內,我們又碰到了"Fred"的一個新的分數91.0。當然我們要把之前的科目分數和當前的分數加起來即c1._2 + newScore,然後把科目計算器加1即c1._1 + 1

c、 (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),註意"Fred"可能是個學霸,他選修的科目可能過多而分散在不同的分區中。所有的分區都進行mergeValue後,接下來就是對分區間進行合並了,分區間科目數和科目數相加分數和分數相加就得到了總分和總科目數 執行結果如下:
res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))

例子來源:http://codingjunkie.net/spark-combine-by-key/

Spark核心RDD:combineByKey函數詳解