1. 程式人生 > >講明白combineByKey()運算元,不是談原始碼

講明白combineByKey()運算元,不是談原始碼

簡單介紹

 combineByKey()是最通用的對key-value型rdd進行聚集操作的聚集函式(aggregation function)。類似於aggregate(),combineByKey()允許使用者返回值的型別與輸入不一致。

其定義如下,我們可以根據這個形式來分別定義createCombiner、mergeValue和mergeCombiners三個函式:

def combineByKey[C](
  createCombiner: V => C, ##A
  mergeValue: (C, V) => C, ##B
  mergeCombiners: (C, C) => C,##C 
  partitioner: Partitioner,   
  mapSideCombine: Boolean = true,
  serializer: Serializer = null

)

自定義combineByKey

以實現一個計算平均值的功能為例來分別說明createCombiner、mergeValue和mergeCombiners三個函式的作用和定義方法。

##A createCombiner(value)

createCombiner: V => C ,這個函式把當前rdd中的值(value)作為引數,此時我們可以對其做些附加操作(型別轉換)並把它返回 (這一步類似於初始化操作,分割槽內操作)

def createCombiner(value):

   (value, 1)

##B mergeValue(acc, value)

 mergeValue: (C, V) => C,該函式把元素V合併到之前的元素C(createCombiner)上 (每個分割槽內合併)

def mergeValue(acc, value):
# 注意,這裡的acc即為createCombiner產生的C。
# 這裡,用acc[0]表明為acc這個元組中的第一個元素,在scala中acc._1表示
  (acc[0]+value, acc[1]+1)
###C   mergeCombiners: (acc1, acc2)

 mergeCombiners: (C, C) => C,該函式把2個元素C合併 (此函式作用範圍在rdd的不同分割槽間內,跨分割槽合併

)

def mergeCombiners(acc1, acc2):

# 注意,到這一步,表明這個rdd的每條資料都已經被###A和###B捕獲匹配完畢

   (acc1[0]+acc2[0], acc1[1]+acc2[1])

 

 

案例:

 

 

 

 如圖,有個分割槽,key-value(類別-數量)形式也清楚,我們想知道coffee的平均數量和panda的平均數量。以scala形式寫法如下:

val init_data = Array(("coffee", 1), ("coffee", 2), ("panda", 3), ("coffee", 9))
val data = sc.parallelize(init_data) # 兩個分割槽
type MVType = (Int, Int) //定義一個元組型別
data.combineByKey(
   score => (1, score), # createCombiner函式
   (c: MVType, newScore) => (c._1 + 1, c._2 + newScore), # mergeValue函式
   (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) # mergeCombiners函式
).map { case (key, value) => (key, value._2/ value._1) }.map(println(_))

 

 

 

 

 

 

 

 

分析:

Partition 1 trace:
(coffee, 1) => new key
accumulators[coffee] = createCombiner(1)
得到:(coffee, (1, 1))
(coffee, 2) => existing key
accumulators[coffee] = mergeValue(accumulators[coffee], 2)
得到:(coffee, (2, 3))
顯然(panda, 3) => new key,呼叫createCombiner方法。
得到:(panda, (1, 3))

Partition 2 trace:
(coffee, 9) => new key
accumulators[coffee] = createCombiner(9)
得到:(coffee, (1, 9))

接下來,mergeCombiners來合併分割槽:

Merge Partitions
mergeCombiners(partition1.accumulators[coffee], partition2.accumulators[coffee])
得到:(coffee, (3,12))

 

---------------------------------------------細心看 反覆看 不然是假懂--------------------------------