1. 程式人生 > >spark sql視窗函式

spark sql視窗函式

視窗函式是spark sql模組從1.4之後開始支援的,主要用於解決對一組資料進行操作,同時為每條資料返回單個結果,比如計算指定訪問資料的均值、計算累進和或訪問當前行之前行資料等,這些場景使用普通函式實現是比較困難的。
視窗函式計算的一組行,被稱為Frame。每一個被處理的行都有一個唯一的frame相關聯。

Spark SQL支援三類視窗函式:排名函式、分析函式和聚合函式。以下彙總了Spark SQL支援的排名函式和分析函式。對於聚合函式來說,普通的聚合函式都可以作為視窗聚合函式使用

類別 SQL DataFrame 含義
排名函式 rank rank 為相同組的資料計算排名,如果相同組中排序欄位相同,當前行的排名值和前一行相同;如果相同組中排序欄位不同,則當前行的排名值為該行在當前組中的行號;因此排名序列會出現間隙
排名函式 dense_rank denseRank 為相同組內資料計算排名,如果相同組中排序欄位相同,當前行的排名值和前一行相同;如果相同組中排序欄位不同,則當前行的排名值為前一行排名值加1;排名序列不會出現間隙
排名函式 percent_rank percentRank 該值的計算公式(組內排名-1)/(組內行數-1),如果組內只有1行,則結果為0
排名函式 ntile ntile 將組內資料排序然後按照指定的n切分成n個桶,該值為當前行的桶號(桶號從1開始)
排名函式 row_number rowNumber 將組內資料排序後,該值為當前行在當前組內的從1開始的遞增的唯一序號值
分析函式 cume_dist cumeDist 該值的計算公式為:組內小於等於當前行值的行數/組內總行數
分析函式 lag lag 用法:lag(input, [offset, [default]]),計算組內當前行按照排序欄位排序的之前offset行的input列的值,如果offset大於當前視窗(組內當前行之前行數)則返回default值,default值預設為null
分析函式 lead lead 用法:lead(input, [offset, [default]]),計算組內當前行按照排序欄位排序的之後offset行的input列的值,如果offset大於當前視窗(組內當前行之後行數)則返回default值,default值預設為null

spark支援兩種方式使用視窗函式:

  • 在SQL語句中的支援的函式中新增OVER語句。例如avg(revenue) OVER (…)
  • 使用DataFrame API在支援的函式呼叫over()方法。例如rank().over(…)。

當一個函式被作為視窗函式使用時,需要為該視窗函式定義相關的視窗規範。視窗規範定義了哪些行會包括到給定輸入行相關聯的幀(frame)中。視窗規範包括三部分:

  • 分割槽規範:定義哪些行屬於相同分割槽,這樣在對幀中資料排序和計算之前相同分割槽的資料就可以被收集到同一臺機器上。如果沒有指定分割槽規範,那麼所有資料都會被收集到單個機器上處理。
  • 排序規範:定義同一個分割槽中所有資料的排序方式,從而確定了給定行在他所屬分割槽中的位置
  • 幀規範:指定哪些行會被當前輸入行的幀包括,通過其他行對於當前行的相對位置實現。

    如果使用sql語句的話,PARTITION BY關鍵字用來為分割槽規範定義分割槽表示式、 ORDER BY關鍵字用來為排序規範定義排序表示式。格式:OVER (PARTITION BY ... ORDER BY ... )
    如果使用DataFrame API的話,API提供了函式來定義視窗規範。例項如下:
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy(...).orderBy(...)

為了分割槽和排序操作,需要定義幀的開始邊界、結束邊界和幀的型別,這也是一個幀規範的三部分。一共有五種邊界:UNBOUNDED PRECEDING(分割槽第一行),UNBOUNDED FOLLOWING(分割槽最後一行),CURRENT ROW<value> PRECEDING(當前行之前行)和<value> FOLLOWING(當前行之後行)。有兩種幀型別:ROW幀和RANGE幀。

ROW幀是基於當前輸入行的位置的物理偏移量,比如:CURRENT ROW被用作邊界表示當前輸入行,<value> PRECEDING<value> FOLLOWING分別表示出現在當前行之前和之後的行數。例如SQL語句ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING表示一個包括當前行、當前行之前1行和之後1行的幀。圖示如下:
image
RANGE幀是基於當前行位置的邏輯偏移。邏輯偏移為當前輸入行的排序表示式的值和幀邊界行的排序表示式的值之差。也因為這種定義,使用RANGE幀時,只能允許使用單個排序表示式。對於RANGE幀,就邊界計算而言,和當前輸入行的排序表示式的值相同的行都被認為是相同行。例如:排序表示式為revenue,SQL語句為RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING,則邊界為[current revenue value - 2000, current revenue value + 1000]。圖示如下:
image
使用DataFrame API,可以使用以下方法實現ROW幀和RANGE幀

Window.partitionBy(...).orderBy(...).rowBetween(start, end)
Window.partitionBy(...).orderBy(...).rangeBetween(start, end)

下面體驗以下視窗函式的強大。
比如有以下產品收入表:
image

如果我們需要統計每個類別最暢銷和次暢銷的產品,首先需要基於產品的收入對相同類別的產品進行排名,然後基於排名取出最暢銷和次暢銷的產品。使用視窗函式實現的sql語句如下:

SELECT product, category, revenue 
FROM (
    SELECT produce, category, revenue, dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank 
    FROM productRevenue) tmp 
WHERE rank <= 2

執行結果如下:
image

如果需要統計相同類別中每種產品與該類別中最暢銷產品收入差距又該如何呢?首先為了計算差距,需要先找到每個類別中收入最高的產品

SELECT product, category, revenue, (max(revenue) OVER(PARTITION BY category ORDER BY revenue DESC) - revenue) as revenue_diff
FROM produceRevenue