Spark-SQL case study: aggregating user traffic sessions with SQL and the DSL
Requirement analysis:
Aggregate each user's internet traffic. If the gap between the end of one session and the start of the next is less than 10 minutes, the two sessions roll up into a single record.
uid,start_time,end_time,flow
1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
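For quick local testing, the sample above can also be built as an in-memory DataFrame instead of reading a CSV file. A minimal sketch (only the first few rows are shown; the object name SampleDataDemo is illustrative):

import org.apache.spark.sql.SparkSession
object SampleDataDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SampleDataDemo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    // Build a few of the rows above in memory; toDF assigns the column names.
    val df = Seq(
      ("1", "2020-02-18 14:20:30", "2020-02-18 14:46:30", 20),
      ("1", "2020-02-18 14:47:20", "2020-02-18 15:20:30", 30),
      ("2", "2020-02-18 14:18:24", "2020-02-18 15:01:40", 20)
    ).toDF("uid", "start_time", "end_time", "flow")
    df.createTempView("t_int") // same view name the SQL steps below query
    spark.stop()
  }
}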
-- Step 1: shift each user's end_time down one row with lag (the third argument supplies a default for the first row)
select
uid,
start_time,
end_time,
flow,
lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
from
t_int
+---+-------------------+-------------------+----+-------------------+
|uid| start_time| end_time|flow| lag_time|
+---+-------------------+-------------------+----+-------------------+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 20| null|
| 3|2020-02-18 15:36:39|2020-02-18 15:24:54| 30|2020-02-18 15:35:53|
| 1|2020-02-18 14:20:30|2020-02-18 14:46:30| 20| null|
| 1|2020-02-18 14:47:20|2020-02-18 15:20:30| 30|2020-02-18 14:46:30|
| 1|2020-02-18 15:37:23|2020-02-18 16:05:26| 40|2020-02-18 15:20:30|
| 1|2020-02-18 16:06:27|2020-02-18 17:20:49| 50|2020-02-18 16:05:26|
| 1|2020-02-18 17:21:50|2020-02-18 18:03:27| 60|2020-02-18 17:20:49|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20| null|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30|2020-02-18 15:01:40|
| 2|2020-02-18 16:01:23|2020-02-18 16:40:32| 40|2020-02-18 15:30:24|
| 2|2020-02-18 16:44:56|2020-02-18 17:40:52| 50|2020-02-18 16:40:32|
+---+-------------------+-------------------+----+-------------------+
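Note that the first row of each uid shows a null lag_time even though start_time was passed as lag's default. Whether or not the default is applied, the null is harmless for the next step: start_time minus null is null, and Spark's if returns the else branch when its condition is null, so the first session still gets flag 0. A quick check (assuming a running SparkSession named spark):

spark.sql("select if(null > 10, 1, 0) as flag").show() // prints 0: a null condition falls through to the else branch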
-- Step 2: subtract lag_time from start_time; a gap of more than 10 minutes returns 1, otherwise 0
select
uid,
start_time,
end_time,
flow,
if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
from
(
select
uid,
start_time,
end_time,
flow,
lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
from
t_int
)t1
+---+-------------------+-------------------+----+----+
|uid| start_time| end_time|flow|flag|
+---+-------------------+-------------------+----+----+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 20| 0|
| 3|2020-02-18 15:36:39|2020-02-18 15:24:54| 30| 0|
| 1|2020-02-18 14:20:30|2020-02-18 14:46:30| 20| 0|
| 1|2020-02-18 14:47:20|2020-02-18 15:20:30| 30| 0|
| 1|2020-02-18 15:37:23|2020-02-18 16:05:26| 40| 1|
| 1|2020-02-18 16:06:27|2020-02-18 17:20:49| 50| 0|
| 1|2020-02-18 17:21:50|2020-02-18 18:03:27| 60| 0|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20| 0|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30| 1|
| 2|2020-02-18 16:01:23|2020-02-18 16:40:32| 40| 1|
| 2|2020-02-18 16:44:56|2020-02-18 17:40:52| 50| 0|
+---+-------------------+-------------------+----+----+
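The same flag can also be computed with DSL functions instead of expr. A sketch, assuming a DataFrame named lagged that already carries the lag_time column (when/otherwise and unix_timestamp stand in for if and to_unix_timestamp):

import org.apache.spark.sql.functions.{col, unix_timestamp, when}
val flagged = lagged.withColumn("flag",
  when((unix_timestamp(col("start_time")) - unix_timestamp(col("lag_time"))) / 60 > 10, 1)
    .otherwise(0))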
-- Step 3: running sum of flag over the window, building a per-user session id
select
uid,
start_time,
end_time,
flow,
sum(flag) over(partition by uid order by start_time) sum_flag
from
(
select
uid,
start_time,
end_time,
flow,
if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
from
(
select
uid,
start_time,
end_time,
flow,
lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
from
t_int
)t1
)t2
+---+-------------------+-------------------+----+--------+
|uid| start_time| end_time|flow|sum_flag|
+---+-------------------+-------------------+----+--------+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 20| 0|
| 3|2020-02-18 15:36:39|2020-02-18 15:24:54| 30| 0|
| 1|2020-02-18 14:20:30|2020-02-18 14:46:30| 20| 0|
| 1|2020-02-18 14:47:20|2020-02-18 15:20:30| 30| 0|
| 1|2020-02-18 15:37:23|2020-02-18 16:05:26| 40| 1|
| 1|2020-02-18 16:06:27|2020-02-18 17:20:49| 50| 1|
| 1|2020-02-18 17:21:50|2020-02-18 18:03:27| 60| 1|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20| 0|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30| 1|
| 2|2020-02-18 16:01:23|2020-02-18 16:40:32| 40| 2|
| 2|2020-02-18 16:44:56|2020-02-18 17:40:52| 50| 2|
+---+-------------------+-------------------+----+--------+
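Because the window carries an order by, sum(flag) is a running sum: every flag of 1 bumps a per-user counter, so sum_flag acts as a session id. The DSL equivalent, assuming the flagged DataFrame from the sketch above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}
val sessioned = flagged.withColumn("sum_flag",
  sum(col("flag")).over(Window.partitionBy("uid").orderBy("start_time")))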
-- Step 4: group by uid and sum_flag, then sum flow and take each session's time range
select
uid,
min(start_time) start_time,
max(end_time) end_time,
sum(flow) sum_flow
from
(
select
uid,
start_time,
end_time,
flow,
sum(flag) over(partition by uid order by start_time) sum_flag
from
(
select
uid,
start_time,
end_time,
flow,
if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
from
(
select
uid,
start_time,
end_time,
flow,
lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
from
t_int
)t1
)t2
)t3
group by uid,sum_flag
+---+-------------------+-------------------+--------+
|uid| start_time| end_time|sum_flow|
+---+-------------------+-------------------+--------+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 50.0|
| 1|2020-02-18 14:20:30|2020-02-18 15:20:30| 50.0|
| 1|2020-02-18 15:37:23|2020-02-18 18:03:27| 150.0|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20.0|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30.0|
| 2|2020-02-18 16:01:23|2020-02-18 17:40:52| 90.0|
+---+-------------------+-------------------+--------+
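sum_flow prints as a double (50.0 rather than 50) because the CSV is read without a schema, so flow is a string column and sum implicitly casts it to double. Casting up front keeps the sums integral; a one-liner, assuming the df defined in the programs below:

val typed = df.withColumn("flow", df("flow").cast("long")) // sum(flow) then yields a long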
1. Using SQL
import org.apache.spark.sql.{DataFrame, SparkSession}
object SQLFlowRollupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // Read the CSV with a header row; all columns come back as strings.
    val df: DataFrame = spark.read
      .option("header", "true")
      .csv("src/main/scala/data/data.csv")

    // Register the DataFrame as a view so the SQL below can query it.
    df.createTempView("t_int")

    val res = spark.sql(
"""
|
|select
| uid,
| min(start_time) start_time,
| max(end_time) end_time,
| sum(flow) sum_flow
|from
|(
| select
| uid,
| start_time,
| end_time,
| flow,
| sum(flag) over(partition by uid order by start_time) sum_flag
| from
| (
| select
| uid,
| start_time,
| end_time,
| flow,
| if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
| from
| (
| select
| uid,
| start_time,
| end_time,
| flow,
| lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
| from
| t_int
| )t1
| )t2
|)t3
|group by uid,sum_flag
|
|""".stripMargin)
    res.show()
    spark.stop()
  }
}
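Reading with an explicit schema avoids the all-string columns in the first place. A sketch with assumed types (the timestamps stay strings here, since the SQL parses them itself):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
val schema = StructType(Seq(
  StructField("uid", StringType),
  StructField("start_time", StringType),
  StructField("end_time", StringType),
  StructField("flow", LongType)))
val typedDf = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("src/main/scala/data/data.csv")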
2. Using DSL
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
object DSLFlowRollupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // Read the CSV with a header row; all columns come back as strings.
    val df: DataFrame = spark.read
      .option("header", "true")
      .csv("src/main/scala/data/data.csv")

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // One window spec shared by lag and the running sum.
    val byUid = Window.partitionBy("uid").orderBy("start_time")

    val res = df
      // Step 1: previous end_time per uid.
      .select('uid, 'start_time, 'end_time, 'flow,
        expr("lag(end_time, 1, start_time)").over(byUid).as("lag_time"))
      // Step 2: flag = 1 when the gap to the previous session exceeds 10 minutes.
      .select('uid, 'start_time, 'end_time, 'flow,
        expr("if((to_unix_timestamp(start_time) - to_unix_timestamp(lag_time)) / 60 > 10, 1, 0)").as("flag"))
      // Step 3: running sum of flag = per-user session id.
      .select('uid, 'start_time, 'end_time, 'flow,
        sum('flag).over(byUid).as("sum_flag"))
      // Step 4: collapse each session into one row.
      .groupBy("uid", "sum_flag")
      .agg(
        min("start_time").as("start_time"),
        max("end_time").as("end_time"),
        sum('flow).as("sum_flow"))

    res.show()
    spark.stop()
  }
}
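Since groupBy output order is nondeterministic, the DSL result may print its rows in a different order than the SQL version; sorting makes the two easier to compare:

res.orderBy("uid", "start_time").show(false)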