SparkSQL | Window Functions
阿新 • Published: 2020-03-01
Definition of a window function, quoting "Introducing Window Functions in Spark SQL": a window function calculates a return value for every input row of a table based on a group of rows. How window functions differ from other functions:
- Ordinary functions: operate on each record and compute a new column (the record count is unchanged);
- Aggregate functions: operate on a group of records (all the data is split into groups in some way) and compute one aggregated value (the record count shrinks);
- Window functions: operate on each record, and for each record draw on a specified set of related records to compute a value (the record count is unchanged).
Window function syntax: function_name(args) OVER (PARTITION BY clause ORDER BY clause ROWS/RANGE clause)
- Function name: the window function to apply;
- OVER: the keyword that marks this as a window function rather than an ordinary aggregate function;
- Clauses:
  - PARTITION BY: the grouping columns
  - ORDER BY: the ordering columns
  - ROWS/RANGE window clause: controls the size and boundaries of the window; there are two kinds (ROWS, RANGE)
    - ROWS: a physical window; rows are selected by their index after sorting
    - RANGE: a logical window; rows are selected by value
There are three main kinds of window functions:
- ranking functions
- analytic functions
- aggregate functions
Data loading

```python
from pyspark.sql.types import StructType, StringType, IntegerType

schema = (StructType()
          .add('name', StringType(), True)
          .add('department', StringType(), True)
          .add('salary', IntegerType(), True))

df = spark.createDataFrame([
    ("Tom", "Sales", 4500), ("Georgi", "Sales", 4200),
    ("Kyoichi", "Sales", 3000), ("Berni", "Sales", 4700),
    ("Guoxiang", "Sales", 4200), ("Parto", "Finance", 2700),
    ("Anneke", "Finance", 3300), ("Sumant", "Finance", 3900),
    ("Jeff", "Marketing", 3100), ("Patricio", "Marketing", 2500),
], schema=schema)
df.createOrReplaceTempView('salary')
df.show()
```

```
+--------+----------+------+
|    name|department|salary|
+--------+----------+------+
|     Tom|     Sales|  4500|
|  Georgi|     Sales|  4200|
| Kyoichi|     Sales|  3000|
|   Berni|     Sales|  4700|
|Guoxiang|     Sales|  4200|
|   Parto|   Finance|  2700|
|  Anneke|   Finance|  3300|
|  Sumant|   Finance|  3900|
|    Jeff| Marketing|  3100|
|Patricio| Marketing|  2500|
+--------+----------+------+
```
ranking functions
| sql | DataFrame | Description |
|---|---|---|
| row_number | rowNumber | A unique sequence number from 1 to n within each partition |
| rank | rank | Ranking with ties: equal values share a rank, and the ranks after a tie are skipped (1, 2, 2, 4) |
| dense_rank | denseRank | Like rank, but without gaps: the next distinct value gets the next consecutive rank (1, 2, 2, 3) |
| percent_rank | percentRank | Computed as (rank within group - 1) / (rows in group - 1); 0 if the group has only one row |
| ntile | ntile | Sorts the rows within the group and splits them into n buckets; returns the bucket number (starting from 1) of the current row |
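The formulas in this table can be sanity-checked outside Spark. The sketch below emulates the ranking functions in plain Python over a single sorted partition (the Sales salaries from the sample data); variable names are illustrative, and this mirrors the semantics rather than calling Spark itself:

```python
# Emulate the ranking window functions over one sorted partition.
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales, ordered by salary
n = len(salaries)

row_number = list(range(1, n + 1))
# rank: tied values share a rank; ranks after a tie are skipped
rank = [1 + sum(1 for x in salaries if x < s) for s in salaries]
# dense_rank: tied values share a rank; no gaps afterwards
distinct = sorted(set(salaries))
dense_rank = [distinct.index(s) + 1 for s in salaries]
# percent_rank = (rank - 1) / (n - 1), defined as 0 for a single-row group
percent_rank = [(r - 1) / (n - 1) if n > 1 else 0.0 for r in rank]
# ntile(k): split n rows into k buckets; the first n % k buckets get one extra row
k = 2
base, extra = divmod(n, k)
ntile = []
for bucket in range(1, k + 1):
    ntile.extend([bucket] * (base + (1 if bucket <= extra else 0)))

print(rank)          # [1, 2, 2, 4, 5]
print(dense_rank)    # [1, 2, 2, 3, 4]
print(percent_rank)  # [0.0, 0.25, 0.25, 0.75, 1.0]
print(ntile)         # [1, 1, 1, 2, 2]
```

These values match the Sales rows in the Spark output of the query that follows.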
```python
spark.sql("""
SELECT
    name, department, salary,
    row_number()   over(partition by department order by salary) as index,
    rank()         over(partition by department order by salary) as rank,
    dense_rank()   over(partition by department order by salary) as dense_rank,
    percent_rank() over(partition by department order by salary) as percent_rank,
    ntile(2)       over(partition by department order by salary) as ntile
FROM salary
""").toPandas()
```

| | name | department | salary | index | rank | dense_rank | percent_rank | ntile |
|---|---|---|---|---|---|---|---|---|
| 0 | Patricio | Marketing | 2500 | 1 | 1 | 1 | 0.00 | 1 |
| 1 | Jeff | Marketing | 3100 | 2 | 2 | 2 | 1.00 | 2 |
| 2 | Kyoichi | Sales | 3000 | 1 | 1 | 1 | 0.00 | 1 |
| 3 | Georgi | Sales | 4200 | 2 | 2 | 2 | 0.25 | 1 |
| 4 | Guoxiang | Sales | 4200 | 3 | 2 | 2 | 0.25 | 1 |
| 5 | Tom | Sales | 4500 | 4 | 4 | 3 | 0.75 | 2 |
| 6 | Berni | Sales | 4700 | 5 | 5 | 4 | 1.00 | 2 |
| 7 | Parto | Finance | 2700 | 1 | 1 | 1 | 0.00 | 1 |
| 8 | Anneke | Finance | 3300 | 2 | 2 | 2 | 0.50 | 1 |
| 9 | Sumant | Finance | 3900 | 3 | 3 | 3 | 1.00 | 2 |
analytic functions
| sql | DataFrame | Description |
|---|---|---|
| cume_dist | cumeDist | Computed as (rows in the group with a value <= the current row's value) / (total rows in the group) |
| lag | lag | lag(input, [offset[, default]]): the value of input at the row offset rows before the current row; returns default (null by default) when that row falls outside the partition |
| lead | lead | The counterpart of lag: looks offset rows ahead of the current row |
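These three can likewise be emulated in plain Python over one sorted partition to make the semantics concrete (a sketch using the Sales salaries; `lag` and `lead` here are local helper functions, not Spark imports):

```python
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales, ordered by salary
n = len(salaries)

# cume_dist: rows with a value <= the current value, divided by total rows
cume_dist = [sum(1 for x in salaries if x <= s) / n for s in salaries]

def lag(values, offset, default=None):
    # value `offset` rows before the current row, or `default` at the partition edge
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

def lead(values, offset, default=None):
    # value `offset` rows after the current row, or `default` at the partition edge
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]

print(cume_dist)          # [0.2, 0.6, 0.6, 0.8, 1.0]
print(lag(salaries, 2))   # [None, None, 3000, 4200, 4200]
print(lead(salaries, 2))  # [4200, 4500, 4700, None, None]
```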
```python
spark.sql("""
SELECT
    name, department, salary,
    row_number() over(partition by department order by salary) as index,
    cume_dist()  over(partition by department order by salary) as cume_dist,
    lag(salary, 2)  over(partition by department order by salary) as lag,
    lead(salary, 2) over(partition by department order by salary) as lead
FROM salary
""").toPandas()
```

| | name | department | salary | index | cume_dist | lag | lead |
|---|---|---|---|---|---|---|---|
| 0 | Patricio | Marketing | 2500 | 1 | 0.500000 | NaN | NaN |
| 1 | Jeff | Marketing | 3100 | 2 | 1.000000 | NaN | NaN |
| 2 | Kyoichi | Sales | 3000 | 1 | 0.200000 | NaN | 4200.0 |
| 3 | Georgi | Sales | 4200 | 2 | 0.600000 | NaN | 4500.0 |
| 4 | Guoxiang | Sales | 4200 | 3 | 0.600000 | 3000.0 | 4700.0 |
| 5 | Tom | Sales | 4500 | 4 | 0.800000 | 4200.0 | NaN |
| 6 | Berni | Sales | 4700 | 5 | 1.000000 | 4200.0 | NaN |
| 7 | Parto | Finance | 2700 | 1 | 0.333333 | NaN | 3900.0 |
| 8 | Anneke | Finance | 3300 | 2 | 0.666667 | NaN | NaN |
| 9 | Sumant | Finance | 3900 | 3 | 1.000000 | 2700.0 | NaN |
aggregate functions
These simply apply the ordinary aggregate functions within a window.

| sql | Description |
|---|---|
| avg | Average |
| sum | Sum |
| min | Minimum |
| max | Maximum |
```python
spark.sql("""
SELECT
    name, department, salary,
    row_number() over(partition by department order by salary) as index,
    sum(salary) over(partition by department order by salary) as sum,
    avg(salary) over(partition by department order by salary) as avg,
    min(salary) over(partition by department order by salary) as min,
    max(salary) over(partition by department order by salary) as max
FROM salary
""").toPandas()
```

| | name | department | salary | index | sum | avg | min | max |
|---|---|---|---|---|---|---|---|---|
| 0 | Patricio | Marketing | 2500 | 1 | 2500 | 2500.0 | 2500 | 2500 |
| 1 | Jeff | Marketing | 3100 | 2 | 5600 | 2800.0 | 2500 | 3100 |
| 2 | Kyoichi | Sales | 3000 | 1 | 3000 | 3000.0 | 3000 | 3000 |
| 3 | Georgi | Sales | 4200 | 2 | 11400 | 3800.0 | 3000 | 4200 |
| 4 | Guoxiang | Sales | 4200 | 3 | 11400 | 3800.0 | 3000 | 4200 |
| 5 | Tom | Sales | 4500 | 4 | 15900 | 3975.0 | 3000 | 4500 |
| 6 | Berni | Sales | 4700 | 5 | 20600 | 4120.0 | 3000 | 4700 |
| 7 | Parto | Finance | 2700 | 1 | 2700 | 2700.0 | 2700 | 2700 |
| 8 | Anneke | Finance | 3300 | 2 | 6000 | 3000.0 | 2700 | 3300 |
| 9 | Sumant | Finance | 3900 | 3 | 9900 | 3300.0 | 2700 | 3900 |

Note that when ORDER BY is present but no frame is given, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which is why the tied salaries (Georgi and Guoxiang, both 4200) share the same running sum of 11400.
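The running aggregates can be reproduced in plain Python by materializing, for each row, the default frame (every row whose value is less than or equal to the current one). A sketch of the semantics for the Sales partition, not Spark code:

```python
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales, ordered by salary

# Default frame with ORDER BY: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
# i.e. every row whose value is <= the current row's value.
frames = [[x for x in salaries if x <= s] for s in salaries]

sums = [sum(f) for f in frames]
avgs = [sum(f) / len(f) for f in frames]
mins = [min(f) for f in frames]
maxs = [max(f) for f in frames]

print(sums)  # [3000, 11400, 11400, 15900, 20600]
print(avgs)  # [3000.0, 3800.0, 3800.0, 3975.0, 4120.0]
```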
The window clause

ROWS/RANGE window clause: controls the size and boundaries of the window; there are two kinds (ROWS, RANGE)
- ROWS: a physical window; rows are selected by their index after sorting
- RANGE: a logical window; rows are selected by value

Syntax: OVER (PARTITION BY … ORDER BY … frame_type BETWEEN start AND end)

There are five kinds of boundaries:
- CURRENT ROW: the current row
- UNBOUNDED PRECEDING: the first row of the partition
- UNBOUNDED FOLLOWING: the last row of the partition
- n PRECEDING: n rows before the current row
- n FOLLOWING: n rows after the current row
```python
spark.sql("""
SELECT
    name, department, salary,
    row_number() over(partition by department order by salary) as index,
    row_number() over(partition by department order by salary
                      rows between UNBOUNDED PRECEDING and CURRENT ROW) as index1
FROM salary
""").toPandas()
```

| | name | department | salary | index | index1 |
|---|---|---|---|---|---|
| 0 | Patricio | Marketing | 2500 | 1 | 1 |
| 1 | Jeff | Marketing | 3100 | 2 | 2 |
| 2 | Kyoichi | Sales | 3000 | 1 | 1 |
| 3 | Georgi | Sales | 4200 | 2 | 2 |
| 4 | Guoxiang | Sales | 4200 | 3 | 3 |
| 5 | Tom | Sales | 4500 | 4 | 4 |
| 6 | Berni | Sales | 4700 | 5 | 5 |
| 7 | Parto | Finance | 2700 | 1 | 1 |
| 8 | Anneke | Finance | 3300 | 2 | 2 |
| 9 | Sumant | Finance | 3900 | 3 | 3 |
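The difference between ROWS (physical) and RANGE (logical) frames shows up precisely on tied values. A plain-Python sketch of a running sum under each frame type, using the Sales partition from the sample data:

```python
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales, ordered by salary

# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: frame by row position
rows_sum = [sum(salaries[:i + 1]) for i in range(len(salaries))]
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: frame by value,
# so the two tied 4200 rows fall into the same frame
range_sum = [sum(x for x in salaries if x <= s) for s in salaries]

print(rows_sum)   # [3000, 7200, 11400, 15900, 20600]
print(range_sum)  # [3000, 11400, 11400, 15900, 20600]
```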
Mixed usage

```python
spark.sql("""
SELECT
    name, department, salary,
    row_number() over(partition by department order by salary) as index,
    salary - (min(salary) over(partition by department order by salary)) as salary_diff
FROM salary
""").toPandas()
```

| | name | department | salary | index | salary_diff |
|---|---|---|---|---|---|
| 0 | Patricio | Marketing | 2500 | 1 | 0 |
| 1 | Jeff | Marketing | 3100 | 2 | 600 |
| 2 | Kyoichi | Sales | 3000 | 1 | 0 |
| 3 | Georgi | Sales | 4200 | 2 | 1200 |
| 4 | Guoxiang | Sales | 4200 | 3 | 1200 |
| 5 | Tom | Sales | 4500 | 4 | 1500 |
| 6 | Berni | Sales | 4700 | 5 | 1700 |
| 7 | Parto | Finance | 2700 | 1 | 0 |
| 8 | Anneke | Finance | 3300 | 2 | 600 |
| 9 | Sumant | Finance | 3900 | 3 | 1200 |
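The salary_diff column is just the salary minus a running minimum over the ordered partition; as a quick check, the same arithmetic in plain Python for the Sales partition:

```python
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales, ordered by salary

running_min = [min(salaries[:i + 1]) for i in range(len(salaries))]
salary_diff = [s - m for s, m in zip(salaries, running_min)]

print(salary_diff)  # [0, 1200, 1200, 1500, 1700]
```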
References
- Introducing Window Functions in Spark SQL
- Standard Functions for Window Aggregation (Window Functions)
- List Of Spark SQL Window Functions
- Introducing window functions in Hive and Spark SQL