Spark aggregate function pitfalls: first
We have the following table:
val df = spark.createDataset(Seq(
  (1, "a", 66),
  (2, "a", 22),
  (3, "a", 11),
  (4, "b", 22),
  (5, "b", 66),
  (6, "b", 11))).toDF("a", "b", "c")
df.show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| a| 66|
| 2| a| 22|
| 3| a| 11|
| 4| b| 22|
| 5| b| 66|
| 6| b| 11|
+---+---+---+
We want to group by column b and aggregate to get sum(c), max(c), and the value of a that corresponds to max(c):
df.orderBy(desc("c"))
.groupBy("b")
.agg(sum("c"), max("c"), first("a"))
.show
+---+------+------+---------------+
| b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
| b| 99| 66| 4|
| a| 99| 66| 2|
+---+------+------+---------------+
The last column is clearly wrong! Running the query a few more times gives:
+---+------+------+---------------+
| b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
| b| 99| 66| 5|
| a| 99| 66| 2|
+---+------+------+---------------+
+---+------+------+---------------+
| b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
| b| 99| 66| 6|
| a| 99| 66| 2|
+---+------+------+---------------+
+---+------+------+---------------+
| b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
| b| 99| 66| 4|
| a| 99| 66| 1|
+---+------+------+---------------+
The result returned by first is not fixed! Looking at the documentation in the source code, we find:
/**
 * Returns the first value of `child` for a group of rows. If the first value of `child`
 * is `null`, it returns `null` (respecting nulls). Even if [[First]] is used on an already
 * sorted column, if we do partial aggregation and final aggregation (when mergeExpression
 * is used) its result will not be deterministic (unless the input table is sorted and has
 * a single partition, and we use a single reducer to do the aggregation.).
 */
So on a sorted Dataset, first does not return a fixed value (unless the Dataset has a single partition and the aggregation is done by a single reducer!).
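If you want to see where that partial and final aggregation comes from, calling .explain() on the query prints the two-stage physical plan (the exact plan text varies with the Spark version):
df.orderBy(desc("c"))
  .groupBy("b")
  .agg(first("a"))
  .explain()  // look for a partial aggregate feeding a final aggregate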
Let's check how many partitions df has:
df.rdd.getNumPartitions
res9: Int = 6 // indeed, not a single partition
Now that we know the cause, let's rework the query: merge the partitions and try again:
df.coalesce(1).orderBy(desc("c"))
.groupBy("b")
.agg(sum("c"), max("c"), first("a"))
.show
+---+------+------+---------------+
| b|sum(c)|max(c)|first(a, false)|
+---+------+------+---------------+
| a| 99| 66| 1|
| b| 99| 66| 5|
+---+------+------+---------------+
This time the result is correct.
The aggregate function last has the same problem.
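A quick way to convince yourself (the exact values are irrelevant; the point is that they change between runs):
df.orderBy(desc("c"))
  .groupBy("b")
  .agg(sum("c"), max("c"), last("a"))
  .show  // the last(a) column is just as unstable across runs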
When the data volume is large, coalescing everything into one partition is clearly not the best way to solve this. We can also get the same result with an aggregation plus a join:
val df = spark.createDataset(Seq(
  (1, "a", 77),
  (2, "a", 22),
  (3, "a", 11),
  (4, "b", 22),
  (5, "b", 77),
  (6, "b", 77))).toDF("a", "b", "c")
df.show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| a| 77|
| 2| a| 22|
| 3| a| 11|
| 4| b| 22|
| 5| b| 77|
| 6| b| 77|
+---+---+---+
val df1 = df.dropDuplicates("b", "c").withColumnRenamed("c", "max") // one row per (b, c) pair; rename c so it joins against max(c)
df.groupBy("b")
.agg(sum("c").as("sum"), max("c").as("max"))
.join(df1, Seq("b", "max"), "left")
.show
+---+---+---+---+
| b|max|sum| a|
+---+---+---+---+
| b| 77|176| 6|
| a| 77|110| 1|
+---+---+---+---+
Of course, we can also get the same result with SQL window functions.
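For completeness, here is a minimal sketch of that approach (the rn and sum column names are just illustrative): row_number ranks the rows of each b group by c descending, so the row with rn = 1 carries both max(c) and the matching a. Note that when several rows tie on c (as rows 5 and 6 do for group b), which a you get is arbitrary, just as with the dropDuplicates variant above.
import org.apache.spark.sql.expressions.Window

val byB = Window.partitionBy("b")
df.withColumn("rn", row_number().over(byB.orderBy(desc("c"))))  // rank within each b group by c descending
  .withColumn("sum", sum("c").over(byB))                        // group total of c
  .where(col("rn") === 1)                                       // keep the max-c row per group
  .select("b", "sum", "c", "a")
  .show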