Spark中的groupByKey 、aggregateByKey、reduceByKey 的區別
1.reduceByKey vs aggregateByKey
假設你有一系列元組,以使用者ID為key,以使用者在某一時間點採訪的站為value:
val userAccesses = sc.parallelize(Array("u1", "site1"), ("u2", "site1"), ("u1", "site1"), ("u2", "site3"), ("u2", "site4")))
我們要對這個列表進行處理,獲得某個使用者訪問過且去重後的所有站點。因groupByKey運算量較大,可選方案有reduceByKey,aggregateByKey。
reduceByKey程式碼如下:
val mapedUserAccess = userAccesses.map(userSite => (userSite._1, Set(userSite._2)))
val distinctSite = mapedUserAccess.reduceByKey(_++_)
但上述程式碼的問題是,RDD的每個值都將建立一個Set,如果處理一個巨大的RDD,這些物件將大量吞噬記憶體,並且對垃圾回收造成壓力。
如果使用aggregateByKey:
val zeroValue = collecyion.mutable.set[String]() val aggregated = userAccesses.aggregateByKey(zeroValue)((set,v) => set += v, (setOne, setTwo) => setOne ++= setTwo)
為避免reduceByKey記憶體問題,可用aggregateByKey。
aggregateByKey函式的使用,需為它提供以下三個引數:
1.零值(zero):即聚合的初始值
2.函式f:(U, V)
把值V合併到資料結構U, 該函式在分割槽內合併值時使用
3.函式 g:(U, V)
合併兩個資料結構U,在分割槽間合併值時呼叫此函式。
2.原理差別
(1)groupByKey()是對RDD中的所有資料做shuffle,根據不同的Key對映到不同的partition中再進行aggregate。
(2)aggregateByKey()是先對每個partition中的資料根據不同的Key進行aggregate,然後將結果進行shuffle,完成各個partition之間的aggregate。因此,和groupByKey()相比,運算量小了很多。
(3) distinct()也是對RDD中的所有資料做shuffle進行aggregate後再去重。
(4)reduceByKey()也是先在單臺機器中計算,再將結果進行shuffle,減小運算量