Spark: cumulative history sums + grand totals + rows to columns
阿新 • Published: 2018-12-20
Reposted from https://www.cnblogs.com/piaolingzxh/p/5538783.html
I found it exceptionally well written and very useful.
In Spark, cumulative history sums mainly rely on window functions, while totals over all values require the rollup function.
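1 Use cases:
1. We need to compute each user's total usage duration (cumulative history).
2. The front-end page needs to support queries across multiple dimensions, such as product and region.
3. The table to display has headers such as: product, 2015-04, 2015-05, 2015-06.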
2 Source data (table userlogs_date; the product code column is named pcode in the queries below):
product_code | event_date | duration |
---|---|---|
1438 | 2016-05-13 | 165 |
1438 | 2016-05-14 | 595 |
1438 | 2016-05-15 | 105 |
1629 | 2016-05-13 | 12340 |
1629 | 2016-05-14 | 13850 |
1629 | 2016-05-15 | 227 |
3 Implementing the business scenarios
3.1 Scenario 1: cumulative history
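As the source data shows, we already have each day's usage duration. When computing statistics, we want the 14th to include the 13th, the 15th to include the 14th and the 13th, and so on.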
3.1.1 Spark SQL implementation
// Spark SQL: use a window function to accumulate historical data
sqlContext.sql("""
  select pcode, event_date,
         sum(duration) over (partition by pcode order by event_date asc) as sum_duration
  from userlogs_date
""").show
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13| 165|
| 1438|2016-05-14| 760|
| 1438|2016-05-15| 865|
| 1629|2016-05-13| 12340|
| 1629|2016-05-14| 26190|
| 1629|2016-05-15| 26417|
+-----+----------+------------+
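To check the running totals in the table above without a cluster, here is a minimal plain-Scala sketch (no Spark dependency) that models `sum(duration) over (partition by pcode order by event_date)` on the sample data from section 2. `RunningSumModel` and its methods are hypothetical names for illustration, and dates are kept as ISO strings on the assumption that string order equals date order.

```scala
// Plain-Scala model (no Spark) of
//   sum(duration) over (partition by pcode order by event_date)
// applied to the sample data from section 2. Names are hypothetical.
object RunningSumModel {
  // (pcode, event_date, duration); ISO date strings sort in date order
  val sample: Seq[(Int, String, Long)] = Seq(
    (1438, "2016-05-13", 165L), (1438, "2016-05-14", 595L), (1438, "2016-05-15", 105L),
    (1629, "2016-05-13", 12340L), (1629, "2016-05-14", 13850L), (1629, "2016-05-15", 227L)
  )

  // One partition per pcode; inside it, sort by date and emit a running total per row.
  def runningSum(rows: Seq[(Int, String, Long)]): Seq[(Int, String, Long)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (pcode, part) =>
      val sorted = part.sortBy(_._2)
      // scanLeft yields 0 followed by every prefix sum; drop the leading 0
      val totals = sorted.map(_._3).scanLeft(0L)(_ + _).tail
      sorted.zip(totals).map { case ((_, date, _), total) => (pcode, date, total) }
    }

  def main(args: Array[String]): Unit =
    runningSum(sample).foreach(println)
}
```

The running totals per row come out as 165, 760, 865 for 1438 and 12340, 26190, 26417 for 1629, matching the `sum_duration` column.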
3.1.2 DataFrame implementation
// Use the over function provided by Column, passing in the window specification
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._ // for sum
import sqlContext.implicits._          // for the $"col" syntax (auto-imported in spark-shell)
val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
df_userlogs_date.select(
  $"pcode",
  $"event_date",
  sum($"duration").over(first_2_now_window).as("sum_duration")
).show
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13| 165|
| 1438|2016-05-14| 760|
| 1438|2016-05-15| 865|
| 1629|2016-05-13| 12340|
| 1629|2016-05-14| 26190|
| 1629|2016-05-15| 26417|
+-----+----------+------------+
3.1.3 Extension: accumulating over a range
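In practice, accumulation logic is often far more complex than the above: for example, accumulating the previous N days, or from N days before to N days after. We implement these below: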
3.1.3.1 Accumulate all history:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
Window.partitionBy("pcode").orderBy("event_date")
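The four forms above give identical results here. Strictly speaking, when only ORDER BY is specified, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW rather than ROWS; the two differ only when several rows in a partition share the same event_date, which does not happen in this data.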
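A minimal plain-Scala sketch (no Spark dependency; `FrameModel` and its methods are hypothetical names) of how a cumulative ROWS frame and a cumulative RANGE frame diverge when two rows share an ordering key. The input is assumed to be a single partition already sorted by that key.

```scala
// Plain-Scala model (no Spark) of two cumulative frames over one partition
// that is already sorted by its ORDER BY key:
//   ROWS  BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
//   RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
object FrameModel {
  // Each element is (orderKey, value); input must be sorted by orderKey.
  def rowsCumulative(rows: Seq[(String, Long)]): Seq[Long] =
    rows.map(_._2).scanLeft(0L)(_ + _).tail

  // RANGE frame: a row's frame also contains its peers, i.e. every row whose
  // order key equals the current key, even if it comes later in the sort.
  def rangeCumulative(rows: Seq[(String, Long)]): Seq[Long] =
    rows.map { case (key, _) => rows.filter(_._1.compareTo(key) <= 0).map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: two rows share the date 2016-05-14 in one partition
    val withTies = Seq(("2016-05-13", 1L), ("2016-05-14", 2L), ("2016-05-14", 3L))
    println(rowsCumulative(withTies))  // row-by-row running total
    println(rangeCumulative(withTies)) // tied rows are summed together
  }
}
```

With the tied dates the ROWS version gives 1, 3, 6 while the RANGE version gives 1, 6, 6; with unique dates, as in the sample data, both agree.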
3.1.3.2 Accumulate the previous N days (one row per day, so N days = N rows), assuming N=3
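// If you don't want partitioning, for example to accumulate from the first day of the month up to the current day, you can remove the partition by clause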
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc rows between 3 preceding and current row) as sum_duration
from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0)
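One caveat with the `rows between` form: offsets count rows, not calendar days, so `3 preceding` only means "the previous 3 days" when every date has exactly one row in the partition. The plain-Scala sketch below (no Spark; the object name and the missing dates in the example are hypothetical) contrasts a row-based frame with a date-based one. In Spark, a date-based window is commonly written with `rangeBetween` over a numeric day column instead of `rowsBetween`; treat that as a pointer rather than a tested recipe.

```scala
import java.time.LocalDate

// Plain-Scala model (no Spark): row offsets vs calendar-day offsets for one
// partition sorted by date. Names are hypothetical.
object RowsVsDays {
  // ROWS BETWEEN n PRECEDING AND CURRENT ROW: the current row plus up to n earlier rows
  def rowsPreceding(rows: Seq[(LocalDate, Long)], n: Int): Seq[Long] =
    rows.indices.map { i =>
      rows.slice(math.max(0, i - n), i + 1).map(_._2).sum
    }

  // Calendar window: every row dated from (current date - n days) to the current date
  def daysPreceding(rows: Seq[(LocalDate, Long)], n: Int): Seq[Long] =
    rows.map { case (d, _) =>
      val lowest = d.minusDays(n)
      rows.filter { case (other, _) => !other.isAfter(d) && !other.isBefore(lowest) }.map(_._2).sum
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical partition with no rows between 2016-05-14 and 2016-05-20
    val rows = Seq(
      (LocalDate.of(2016, 5, 13), 165L),
      (LocalDate.of(2016, 5, 14), 595L),
      (LocalDate.of(2016, 5, 20), 105L)
    )
    println(rowsPreceding(rows, 1)) // the 05-20 row still picks up 05-14
    println(daysPreceding(rows, 1)) // the 05-20 row has no neighbour within 1 day
  }
}
```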
3.1.3.3 Accumulate the previous N days and the following M days, assuming N=3, M=5
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc rows between 3 preceding and 5 following ) as sum_duration
from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)
3.1.3.4 Accumulate all rows in the partition
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc rows between unbounded preceding and unbounded following )
as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue, Long.MaxValue)
In summary:
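preceding: accumulate the previous N rows (within the partition). To start from the first row of the partition, use unbounded. N is the offset backward from the current row.
following: the opposite of preceding; accumulate the next N rows (within the partition). To accumulate up to the end of the partition, use unbounded. N is the offset forward from the current row.
current row: as the name says, the current row; its offset is 0.
Note: the boundaries are inclusive, so the row N positions before, the row M positions after, and the current row are all included in the sum.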
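The rules above can be modelled in plain Scala (no Spark) for a single partition that is already sorted by the ORDER BY column. In this hypothetical sketch a bound is an offset from the current row (negative = preceding, 0 = current row, positive = following) and `None` stands for unbounded, playing the role of `Long.MinValue` / `Long.MaxValue` in `rowsBetween`.

```scala
// Plain-Scala model (no Spark) of
//   sum(x) over (... rows between <lower> and <upper>)
// for one partition already sorted by the ORDER BY column.
// Bounds: negative = preceding, 0 = current row, positive = following, None = unbounded.
object RowsFrame {
  def frameSum(values: IndexedSeq[Long], lower: Option[Int], upper: Option[Int]): IndexedSeq[Long] =
    values.indices.map { i =>
      val from = lower.fold(0)(off => math.max(0, i + off))
      // +1 because slice's end index is exclusive, while frame bounds are inclusive
      val until = upper.fold(values.length)(off => math.min(values.length, i + off + 1))
      // An empty frame sums to 0 here, whereas Spark's sum would return null
      values.slice(from, until).sum
    }

  def main(args: Array[String]): Unit = {
    val p1438 = Vector(165L, 595L, 105L)
    println(frameSum(p1438, Some(-1), Some(0))) // 1 preceding and current row
    println(frameSum(p1438, Some(-1), Some(1))) // 1 preceding and 1 following
    println(frameSum(p1438, None, None))        // unbounded preceding and unbounded following
  }
}
```

For product 1438 these three calls give 165, 760, 700; then 760, 865, 700; then 865 on every row, which matches the measured results that follow.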
3.1.3.5 Measured results
Cumulative history (the current day and all earlier days in the partition), form 1:
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc) as sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13| 165|
| 1438|2016-05-14| 760|
| 1438|2016-05-15| 865|
| 1629|2016-05-13| 12340|
| 1629|2016-05-14| 26190|
| 1629|2016-05-15| 26417|
+-----+----------+------------+
Cumulative history (the current day and all earlier days in the partition), form 2:
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc rows between unbounded preceding and current row) as
sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13| 165|
| 1438|2016-05-14| 760|
| 1438|2016-05-15| 865|
| 1629|2016-05-13| 12340|
| 1629|2016-05-14| 26190|
| 1629|2016-05-15| 26417|
+-----+----------+------------+
Accumulate the current day and the previous day:
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc rows between 1 preceding and current row) as sum_duration
from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13| 165|
| 1438|2016-05-14| 760|
| 1438|2016-05-15| 700|
| 1629|2016-05-13| 12340|
| 1629|2016-05-14| 26190|
| 1629|2016-05-15| 14077|
+-----+----------+------------+
Accumulate the previous day, the current day, and the next day:
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc rows between 1 preceding and 1 following ) as sum_duration
from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13| 760|
| 1438|2016-05-14| 865|
| 1438|2016-05-15| 700|
| 1629|2016-05-13| 26190|
| 1629|2016-05-14| 26417|
| 1629|2016-05-15| 14077|
+-----+----------+------------+
Accumulate everything in the partition (the current day plus all days before and after it):
select pcode,event_date,sum(duration) over (partition by pcode order by
event_date asc rows between unbounded preceding and unbounded following )
as sum_duration from userlogs_date
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13| 865|
| 1438|2016-05-14| 865|
| 1438|2016-05-15| 865|
| 1629|2016-05-13| 26417|
| 1629|2016-05-14| 26417|
| 1629|2016-05-15| 26417|
+-----+----------+------------+
3.2 Scenario 2: grand totals
3.2.1 Spark SQL implementation
// Spark SQL: use rollup to add "all" total rows (shown as null in the grouping columns)
sqlContext.sql(
"""
select pcode,event_date,sum(duration) as sum_duration
from userlogs_date_1
group by pcode,event_date with rollup
order by pcode,event_date
""").show()
+-----+----------+------------+
|pcode|event_date|sum_duration|
+-----+----------+------------+
| null| null| 27282|
| 1438| null| 865|
| 1438|2016-05-13| 165|
| 1438|2016-05-14| 595|
| 1438|2016-05-15| 105|
| 1629| null| 26417|
| 1629|2016-05-13| 12340|
| 1629|2016-05-14| 13850|
| 1629|2016-05-15| 227|
+-----+----------+------------+
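The rollup result above can be reproduced with a minimal plain-Scala sketch (no Spark; `RollupModel` is a hypothetical name). `rollup(pcode, event_date)` is equivalent to the grouping sets `(pcode, event_date)`, `(pcode)` and `()`; a rolled-up column is shown as `None` here where Spark shows `null`. Because a genuine `null` in the data would look the same, Spark 2.0+ also provides `grouping()` / `grouping_id()` to tell the two apart.

```scala
// Plain-Scala model (no Spark) of
//   group by pcode, event_date with rollup
// i.e. grouping sets (pcode, event_date), (pcode), ().
object RollupModel {
  type Row = (Int, String, Long) // (pcode, event_date, duration)

  def rollup(rows: Seq[Row]): Seq[(Option[Int], Option[String], Long)] = {
    // Finest level: one total per (pcode, event_date)
    val detail = rows.groupBy(r => (r._1, r._2)).toSeq.map { case ((p, d), g) =>
      (Option(p), Option(d), g.map(_._3).sum)
    }
    // Subtotal per pcode: event_date rolled up to None
    val perProduct = rows.groupBy(_._1).toSeq.map { case (p, g) =>
      (Option(p), Option.empty[String], g.map(_._3).sum)
    }
    // Grand total: both columns rolled up
    val grand = Seq((Option.empty[Int], Option.empty[String], rows.map(_._3).sum))
    // Ordering.Option sorts None first, like "order by pcode, event_date" in the output above
    (grand ++ perProduct ++ detail).sortBy(r => (r._1, r._2))
  }

  def main(args: Array[String]): Unit = {
    val sample: Seq[Row] = Seq(
      (1438, "2016-05-13", 165L), (1438, "2016-05-14", 595L), (1438, "2016-05-15", 105L),
      (1629, "2016-05-13", 12340L), (1629, "2016-05-14", 13850L), (1629, "2016-05-15", 227L)
    )
    rollup(sample).foreach(println)
  }
}
```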
3.2.2 DataFrame function implementation
// Use the rollup function provided by DataFrame to compute multi-dimensional "all" totals
df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).
  orderBy($"pcode", $"event_date").show()
+-----+----------+-------------+
|pcode|event_date|sum(duration)|
+-----+----------+-------------+
| null| null| 27282|
| 1438| null| 865|
| 1438|2016-05-13| 165|
| 1438|2016-05-14| 595|
| 1438|2016-05-15| 105|
| 1629| null| 26417|
| 1629|2016-05-13| 12340|
| 1629|2016-05-14| 13850|
| 1629|2016-05-15| 227|
+-----+----------+-------------+
3.3 Rows to columns -> pivot
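At the time of writing, pivot had no SQL syntax (a PIVOT clause was added to Spark SQL in 2.4), so we use the DataFrame API for now.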
val userlogs_date_all = sqlContext.sql("""
  select dcode, pcode, event_date, sum(duration) as duration
  from userlogs
  group by dcode, pcode, event_date
""")
userlogs_date_all.registerTempTable("userlogs_date_all")
// distinct() does not preserve order; append .sorted to get chronological columns
val dates = userlogs_date_all.select($"event_date").
  map(row => row.getAs[String]("event_date")).distinct().collect().toList
userlogs_date_all.groupBy($"dcode", $"pcode").pivot("event_date", dates).
  sum("duration").na.fill(0).show
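Here is a plain-Scala sketch (no Spark; `PivotModel` is a hypothetical name) of what `groupBy(...).pivot("event_date", dates).sum("duration").na.fill(0)` produces: one row per (dcode, pcode), one column per entry of `dates` in the order given, and 0 where a combination has no data. Passing the value list explicitly, as the code above does, fixes the output columns; according to the Spark API docs it also spares Spark a separate pass to compute the distinct values.

```scala
// Plain-Scala model (no Spark) of
//   groupBy(dcode, pcode).pivot("event_date", dates).sum("duration").na.fill(0)
object PivotModel {
  type Row = (String, Int, String, Long) // (dcode, pcode, event_date, duration)

  // One output row per (dcode, pcode); one column per date in `dates`, in that order.
  def pivot(rows: Seq[Row], dates: Seq[String]): Seq[((String, Int), Seq[Long])] =
    rows.groupBy(r => (r._1, r._2)).toSeq.sortBy(_._1).map { case (key, group) =>
      // Sum durations per date within this (dcode, pcode) group
      val byDate: Map[String, Long] =
        group.groupBy(_._3).map { case (d, g) => d -> g.map(_._4).sum }
      // Missing dates become 0, like na.fill(0)
      key -> dates.map(d => byDate.getOrElse(d, 0L))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical rows, not taken from the article's data set
    val rows: Seq[Row] = Seq(
      ("A", 1438, "2016-05-13", 10L), ("A", 1438, "2016-05-13", 5L),
      ("A", 1438, "2016-05-15", 7L), ("B", 1438, "2016-05-14", 3L)
    )
    pivot(rows, Seq("2016-05-13", "2016-05-14", "2016-05-15")).foreach(println)
  }
}
```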
+-----------------+-----+----------+----------+----------+----------+
| dcode|pcode|2016-05-26|2016-05-13|2016-05-14|2016-05-15|
+-----------------+-----+----------+----------+----------+----------+
| F2429186| 1438| 0| 0| 227| 0|
| AI2342441| 1438| 0| 0| 0| 345|
| A320018711| 1438| 0| 939| 0| 0|
| H2635817| 1438| 0| 522| 0| 0|
| D0288196| 1438| 0| 101| 0| 0|
| Y0242218| 1438| 0| 1036| 0| 0|
| H2392574| 1438| 0| 0| 689| 0|
| D2245588| 1438| 0| 0| 1| 0|
| Y2514906| 1438| 0| 0| 118| 4|
| H2540419| 1438| 0| 465| 242| 5|
| R2231926| 1438| 0| 0| 305| 0|
| H2684591| 1438| 0| 136| 0| 0|
| A2548470| 1438| 0| 412| 0| 0|
| GH000309| 1438| 0| 0| 0| 4|
| H2293216| 1438| 0| 0| 0| 534|
| R2170601| 1438| 0| 0| 0| 0|
|B2365238;B2559538| 1438| 0| 0| 0| 0|
| BQ005465| 1438| 0| 0| 642| 78|
| AH2180324| 1438| 0| 608| 146| 36|
| H0279306| 1438| 0| 490| 0| 0|
+-----------------+-----+----------+----------+----------+----------+
Appendix
Below are the official API descriptions of these two functions:
org.apache.spark.sql.DataFrame.scala
def rollup(col1: String, cols: String*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).
// Compute the average for all numeric columns rolluped by department and group.
df.rollup("department", "group").avg()
// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
"salary" -> "avg",
"age" -> "max"
))
def rollup(cols: Column*): GroupedData
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
df.rollup($"department", $"group").avg()
// Compute the max age and average salary, rolluped by department and gender.
df.rollup($"department", $"gender").agg(Map(
"salary" -> "avg",
"age" -> "max"
))
org.apache.spark.sql.Column.scala
def over(window: WindowSpec): Column
Define a windowing column.
val w = Window.partitionBy("name").orderBy("id")
df.select(
sum("price").over(w.rangeBetween(Long.MinValue, 2)),
avg("price").over(w.rowsBetween(0, 4))
)