1. 程式人生 > 其它 >Flink基礎(63):FLINK SQL(40) 視窗函式(4)會話視窗

Flink基礎(63):FLINK SQL(40) 視窗函式(4)會話視窗

本文為您介紹如何使用實時計算Flink版會話視窗函式。

什麼是會話視窗

會話視窗(SESSION)通過SESSION活動來對元素進行分組。會話視窗與滾動視窗和滑動視窗相比,沒有視窗重疊,沒有固定視窗大小。相反,當它在一個固定的時間週期內不再收到元素,即會話斷開時,該視窗就會關閉。

會話視窗通過一個間隔時間(Gap)來配置,這個間隔定義了非活躍週期的長度。例如,一個表示滑鼠單擊活動的資料流可能具有長時間的空閒時間,並在兩段空閒之間散佈著高濃度的單擊。如果資料在指定的間隔(Gap)之後到達,則會開始一個新的視窗。

會話視窗示例如下圖,每個Key由於不同的資料分佈,形成了不同的Window。

會話視窗函式語法

SESSION函式用於在GROUP BY子句中定義會話視窗。
SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit
說明<time-attr>引數必須是資料流中的一個合法的時間屬性欄位,指定為Processing Time或Event Time,詳情請參見概述,瞭解如何定義時間屬性Watermark

會話視窗標識函式

使用標識函式選出視窗的起始時間或者結束時間,視窗的時間屬性用於下級Window的聚合。

視窗標識函式返回型別描述
SESSION_START(<time-attr>, <gap-interval>) Timestamp 返回視窗的起始時間(包含邊界)。例如[00:10,00:15)的視窗,返回00:10,即為此會話視窗內第一條記錄的時間。
SESSION_END(<time-attr>, <gap-interval>) Timestamp 返回視窗的結束時間(包含邊界)。例如[00:00,00:15)的視窗,返回00:15,即為此會話視窗內最後一條記錄的時間+<gap-interval>
SESSION_ROWTIME(<time-attr>, <gap-interval>)
Timestamp(rowtime-attr) 返回視窗的結束時間(不包含邊界)。例如[00:00,00:15)的視窗,返回00:14:59.999。返回值是一個rowtime attribute,也就是可以基於該欄位進行時間型別的操作,如級聯視窗。該引數只能用於基於Event Time的Window。
SESSION_PROCTIME(<time-attr>, <gap-interval>) Timestamp(rowtime-attr) 返回視窗的結束時間(不包含邊界)。例如[00:00,00:15)的視窗,返回00:14:59.999。返回值是一個Proctime Attribute,也就是可以基於該欄位進行時間型別的操作,如級聯視窗。該引數只能用於基於Processing Time的Window。

示例

統計每個使用者在每個活躍會話期間的單擊次數,會話超時時長為30秒。

  • 測試資料
    username (VARCHAR)click_url (VARCHAR)ts (TIMESTAMP)
    Jark http://taobao.com/xxx 2017-10-10 10:00:00.0
    Jark http://taobao.com/xxx 2017-10-10 10:00:10.0
    Jark http://taobao.com/xxx 2017-10-10 10:00:49.0
    Jark http://taobao.com/xxx 2017-10-10 10:01:05.0
    Jark http://taobao.com/xxx 2017-10-10 10:01:58.0
    Timo http://taobao.com/xxx 2017-10-10 10:02:10.0
  • 測試語句
CREATE TABLE user_clicks(
username varchar,
click_url varchar,
ts timeStamp,
WATERMARK wk FOR ts as withOffset(ts, 2000) -- 為rowtime定義watermark
) with (
type='datahub',
...
);

CREATE TABLE session_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
type='rds',
...
);

INSERT INTO session_output
SELECT
SESSION_START(ts, INTERVAL '30' SECOND),
SESSION_END(ts, INTERVAL '30' SECOND),
username,
COUNT(click_url)
FROM user_clicks
GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
  • 測試結果
    window_start (TIMESTAMP)window_end (TIMESTAMP)username (VARCHAR)clicks (BIGINT)
    2017-10-10 10:00:00.0 2017-10-10 10:00:40.0 Jark 2
    2017-10-10 10:00:49.0 2017-10-10 10:01:35.0 Jark 2
    2017-10-10 10:01:58.0 2017-10-10 10:02:28.0 Jark 1
    2017-10-10 10:02:10.0 2017-10-10 10:02:40.0 Timo 1