Flink基礎(62):FLINK SQL(39) 視窗函式(3)滑動視窗
阿新 • • 發佈:2021-08-07
本文為您介紹如何使用實時計算滑動視窗函式。
說明實時計算滑動視窗(HOP)暫不支援與LAST_VALUE、FIRST_VALUE或TopN函式共同使用。什麼是滑動視窗
滑動視窗(HOP),也被稱作Sliding Window。不同於滾動視窗,滑動視窗的視窗可以重疊。
滑動視窗有兩個引數:slide和size。slide為每次滑動的步長,size為視窗的大小。
- slide < size,則視窗會重疊,每個元素會被分配到多個視窗。
- slide = size,則等同於滾動視窗(TUMBLE)。
- slide > size,則為跳躍視窗,視窗之間不重疊且有間隙。
滑動視窗函式語法
HOP函式用在group by子句中,用來定義滑動視窗。
HOP(<time-attr>, <slide-interval>,<size-interval>)
<slide-interval>: INTERVAL 'string' timeUnit
<size-interval>: INTERVAL 'string' timeUnit
說明
<time-attr>
引數必須是流中的一個合法的時間屬性欄位,指定為Processing Time或Event Time。 請參見 視窗函式概述,瞭解如何定義時間屬性和Watermark。
滑動視窗標識函式
使用滑動視窗標識函式選出視窗的起始時間或者結束時間,視窗的時間屬性用於下級Window的聚合。視窗標識函式 | 返回型別 | 描述 |
---|---|---|
HOP_START(<time-attr>, <slide-interval>, <size-interval>) |
TIMESTAMP | 返回視窗的起始時間(包含邊界)。例如[00:10, 00:15) 視窗,返回00:10 。 |
HOP_END(<time-attr>, <slide-interval>, <size-interval>) |
TIMESTAMP | 返回視窗的結束時間(包含邊界)。例如[00:00, 00:15) 視窗,返回00:15 。 |
HOP_ROWTIME(<time-attr>, <slide-interval>, <size-interval>) |
TIMESTAMP(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如[00:00, 00:15) 視窗,返回00:14:59.999 。返回值是一個rowtime attribute,即可以基於該欄位做時間型別的操作,例如級聯視窗,只能用在基於event time的window上。 |
HOP_PROCTIME(<time-attr>, <slide-interval>, <size-interval>) |
TIMESTAMP(rowtime-attr) | 返回視窗的結束時間(不包含邊界)。例如[00:00, 00:15) 視窗,返回00:14:59.999 。返回值是一個proctime attribute,即可以基於該欄位做時間型別的操作,例如級聯視窗,只能用在基於processing time的window上。 |
示例
統計每個使用者過去1分鐘的單擊次數,每30秒更新1次,即1分鐘的視窗,30秒滑動1次。- 測試資料
username(VARCHAR) click_url(VARCHAR) ts(TIMESTAMP) Jark http://taobao.com/xxx
2017-10-10 10:00:00.0
Jark http://taobao.com/xxx
2017-10-10 10:00:10.0
Jark http://taobao.com/xxx
2017-10-10 10:00:49.0
Jark http://taobao.com/xxx
2017-10-10 10:01:05.0
Jark http://taobao.com/xxx
2017-10-10 10:01:58.0
Timo http://taobao.com/xxx
2017-10-10 10:02:10.0
- 測試語句
CREATE TABLE user_clicks ( username VARCHAR, click_url VARCHAR, ts TIMESTAMP, WATERMARK wk FOR ts AS WITHOFFSET (ts, 2000)--為rowtime定義Watermark。 ) WITH ( TYPE = 'datahub', ...); CREATE TABLE hop_output ( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH (TYPE = 'rds', ...); INSERT INTO hop_output SELECT HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username, COUNT (click_url) FROM user_clicks GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username
- 測試結果
HOP視窗無法讀取資料進入的時間,第一個視窗的開啟時間會前移。前移時長=視窗時長-滑動步長,示例如下表。window_start (TIMESTAMP) window_end (TIMESTAMP) username (VARCHAR) clicks (BIGINT) 2017-10-10 09:59:30.0
2017-10-10 10:00:30.0
Jark 2 2017-10-10 10:00:00.0
2017-10-10 10:01:00.0
Jark 3 2017-10-10 10:00:30.0
2017-10-10 10:01:30.0
Jark 2 2017-10-10 10:01:00.0
2017-10-10 10:02:00.0
Jark 2 2017-10-10 10:01:30.0
2017-10-10 10:02:30.0
Jark 1 2017-10-10 10:02:00.0
2017-10-10 10:03:00.0
Timo 1 2017-10-10 10:02:30.0
2017-10-10 10:03:30.0
Timo 1 視窗時長(秒) 滑動步長(秒) Event Time 第一個視窗StartTime 第一個視窗EndTime 120 30 2019-07-31 10:00:00.0
2019-07-31 09:58:30.0
2019-07-31 10:00:30.0
60 10 2019-07-31 10:00:00.0
2019-07-31 09:59:10.0
2019-07-31 10:00:10.0