Spark 按key聚合求平均值與佔比
1.求key的平均值
k,v結構的資料中,求每個key對應的平均值,在spark中怎麼應該怎麼求?
例如有如下的資料:
("a",10)
("b",4)
("a",10)
("b",20)
想求a,b對應的平均值。
直接上程式碼
sc.parallelize(List(("a",10),("b",4),("a",10),("b",20))).mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => (x._1, x._2, x._1.toDouble / x._2.toDouble)) .collect()
在spark-shell中執行上述的程式碼以後,輸出如下:
Array[(String, (Int, Int, Double))] = Array((a,(20,2,10.0)), (b,(24,2,12.0)))
簡單分析一下上面的程式碼邏輯:
mapValues
是PairRDDFunctions
中的方法,顧名思義,是對kv結構的rdd的value進行map的操作。
然後進行reduceByKey
操作,此時將value中的值累加,對應出現的次數也累加。
最後再呼叫mapValues
方法,求每個key的平均值即可。
2.求key對應的value值的佔比
同樣是上面的資料,我們想求a,b對應的value值分別佔比是多少,該怎麼計算?
val array = sc.parallelize(List(("a",10),("b",4),("a",10),("b",20))).reduceByKey(_ + _).collect()
val sum = array.foldLeft(0)({ (z, f) => z + f._2 })
array.map(x => println("%s\t%s\t%s".format(x._1, x._2, x._2.toDouble / sum)))
在spark-shell中執行上述的程式碼以後,輸出如下:
a 20 0.45454545454545453
b 24 0.5454545454545454
上面程式碼的邏輯如下:
1.先用reduceByKey
根據key聚合。
2.用foldLeft
方法算出所有key的總和。
3.對包含所有key的陣列進行遍歷,得到各個key的佔比。
3.foldLeft的簡單講解
上面用到了foldLeft
函式。從本質上來講,fold函式是將一種格式的輸入資料轉化為另外一種格式返回。
foldLeft
的原型如下:
override /*TraversableLike*/
def foldLeft[B](z: B)(op: (B, A) => B): B =
foldl(0, length, z, op)
foldLeft
有兩個輸入引數:初始值以及一個函式。而這個函式也包含有兩個輸入引數:累加值z與TraversableLike
的當前item。
foldLeft
方法開始執行以後,步驟如下:
1.初始值0作為第一個引數傳入foldLeft,array中的第一個item作為第二個引數f傳入foldLeft中。
2.foldLeft對兩個引數進行計算,上面的例子是將引數相加並返回。
3.foldLeft將上一步返回的值作為輸入函式的第一個引數,並且把array的下面一個item作為第二個引數傳入繼續計算。
4.重複上面的步驟,直到遍歷完array中的所有item。