1. 程式人生 > >Spark中的groupByKey 、aggregateByKey、reduceByKey 的區別

Spark中的groupByKey 、aggregateByKey、reduceByKey 的區別

1.reduceByKey vs aggregateByKey

    假設你有一系列元組,以使用者ID為key,以使用者在某一時間點採訪的站為value:

val userAccesses = sc.parallelize(Array("u1", "site1"), ("u2", "site1"), ("u1", "site1"), ("u2", "site3"), ("u2", "site4")))

    我們要對這個列表進行處理,獲得某個使用者訪問過且去重後的所有站點。因groupByKey運算量較大,可選方案有reduceByKey,aggregateByKey。

    reduceByKey程式碼如下:

val mapedUserAccess = userAccesses.map(userSite => (userSite._1, Set(userSite._2)))
val distinctSite = mapedUserAccess.reduceByKey(_++_)

    但上述程式碼的問題是,RDD的每個值都將建立一個Set,如果處理一個巨大的RDD,這些物件將大量吞噬記憶體,並且對垃圾回收造成壓力。

    如果使用aggregateByKey:

val zeroValue = collecyion.mutable.set[String]()
val aggregated = userAccesses.aggregateByKey(zeroValue)((set,v) => set += v, (setOne, setTwo) => setOne ++= setTwo)

    為避免reduceByKey記憶體問題,可用aggregateByKey。

    aggregateByKey函式的使用,需為它提供以下三個引數:

    1.零值(zero):即聚合的初始值

    2.函式f:(U, V)

    把值V合併到資料結構U, 該函式在分割槽內合併值時使用

    3.函式 g:(U, V)

    合併兩個資料結構U,在分割槽間合併值時呼叫此函式。

2.原理差別

    (1)groupByKey()是對RDD中的所有資料做shuffle,根據不同的Key對映到不同的partition中再進行aggregate。

   (2)aggregateByKey()是先對每個partition中的資料根據不同的Key進行aggregate,然後將結果進行shuffle,完成各個partition之間的aggregate。因此,和groupByKey()相比,運算量小了很多。

      (3)  distinct()也是對RDD中的所有資料做shuffle進行aggregate後再去重。

    (4)reduceByKey()也是先在單臺機器中計算,再將結果進行shuffle,減小運算量