Spark Streaming--開窗函式over()
阿新 • • 發佈:2019-01-05
over()開窗函式
-
在使用聚合函式後,會將多行變成一行,而開窗函式是將一行變成多行;
-
並且在使用聚合函式後,如果要顯示其他的列必須將列加入到group by中,而使用開窗函式後,可以不使用group by,直接將所有資訊顯示出來。
-
開窗函式適用於在每一行的最後一列新增聚合函式的結果。
開窗函式作用
-
為每條資料顯示聚合資訊.(聚合函式() over())
-
為每條資料提供分組的聚合函式結果(聚合函式() over(partition by 欄位) as 別名)
--按照欄位分組,分組後進行計算 -
與排名函式一起使用(row number() over(order by 欄位) as 別名)
常用分析函式:(最常用的應該是1.2.3 的排序)
-
row_number() over(partition by ... order by ...)
-
rank() over(partition by ... order by ...)
-
dense_rank() over(partition by ... order by ...)
-
count() over(partition by ... order by ...)
-
max() over(partition by ... order by ...)
-
min() over(partition by ... order by ...)
-
sum() over(partition by ... order by ...)
-
avg() over(partition by ... order by ...)
-
first_value() over(partition by ... order by ...)
-
last_value() over(partition by ... order by ...)
-
lag() over(partition by ... order by ...)
-
lead() over(partition by ... order by ...)
lag 和lead 可以獲取結果集中,按一定排序所排列的當前行的上下相鄰若干offset 的某個行的某個列(不用結果集的自關聯);
lag ,lead 分別是向前,向後;
lag 和lead 有三個引數,第一個引數是列名,第二個引數是偏移的offset,第三個引數是超出記錄視窗時的預設值
簡單的開窗函式範例:
package com.jiangnan.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 姓名 班級 分數
* 101 1 90
* 120 1 88
*/
case class Score(name:String,clazz:Int,score: Int)
object OpenFunction extends App {
val conf = new SparkConf().setAppName("").setMaster("local[2]")
val spark = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//生產資料
val score = spark.sparkContext.makeRDD(List(
Score("1001",1,90),
Score("1002",2,95),
Score("1003",3,90),
Score("1004",1,92),
Score("1005",1,88),
Score("1005",2,66),
Score("1005",3,90),
Score("1005",1,93),
Score("1005",3,99),
Score("1005",2,90),
Score("1005",2,92),
Score("1005",3,90)
)).toDF("name","clazz","score")
println("---------原始資料----------")
score.show()
println("---------求每個班級最高成績的學生---原始做法------")
//建立一個表
score.createOrReplaceTempView("score")
println("-----------分組後求出每個班最高分數表結構----------")
spark.sql("select clazz,max(score) max from score group by clazz").show()
println("-----------原始做法,最終結果--------------")
spark.sql("select a.name,b.clazz,b.max from score a,(select clazz,max(score) max from score group by clazz) b where a.score = b.max").show()
println("------------使用開窗函式後的運算過程-------------")
spark.sql("select name,clazz,score,rank() over(partition by clazz order by score desc) rank from score").show()
println("------------使用開窗函式後最終結果-------------")
spark.sql("select * from (select name,clazz,score,rank() over(partition by clazz order by score desc) rank from score) r where r.rank = 1").show()
spark.stop()