
Spark-SQL: Using SQL and the DSL to Aggregate User Network Traffic (Case Study)


Requirement analysis:

Aggregate each user's network traffic: if the gap between the end of one session and the start of the next is less than 10 minutes, the two sessions can be rolled up into one record. For example, in the sample data below, uid 1's session starting at 15:37:23 begins about 17 minutes after the previous session ended at 15:20:30, so it opens a new group, while the 50-second gap before 14:47:20 keeps that row in the same group. A small setup sketch for running the SQL steps interactively follows the sample data.

uid,start_time,end_time,flow
1,2020-02-18 14:20:30,2020-02-18 14:46:30,20
1,2020-02-18 14:47:20,2020-02-18 15:20:30,30
1,2020-02-18 15:37:23,2020-02-18 16:05:26,40
1,2020-02-18 16:06:27,2020-02-18 17:20:49,50
1,2020-02-18 17:21:50,2020-02-18 18:03:27,60
2,2020-02-18 14:18:24,2020-02-18 15:01:40,20
2,2020-02-18 15:20:49,2020-02-18 15:30:24,30
2,2020-02-18 16:01:23,2020-02-18 16:40:32,40
2,2020-02-18 16:44:56,2020-02-18 17:40:52,50
3,2020-02-18 14:39:58,2020-02-18 15:35:53,20
3,2020-02-18 15:36:39,2020-02-18 15:24:54,30
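
To follow the step-by-step SQL below interactively, a minimal spark-shell sketch is shown here; it assumes the sample rows above (including the uid,start_time,end_time,flow header line) are saved as data.csv, and the path is only an example:

val df = spark.read
  .option("header", "true")           // first line supplies the column names
  .csv("data.csv")                    // hypothetical path; adjust to wherever the file was saved
df.createOrReplaceTempView("t_int")   // register the view the queries below refer to
spark.sql("select * from t_int").show()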


-- Step 1: use lag to pull the previous row's end_time of the same uid down onto the current row


select
  uid,
  start_time,
  end_time,
  flow,
  lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
from
  t_int

+---+-------------------+-------------------+----+-------------------+
|uid| start_time| end_time|flow| lag_time|
+---+-------------------+-------------------+----+-------------------+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 20| null|
| 3|2020-02-18 15:36:39|2020-02-18 15:24:54| 30|2020-02-18 15:35:53|
| 1|2020-02-18 14:20:30|2020-02-18 14:46:30| 20| null|
| 1|2020-02-18 14:47:20|2020-02-18 15:20:30| 30|2020-02-18 14:46:30|
| 1|2020-02-18 15:37:23|2020-02-18 16:05:26| 40|2020-02-18 15:20:30|
| 1|2020-02-18 16:06:27|2020-02-18 17:20:49| 50|2020-02-18 16:05:26|
| 1|2020-02-18 17:21:50|2020-02-18 18:03:27| 60|2020-02-18 17:20:49|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20| null|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30|2020-02-18 15:01:40|
| 2|2020-02-18 16:01:23|2020-02-18 16:40:32| 40|2020-02-18 15:30:24|
| 2|2020-02-18 16:44:56|2020-02-18 17:40:52| 50|2020-02-18 16:40:32|
+---+-------------------+-------------------+----+-------------------+

-- Step 2: compute start_time minus lag_time in minutes; return 1 if the gap is greater than 10 minutes, otherwise 0

select
  uid,
  start_time,
  end_time,
  flow,
  if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
from
(
  select
    uid,
    start_time,
    end_time,
    flow,
    lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
  from
    t_int
)t1


+---+-------------------+-------------------+----+----+
|uid| start_time| end_time|flow|flag|
+---+-------------------+-------------------+----+----+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 20| 0|
| 3|2020-02-18 15:36:39|2020-02-18 15:24:54| 30| 0|
| 1|2020-02-18 14:20:30|2020-02-18 14:46:30| 20| 0|
| 1|2020-02-18 14:47:20|2020-02-18 15:20:30| 30| 0|
| 1|2020-02-18 15:37:23|2020-02-18 16:05:26| 40| 1|
| 1|2020-02-18 16:06:27|2020-02-18 17:20:49| 50| 0|
| 1|2020-02-18 17:21:50|2020-02-18 18:03:27| 60| 0|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20| 0|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30| 1|
| 2|2020-02-18 16:01:23|2020-02-18 16:40:32| 40| 1|
| 2|2020-02-18 16:44:56|2020-02-18 17:40:52| 50| 0|
+---+-------------------+-------------------+----+----+

-- Step 3: running-sum the flag per uid with a window function; each sum_flag value identifies one rolled-up group
select
  uid,
  start_time,
  end_time,
  flow,
  sum(flag) over(partition by uid order by start_time) sum_flag
from
(
  select
    uid,
    start_time,
    end_time,
    flow,
    if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
  from
  (
    select
      uid,
      start_time,
      end_time,
      flow,
      lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
    from
      t_int
  )t1
)t2


+---+-------------------+-------------------+----+--------+
|uid| start_time| end_time|flow|sum_flag|
+---+-------------------+-------------------+----+--------+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 20| 0|
| 3|2020-02-18 15:36:39|2020-02-18 15:24:54| 30| 0|
| 1|2020-02-18 14:20:30|2020-02-18 14:46:30| 20| 0|
| 1|2020-02-18 14:47:20|2020-02-18 15:20:30| 30| 0|
| 1|2020-02-18 15:37:23|2020-02-18 16:05:26| 40| 1|
| 1|2020-02-18 16:06:27|2020-02-18 17:20:49| 50| 1|
| 1|2020-02-18 17:21:50|2020-02-18 18:03:27| 60| 1|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20| 0|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30| 1|
| 2|2020-02-18 16:01:23|2020-02-18 16:40:32| 40| 2|
| 2|2020-02-18 16:44:56|2020-02-18 17:40:52| 50| 2|
+---+-------------------+-------------------+----+--------+

-- Step 4: group by uid and sum_flag, then take min(start_time), max(end_time) and the total flow

select
  uid,
  min(start_time) start_time,
  max(end_time) end_time,
  sum(flow) sum_flow
from
(
  select
    uid,
    start_time,
    end_time,
    flow,
    sum(flag) over(partition by uid order by start_time) sum_flag
  from
  (
    select
      uid,
      start_time,
      end_time,
      flow,
      if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
    from
    (
      select
        uid,
        start_time,
        end_time,
        flow,
        lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
      from
        t_int
    )t1
  )t2
)t3
group by uid,sum_flag

+---+-------------------+-------------------+--------+
|uid| start_time| end_time|sum_flow|
+---+-------------------+-------------------+--------+
| 3|2020-02-18 14:39:58|2020-02-18 15:35:53| 50.0|
| 1|2020-02-18 14:20:30|2020-02-18 15:20:30| 50.0|
| 1|2020-02-18 15:37:23|2020-02-18 18:03:27| 150.0|
| 2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20.0|
| 2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30.0|
| 2|2020-02-18 16:01:23|2020-02-18 17:40:52| 90.0|
+---+-------------------+-------------------+--------+
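
As a quick sanity check against the sample data: for uid 2 the first two gaps (15:01:40 to 15:20:49 and 15:30:24 to 16:01:23) are about 19 and 31 minutes, so those sessions stay separate, while the 4-minute gap from 16:40:32 to 16:44:56 merges the last two rows into a single group with 40 + 50 = 90 flow, matching the result above.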

1. Using SQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object SQLFlowRollupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val df: DataFrame = spark.read
      .option("header", "true")
      .csv("src/main/scala/data/data.csv")

    df.createTempView("t_int")

    val res = spark.sql(
      """
        |
        |select
        |  uid,
        |  min(start_time) start_time,
        |  max(end_time) end_time,
        |  sum(flow) sum_flow
        |from
        |(
        |  select
        |    uid,
        |    start_time,
        |    end_time,
        |    flow,
        |    sum(flag) over(partition by uid order by start_time) sum_flag
        |  from
        |  (
        |    select
        |      uid,
        |      start_time,
        |      end_time,
        |      flow,
        |      if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60>10,1,0) flag
        |    from
        |    (
        |      select
        |        uid,
        |        start_time,
        |        end_time,
        |        flow,
        |        lag(end_time,1,start_time) over(partition by uid order by start_time) lag_time
        |      from
        |        t_int
        |    )t1
        |  )t2
        |)t3
        |group by uid,sum_flag
        |
        |""".stripMargin)

    res.show()

    spark.stop()
  }

}
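
One detail worth noting: because the CSV is read without a schema, every column (including flow) arrives as a string, and Spark's sum implicitly casts it to double, which is why sum_flow prints as 50.0 rather than 50. If integral sums are preferred, one option (a small sketch, not part of the original code) is to cast flow before registering the view:

import org.apache.spark.sql.functions.col

// cast flow to a numeric type so sum(flow) stays integral
val typedDf = df.withColumn("flow", col("flow").cast("long"))
typedDf.createOrReplaceTempView("t_int")   // replaces the earlier t_int view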

2. Using the DSL

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}

object DSLFlowRollupDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val df: DataFrame = spark.read
      .option("header", "true")
      .csv("src/main/scala/data/data.csv")

    import spark.implicits._
    import org.apache.spark.sql.functions._
    val res = df
      .select('uid, 'start_time, 'end_time, 'flow,
        // previous end_time of the same uid, ordered by start_time
        expr("lag(end_time,1,start_time)")
          .over(Window.partitionBy('uid).orderBy("start_time")).as("lag_time"))
      .select('uid, 'start_time, 'end_time, 'flow,
        // 1 if the gap to the previous session is more than 10 minutes, else 0
        expr("if((to_unix_timestamp(start_time)-to_unix_timestamp(lag_time))/60 > 10,1,0)").as("flag"))
      .select('uid, 'start_time, 'end_time, 'flow,
        // running sum of flag => group id within each uid
        sum('flag).over(Window.partitionBy("uid").orderBy("start_time")).as("sum_flag"))
      .groupBy("uid", "sum_flag")
      .agg(
        min("start_time").as("start_time"),
        max("end_time").as("end_time"),
        sum('flow).as("sum_flow"))
      .drop("sum_flag")

    res.show()

    spark.stop()
  }
}
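
For reference, the same pipeline can also be written with typed column functions (lag, when, unix_timestamp) instead of expr strings. This is only a sketch assuming the same spark session and df as in the object above; the behaviour should match the expr-based version, with the null lag_time on each user's first row falling into the otherwise(0) branch:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val w = Window.partitionBy("uid").orderBy("start_time")

val res2 = df
  .withColumn("lag_time", lag($"end_time", 1).over(w))             // previous end_time per uid
  .withColumn("flag",
    when((unix_timestamp($"start_time") - unix_timestamp($"lag_time")) / 60 > 10, 1)
      .otherwise(0))                                               // 1 marks a gap of more than 10 minutes
  .withColumn("sum_flag", sum($"flag").over(w))                    // running sum => group id within each uid
  .groupBy("uid", "sum_flag")
  .agg(
    min("start_time").as("start_time"),
    max("end_time").as("end_time"),
    sum($"flow").as("sum_flow"))
  .drop("sum_flag")

res2.show()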