講明白combineByKey()運算元,不是談原始碼
簡單介紹
combineByKey()是最通用的對key-value型rdd進行聚集操作的聚集函式(aggregation function)。類似於aggregate(),combineByKey()允許使用者返回值的型別與輸入不一致。
其定義如下,我們可以根據這個形式來分別定義createCombiner、mergeValue和mergeCombiners三個函式:
def combineByKey[C](
createCombiner: V => C, ##A
mergeValue: (C, V) => C, ##B
mergeCombiners: (C, C) => C,##C
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null
)
自定義combineByKey
以實現一個計算平均值的功能為例來分別說明createCombiner、mergeValue和mergeCombiners三個函式的作用和定義方法。
##A createCombiner(value)
createCombiner: V => C ,這個函式把當前rdd中的值(value)作為引數,此時我們可以對其做些附加操作(型別轉換)並把它返回 (這一步類似於初始化操作,分割槽內操作)
def createCombiner(value):
(value, 1)
##B mergeValue(acc, value)
mergeValue: (C, V) => C,該函式把元素V合併到之前的元素C(createCombiner)上 (每個分割槽內合併)
def mergeValue(acc, value):
# 注意,這裡的acc即為createCombiner產生的C。
# 這裡,用acc[0]表明為acc這個元組中的第一個元素,在scala中acc._1表示
(acc[0]+value, acc[1]+1)
###C mergeCombiners: (acc1, acc2)
mergeCombiners: (C, C) => C,該函式把2個元素C合併 (此函式作用範圍在rdd的不同分割槽間內,跨分割槽合併
def mergeCombiners(acc1, acc2):
# 注意,到這一步,表明這個rdd的每條資料都已經被###A和###B捕獲匹配完畢
(acc1[0]+acc2[0], acc1[1]+acc2[1])
案例:
如圖,有兩個分割槽,key-value(類別-數量)形式也清楚,我們想知道coffee的平均數量和panda的平均數量。以scala形式寫法如下:
val init_data = Array(("coffee", 1), ("coffee", 2), ("panda", 3), ("coffee", 9)) |
分析:
Partition 1 trace:
(coffee, 1) => new key
accumulators[coffee] = createCombiner(1)
得到:(coffee, (1, 1))
(coffee, 2) => existing key
accumulators[coffee] = mergeValue(accumulators[coffee], 2)
得到:(coffee, (2, 3))
顯然(panda, 3) => new key,呼叫createCombiner方法。
得到:(panda, (1, 3))
Partition 2 trace:
(coffee, 9) => new key
accumulators[coffee] = createCombiner(9)
得到:(coffee, (1, 9))
接下來,mergeCombiners來合併分割槽:
Merge Partitions:
mergeCombiners(partition1.accumulators[coffee], partition2.accumulators[coffee])
得到:(coffee, (3,12))
---------------------------------------------細心看 反覆看 不然是假懂--------------------------------