
SparkSQL | Window Functions

To define window functions, let me quote an expert: "a window function calculates a return value for every input row of a table based on a group of rows". How window functions differ from other functions:

  • Ordinary functions: operate on each record and compute a new column (record count unchanged);
  • Aggregate functions: operate on a group of records (all data split into groups by some key) and compute one aggregate value per group (record count shrinks);
  • Window functions: operate on each record, and for each record select a set of related records over which a value is computed (record count unchanged).

Window function syntax: function_name(arguments) OVER (PARTITION BY clause ORDER BY clause ROWS/RANGE clause)

  • Function name: the window function being applied;
  • OVER: keyword marking this as a window function rather than an ordinary aggregate;
  • Clauses:
    • PARTITION BY: grouping column(s)
    • ORDER BY: sorting column(s)
    • ROWS/RANGE window clause: controls the window's size and boundaries; there are two kinds (ROWS, RANGE)
      • ROWS: physical window, rows selected by their index after sorting
      • RANGE: logical window, rows selected by value

There are three main kinds of window functions:

  • ranking functions
  • analytic functions
  • aggregate functions

Data loading

from pyspark.sql.types import StructType, StringType, IntegerType

# Build the schema: name, department, salary
schema = (StructType()
          .add('name', StringType(), True)
          .add('department', StringType(), True)
          .add('salary', IntegerType(), True))
df = spark.createDataFrame([
    ("Tom", "Sales", 4500), ("Georgi", "Sales", 4200),
    ("Kyoichi", "Sales", 3000), ("Berni", "Sales", 4700),
    ("Guoxiang", "Sales", 4200), ("Parto", "Finance", 2700),
    ("Anneke", "Finance", 3300), ("Sumant", "Finance", 3900),
    ("Jeff", "Marketing", 3100), ("Patricio", "Marketing", 2500),
], schema=schema)
df.createOrReplaceTempView('salary')
df.show()

+--------+----------+------+
|    name|department|salary|
+--------+----------+------+
|     Tom|     Sales|  4500|
|  Georgi|     Sales|  4200|
| Kyoichi|     Sales|  3000|
|   Berni|     Sales|  4700|
|Guoxiang|     Sales|  4200|
|   Parto|   Finance|  2700|
|  Anneke|   Finance|  3300|
|  Sumant|   Finance|  3900|
|    Jeff| Marketing|  3100|
|Patricio| Marketing|  2500|
+--------+----------+------+

ranking functions

SQL           DataFrame     Description
row_number    rowNumber     unique sequence number from 1 to n within the group
rank          rank          ranking; tied values share the same rank, and the positions taken by ties are skipped (1, 2, 2, 4)
dense_rank    denseRank     like rank, but does not skip positions after ties (1, 2, 2, 3)
percent_rank  percentRank   computed as (rank within group - 1) / (rows in group - 1); 0 if the group has only one row
ntile         ntile         after sorting the group, splits it into n buckets; returns the current row's bucket number (starting from 1)
spark.sql("""
SELECT
    name, department, salary,
    row_number()   over(partition by department order by salary) as index,
    rank()         over(partition by department order by salary) as rank,
    dense_rank()   over(partition by department order by salary) as dense_rank,
    percent_rank() over(partition by department order by salary) as percent_rank,
    ntile(2)       over(partition by department order by salary) as ntile
FROM salary
""").toPandas()
name department salary index rank dense_rank percent_rank ntile
0 Patricio Marketing 2500 1 1 1 0.00 1
1 Jeff Marketing 3100 2 2 2 1.00 2
2 Kyoichi Sales 3000 1 1 1 0.00 1
3 Georgi Sales 4200 2 2 2 0.25 1
4 Guoxiang Sales 4200 3 2 2 0.25 1
5 Tom Sales 4500 4 4 3 0.75 2
6 Berni Sales 4700 5 5 4 1.00 2
7 Parto Finance 2700 1 1 1 0.00 1
8 Anneke Finance 3300 2 2 2 0.50 1
9 Sumant Finance 3900 3 3 3 1.00 2
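To sanity-check the formulas above without a Spark session, here is a plain-Python sketch (not Spark itself) that reproduces the rank, dense_rank, and percent_rank values for the Sales partition, using the salaries from the table above:

```python
# Sales-partition salaries, sorted ascending (from the table above).
salaries = [3000, 4200, 4200, 4500, 4700]
n = len(salaries)

# rank: 1 + number of strictly smaller values (ties share a rank, later positions are skipped).
rank = [1 + sum(1 for s in salaries if s < x) for x in salaries]
# dense_rank: 1 + number of *distinct* smaller values (no gaps after ties).
dense_rank = [1 + len({s for s in salaries if s < x}) for x in salaries]
# percent_rank: (rank - 1) / (n - 1), or 0 for a single-row group.
percent_rank = [(r - 1) / (n - 1) if n > 1 else 0.0 for r in rank]

print(rank)          # [1, 2, 2, 4, 5]
print(dense_rank)    # [1, 2, 2, 3, 4]
print(percent_rank)  # [0.0, 0.25, 0.25, 0.75, 1.0]
```

The output matches the index/rank/dense_rank/percent_rank columns of the Sales rows above.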

analytic functions

SQL        DataFrame   Description
cume_dist  cumeDist    computed as (rows in group with value <= current value) / (total rows in group)
lag        lag         lag(input, [offset, [default]]): value of input from offset rows before the current row; returns default (null by default) when the current index < offset
lead       lead        same as lag, but looks offset rows ahead
spark.sql("""
SELECT
    name, department, salary,
    cume_dist()     over(partition by department order by salary) as cume_dist,
    lag(salary, 2)  over(partition by department order by salary) as lag,
    lead(salary, 2) over(partition by department order by salary) as lead
FROM salary
""").toPandas()
name department salary cume_dist lag lead
0 Patricio Marketing 2500 0.500000 NaN NaN
1 Jeff Marketing 3100 1.000000 NaN NaN
2 Kyoichi Sales 3000 0.200000 NaN 4200.0
3 Georgi Sales 4200 0.600000 NaN 4500.0
4 Guoxiang Sales 4200 0.600000 3000.0 4700.0
5 Tom Sales 4500 0.800000 4200.0 NaN
6 Berni Sales 4700 1.000000 4200.0 NaN
7 Parto Finance 2700 0.333333 NaN 3900.0
8 Anneke Finance 3300 0.666667 NaN NaN
9 Sumant Finance 3900 1.000000 2700.0 NaN
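The cume_dist formula and the lag/lead offsets can likewise be checked with a plain-Python sketch (not Spark itself), applying an offset of 2 to the Sales-partition salaries:

```python
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales partition, sorted ascending
n = len(salaries)

# cume_dist: rows with value <= current value, divided by total rows in the group.
cume_dist = [sum(1 for s in salaries if s <= x) / n for x in salaries]
# lag(col, 2): value two rows earlier in the ordered partition, None when out of range.
lag2 = [salaries[i - 2] if i >= 2 else None for i in range(n)]
# lead(col, 2): value two rows later, None when out of range.
lead2 = [salaries[i + 2] if i + 2 < n else None for i in range(n)]

print(cume_dist)  # [0.2, 0.6, 0.6, 0.8, 1.0]
print(lag2)       # [None, None, 3000, 4200, 4200]
print(lead2)      # [4200, 4500, 4700, None, None]
```

Note that the two tied 4200 rows share the same cume_dist (3/5 = 0.6), since the formula compares values, not row positions.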

aggregate functions

These simply apply the ordinary aggregate functions within a window.

SQL  Description
avg  average
sum  sum
min  minimum
max  maximum
spark.sql("""
SELECT
    name, department, salary,
    sum(salary) over(partition by department order by salary) as sum,
    avg(salary) over(partition by department order by salary) as avg,
    min(salary) over(partition by department order by salary) as min,
    max(salary) over(partition by department order by salary) as max
FROM salary
""").toPandas()
name department salary sum avg min max
0 Patricio Marketing 2500 2500 2500.0 2500 2500
1 Jeff Marketing 3100 5600 2800.0 2500 3100
2 Kyoichi Sales 3000 3000 3000.0 3000 3000
3 Georgi Sales 4200 11400 3800.0 3000 4200
4 Guoxiang Sales 4200 11400 3800.0 3000 4200
5 Tom Sales 4500 15900 3975.0 3000 4500
6 Berni Sales 4700 20600 4120.0 3000 4700
7 Parto Finance 2700 2700 2700.0 2700 2700
8 Anneke Finance 3300 6000 3000.0 2700 3300
9 Sumant Finance 3900 9900 3300.0 2700 3900
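When an ORDER BY is present but no explicit frame is given, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so tied values share one frame; that is why both 4200 rows in Sales show sum 11400. A plain-Python sketch of that running aggregate:

```python
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales partition, sorted ascending

# Default frame with ORDER BY is RANGE UNBOUNDED PRECEDING .. CURRENT ROW:
# each row aggregates every row whose value is <= the current value,
# so tied values (the two 4200s) get identical running sums and averages.
running_sum = [sum(s for s in salaries if s <= x) for x in salaries]
running_avg = [sum(s for s in salaries if s <= x) /
               sum(1 for s in salaries if s <= x) for x in salaries]

print(running_sum)  # [3000, 11400, 11400, 15900, 20600]
print(running_avg)  # [3000.0, 3800.0, 3800.0, 3975.0, 4120.0]
```

These values match the sum and avg columns of the Sales rows above.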

Window frame clause

ROWS/RANGE window clause: controls the window's size and boundaries; there are two kinds (ROWS, RANGE)

  • ROWS: physical window, rows selected by their index after sorting
  • RANGE: logical window, rows selected by value

Syntax: OVER (PARTITION BY … ORDER BY … frame_type BETWEEN start AND end)

The following boundary keywords are available:

  • CURRENT ROW: the current row
  • UNBOUNDED PRECEDING: the first row of the partition
  • UNBOUNDED FOLLOWING: the last row of the partition
  • n PRECEDING: n rows before the current row
  • n FOLLOWING: n rows after the current row
  • UNBOUNDED: the start (or end) of the partition
spark.sql("""
SELECT
    name, department, salary,
    row_number() over(partition by department order by salary
                      rows between UNBOUNDED PRECEDING and CURRENT ROW) as index1
FROM salary
""").toPandas()
name department salary index1
0 Patricio Marketing 2500 1
1 Jeff Marketing 3100 2
2 Kyoichi Sales 3000 1
3 Georgi Sales 4200 2
4 Guoxiang Sales 4200 3
5 Tom Sales 4500 4
6 Berni Sales 4700 5
7 Parto Finance 2700 1
8 Anneke Finance 3300 2
9 Sumant Finance 3900 3
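The difference between ROWS and RANGE shows up exactly on tied values. A plain-Python sketch (not Spark itself) contrasting the two frame types, each spanning UNBOUNDED PRECEDING to CURRENT ROW, for a running sum over the Sales salaries:

```python
salaries = [3000, 4200, 4200, 4500, 4700]  # Sales partition, sorted ascending

# ROWS frame: physical window, includes rows up to the current row *index*.
rows_sum = [sum(salaries[:i + 1]) for i in range(len(salaries))]
# RANGE frame: logical window, includes every row with *value* <= the current
# value, so the two tied 4200 rows share one frame and one result.
range_sum = [sum(s for s in salaries if s <= x) for x in salaries]

print(rows_sum)   # [3000, 7200, 11400, 15900, 20600]
print(range_sum)  # [3000, 11400, 11400, 15900, 20600]
```

The two lists differ only at the tied 4200 rows, which is precisely the physical-vs-logical distinction described above.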

Combined usage

spark.sql("""
SELECT
    name, department, salary,
    salary - (min(salary) over(partition by department order by salary)) as salary_diff
FROM salary
""").toPandas()
name department salary salary_diff
0 Patricio Marketing 2500 0
1 Jeff Marketing 3100 600
2 Kyoichi Sales 3000 0
3 Georgi Sales 4200 1200
4 Guoxiang Sales 4200 1200
5 Tom Sales 4500 1500
6 Berni Sales 4700 1700
7 Parto Finance 2700 0
8 Anneke Finance 3300 600
9 Sumant Finance 3900 1200
