rowsBetween + over視窗函式實際應用
阿新 • • 發佈:2021-07-05
over視窗函式的應用參見我上一篇部落格:https://www.cnblogs.com/wanpi/p/14969000.html
rows between函式:
- SQL語句中的rows between unbounded preceding and unbounded following ,其中:
- unbounded preceding:表示Long.MIN_VALUE,也就是可視當前行之前的所有資料
- unbounded following:表示Long.MAX_VALUE,也就是可視當前行之後的所有資料
- current row:表示當前行,也就是0
下面是幾個案例,幫助理解
需求1
A表裡面有三條記錄,欄位是
ID start_time end_time
2018-02-03 2019-02-03
2019-02-04 2020-03-04
2018-08-04 2019-03-04
根據已知的三條記錄用SQL寫出結果為:
2018-02-03 2018-08-04
2018-08-04 2019-02-03
2019-02-03 2019-02-04
2019-02-04 2019-03-04
2019-03-04 2020-03-04
解決思路
1.拆解時間資料
2.升序排列日期
3.視窗函式
程式碼
package method import org.apache.spark.sql.SparkSession import org.apache.spark.sql.expressions.Window object OnWindowFunction3 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("windowfunction").master("local[*]").getOrCreate() val rdd = spark.sparkContext.makeRDD(List( (1,"2018-02-03","2019-02-03"), (2,"2019-02-04","2020-03-04"), (3,"2018-08-04","2019-03-04") )) import spark.implicits._ val df = rdd.flatMap(t3 => { Array(t3._2,t3._3) }).toDF("value") import org.apache.spark.sql.functions._ val w1 = Window .orderBy($"value" asc) .rowsBetween(0,1) df .withColumn("end_time",max("value") over(w1)) .show() spark.stop() } } //結果 +----------+----------+ | value| end_time| +----------+----------+ |2018-02-03|2018-08-04| |2018-08-04|2019-02-03| |2019-02-03|2019-02-04| |2019-02-04|2019-03-04| |2019-03-04|2020-03-04| |2020-03-04|2020-03-04| +----------+----------+
需求2
統計網站訪問時長。每個使用者訪問總時長
資料集
findsiteduration.csv
uid,date,dur
111,2019-06-20,1
111,2019-06-21,2
111,2019-06-22,3
222,2019-06-20,4
222,2019-06-21,5
222,2019-06-22,6
333,2019-06-20,7
333,2019-06-21,8
333,2019-06-22,9
444,2019-06-23,10
程式碼
package sparksql import org.apache.spark.sql.SparkSession object FindSiteDuration { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate() val df = spark.read.option("header",true).csv(".\\resources\\findsiteduration.csv") df.createTempView("temp1") spark .sql( """ |select uid,date,dur, |sum(dur) over(partition by uid order by date) as totaldur |from temp1 |""".stripMargin).show() spark.stop() } } //結果 +---+----------+---+--------+ |uid| date|dur|totaldur| +---+----------+---+--------+ |111|2019-06-20| 1| 1.0| |111|2019-06-21| 2| 3.0| |111|2019-06-22| 3| 6.0| |444|2019-06-23| 10| 10.0| |222|2019-06-20| 4| 4.0| |222|2019-06-21| 5| 9.0| |222|2019-06-22| 6| 15.0| |333|2019-06-20| 7| 7.0| |333|2019-06-21| 8| 15.0| |333|2019-06-22| 9| 24.0| +---+----------+---+--------+ //每個使用者訪問當天和前一天兩天訪問時長 spark .sql( """ |select uid,date,dur, |sum(dur) over(partition by uid order by date rows between 1 preceding and current row) as totaldur |from temp1 |""".stripMargin).show() //結果 +---+----------+---+--------+ |uid| date|dur|totaldur| +---+----------+---+--------+ |111|2019-06-20| 1| 1.0| |111|2019-06-21| 2| 3.0| |111|2019-06-22| 3| 5.0| |444|2019-06-23| 10| 10.0| |222|2019-06-20| 4| 4.0| |222|2019-06-21| 5| 9.0| |222|2019-06-22| 6| 11.0| |333|2019-06-20| 7| 7.0| |333|2019-06-21| 8| 15.0| |333|2019-06-22| 9| 17.0| +---+----------+---+--------+ //每個使用者當天和前一天,後一天三天的網站訪問時長 spark .sql( """ |select uid,date,dur, |sum(dur) over(partition by uid order by date rows between 1 preceding and 1 following) as totaldur |from temp1 |""".stripMargin).show() //結果 +---+----------+---+--------+ |uid| date|dur|totaldur| +---+----------+---+--------+ |111|2019-06-20| 1| 3.0| |111|2019-06-21| 2| 6.0| |111|2019-06-22| 3| 5.0| |444|2019-06-23| 10| 10.0| |222|2019-06-20| 4| 9.0| |222|2019-06-21| 5| 15.0| |222|2019-06-22| 6| 11.0| |333|2019-06-20| 7| 15.0| |333|2019-06-21| 8| 24.0| |333|2019-06-22| 9| 17.0| +---+----------+---+--------+