Flink基礎(63):FLINK SQL(40) 視窗函式(4)會話視窗
阿新 • • 發佈:2021-08-07
本文為您介紹如何使用實時計算Flink版會話視窗函式。
什麼是會話視窗
會話視窗(SESSION)通過SESSION活動來對元素進行分組。會話視窗與滾動視窗和滑動視窗相比,沒有視窗重疊,沒有固定視窗大小。相反,當它在一個固定的時間週期內不再收到元素,即會話斷開時,該視窗就會關閉。
會話視窗通過一個間隔時間(Gap)來配置,這個間隔定義了非活躍週期的長度。例如,一個表示滑鼠單擊活動的資料流可能具有長時間的空閒時間,並在兩段空閒之間散佈著高濃度的單擊。如果資料在指定的間隔(Gap)之後到達,則會開始一個新的視窗。
會話視窗示例如下圖,每個Key由於不同的資料分佈,形成了不同的Window。會話視窗函式語法
SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit
說明<time-attr>
引數必須是資料流中的一個合法的時間屬性欄位,指定為Processing Time或Event Time,詳情請參見概述,瞭解如何定義時間屬性和Watermark。
會話視窗標識函式
使用標識函式選出視窗的起始時間或者結束時間,視窗的時間屬性用於下級Window的聚合。
視窗標識函式 | 返回型別 | 描述 |
---|---|---|
SESSION_START(<time-attr>, <gap-interval>) |
Timestamp | 返回視窗的起始時間(包含邊界)。例如[00:10,00:15) 的視窗,返回00:10 ,即為此會話視窗內第一條記錄的時間。 |
SESSION_END(<time-attr>, <gap-interval>) |
Timestamp | 返回視窗的結束時間(包含邊界)。例如[00:00,00:15) 的視窗,返回00:15 ,即為此會話視窗內最後一條記錄的時間+<gap-interval> 。 |
SESSION_ROWTIME(<time-attr>, <gap-interval>) |
Timestamp(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如[00:00,00:15) 的視窗,返回00:14:59.999 。返回值是一個rowtime attribute,也就是可以基於該欄位進行時間型別的操作,如級聯視窗。該引數只能用於基於Event Time的Window。 |
SESSION_PROCTIME(<time-attr>, <gap-interval>) |
Timestamp(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如[00:00,00:15) 的視窗,返回00:14:59.999 。返回值是一個Proctime Attribute,也就是可以基於該欄位進行時間型別的操作,如級聯視窗。該引數只能用於基於Processing Time的Window。 |
示例
統計每個使用者在每個活躍會話期間的單擊次數,會話超時時長為30秒。
- 測試資料
username (VARCHAR) click_url (VARCHAR) ts (TIMESTAMP) Jark http://taobao.com/xxx
2017-10-10 10:00:00.0
Jark http://taobao.com/xxx
2017-10-10 10:00:10.0
Jark http://taobao.com/xxx
2017-10-10 10:00:49.0
Jark http://taobao.com/xxx
2017-10-10 10:01:05.0
Jark http://taobao.com/xxx
2017-10-10 10:01:58.0
Timo http://taobao.com/xxx
2017-10-10 10:02:10.0
- 測試語句
CREATE TABLE user_clicks( username varchar, click_url varchar, ts timeStamp, WATERMARK wk FOR ts as withOffset(ts, 2000) -- 為rowtime定義watermark ) with ( type='datahub', ... ); CREATE TABLE session_output( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) with ( type='rds', ... ); INSERT INTO session_output SELECT SESSION_START(ts, INTERVAL '30' SECOND), SESSION_END(ts, INTERVAL '30' SECOND), username, COUNT(click_url) FROM user_clicks GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
- 測試結果
window_start (TIMESTAMP) window_end (TIMESTAMP) username (VARCHAR) clicks (BIGINT) 2017-10-10 10:00:00.0
2017-10-10 10:00:40.0
Jark 2 2017-10-10 10:00:49.0
2017-10-10 10:01:35.0
Jark 2 2017-10-10 10:01:58.0
2017-10-10 10:02:28.0
Jark 1 2017-10-10 10:02:10.0
2017-10-10 10:02:40.0
Timo 1