1. 程式人生 > >Spark core運算元aggregateByKey例項

Spark core運算元aggregateByKey例項

groupbykey、reducebykey以及aggregateByKey

groupbykey是全域性聚合運算元,將所有map task中的資料都拉取到shuffle中將key相同的資料進行聚合,它存在很多弊端,例如:將大量的資料進行網路傳輸,浪費大量的資源,最重要的是如果資料量太大還會出現GC和OutOfMemoryError的錯誤,如果資料某個key的資料量遠大於其他key的資料,在進行全域性聚合的時候還會出現資料傾斜的問題。

所以在實際的專案中要謹慎使用groupbykey,隨著資料量的增加,groupbykey暴露出來的問題會越來越多。

reducebykey是在map階段進行本地聚合以後才會到shuffle中進行全域性聚合,相當於是進入shuffle之前已經做了一部分聚合,那麼它的網路傳輸速度會比groupbykey快很多而且佔用資源也會減少很多,但是運算元本身就如它的名字一樣,主要是進行計算的將相同key的資料進行計算,返回計算結果。

但是實際的專案中在進行聚合之後我們不一定只是要計算,還會找聚合後某個欄位的最大值,最小值等等操作,groupbykey聚合後返回的是(K,Iterable[V]),我們可以把iterable[V]這個集合的資料進行二次處理來實現我們實際的專案需求,但是上面已經提到了groupbykey的諸多問題,reducebykey也是隻有在單純的對資料進行計算的時候才能和groupbykey有等價效果。既想像reducebykey那樣進行本地聚合,又想像groupbykey那樣返回一個集合便於我們操作。

說了這麼多也就引出來了我們今天的主題aggregatebykey。

aggregatebykey和reducebykey一樣首先在本地聚合,然後再在全域性聚合。它的返回值也是由我們自己設定的。

aggregatebykey使用需要提供三個引數:

zeroValue: U 這個引數就會決定最後的返回型別
seqOp: (U, V) => U 將V(資料)放入U中進行本地聚合
combOp: (U, U) => U 將不同的U進行全域性聚合

可能說的大家有點暈,那麼我介紹一個工作中遇到的需求以及使用aggregatebykey來解決問題的例項:

首先看資料的結構:

List(
      ("84.174.205.5",("2018-11-10 23:23:23",2)),
      ("221.226.113.146",("2018-09-11 23:23:23",3)),
      ("84.174.205.5",("2018-12-15 23:23:23",5)),
      ("108.198.168.20",("2018-01-03 23:23:23",2)),
      ("108.198.168.20",("2018-11-21 23:23:23",4)),
      ("221.226.113.146",("2018-11-01 23:23:23",6)),
        ("221.226.113.146",("2018-12-06 23:23:23",6))
      )

key為IP V為日期以及訪問的次數。

現在的需求:

1、將ip的訪問次數進行累加操作

2、聚合後只保留最早的時間

廢話不多說上程式碼:

val spark = SparkSession.builder().appName(test.getClass.getName).master("local").getOrCreate()
//準備資料
    val pairRdd: RDD[(String, (String, Int))] = spark.sparkContext.parallelize(
      List(
      ("84.174.205.5",("2018-11-10 23:23:23",2)),
      ("221.226.113.146",("2018-09-11 23:23:23",3)),
      ("84.174.205.5",("2018-12-15 23:23:23",5)),
      ("108.198.168.20",("2018-01-03 23:23:23",2)),
      ("108.198.168.20",("2018-11-21 23:23:23",4)),
      ("221.226.113.146",("2018-11-01 23:23:23",6)),
        ("221.226.113.146",("2018-12-06 23:23:23",6))
      ),2)
//運用aggregatebykey
//1、U定義為ArrayBuffer
    val juhe = pairRdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[(String, Int)]())((arr,value)=>{
//2、將value放入集合U中
      arr += value
//3、將所有的集合進行合併
    },_.union(_))
    val juhesum = juhe.mapPartitions(partition=>{
      partition.map(m=>{
        val key = m._1
        val date = m._2.map(m=>m._1).toList.sortWith(_<_)(0)
        val sum = m._2.map(m=>m._2).sum
        Row(key,date,sum)
      })
    })
    juhesum.foreach(println(_))

最後的輸出結果:

[108.198.168.20,2018-01-03 23:23:23,6]

[84.174.205.5,2018-11-10 23:23:23,7]

[221.226.113.146,2018-09-11 23:23:23,15]

到此實現需求。

希望本文會對大家有幫助,歡迎大家評論交流,謝謝!