1. 程式人生 > >Spark 按key聚合求平均值與佔比

Spark 按key聚合求平均值與佔比

1.求key的平均值

k,v結構的資料中,求每個key對應的平均值,在spark中怎麼應該怎麼求?
例如有如下的資料:

("a",10)
("b",4)
("a",10)
("b",20)

想求a,b對應的平均值。
直接上程式碼

sc.parallelize(List(("a",10),("b",4),("a",10),("b",20))).mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => (x._1, x._2, x._1.toDouble / x._2.toDouble))
.collect()

在spark-shell中執行上述的程式碼以後,輸出如下:

Array[(String, (Int, Int, Double))] = Array((a,(20,2,10.0)), (b,(24,2,12.0)))

簡單分析一下上面的程式碼邏輯:
mapValuesPairRDDFunctions中的方法,顧名思義,是對kv結構的rdd的value進行map的操作。
然後進行reduceByKey操作,此時將value中的值累加,對應出現的次數也累加。
最後再呼叫mapValues方法,求每個key的平均值即可。

2.求key對應的value值的佔比

同樣是上面的資料,我們想求a,b對應的value值分別佔比是多少,該怎麼計算?

val array = sc.parallelize(List(("a",10),("b",4),("a",10),("b",20))).reduceByKey(_ + _).collect()
val sum = array.foldLeft(0)({ (z, f) => z + f._2 })
array.map(x =>  println("%s\t%s\t%s".format(x._1, x._2, x._2.toDouble / sum)))

在spark-shell中執行上述的程式碼以後,輸出如下:

a   20  0.45454545454545453
b   24  0.5454545454545454

上面程式碼的邏輯如下:
1.先用reduceByKey根據key聚合。
2.用foldLeft方法算出所有key的總和。
3.對包含所有key的陣列進行遍歷,得到各個key的佔比。

3.foldLeft的簡單講解

上面用到了foldLeft函式。從本質上來講,fold函式是將一種格式的輸入資料轉化為另外一種格式返回。
foldLeft的原型如下:

  override /*TraversableLike*/
  def foldLeft[B](z: B)(op: (B, A) => B): B =
    foldl(0, length, z, op)

foldLeft有兩個輸入引數:初始值以及一個函式。而這個函式也包含有兩個輸入引數:累加值z與TraversableLike的當前item。
foldLeft方法開始執行以後,步驟如下:
1.初始值0作為第一個引數傳入foldLeft,array中的第一個item作為第二個引數f傳入foldLeft中。
2.foldLeft對兩個引數進行計算,上面的例子是將引數相加並返回。
3.foldLeft將上一步返回的值作為輸入函式的第一個引數,並且把array的下面一個item作為第二個引數傳入繼續計算。
4.重複上面的步驟,直到遍歷完array中的所有item。