1. 程式人生 > 實用技巧 >Flink基礎(十五):Table API 和 Flink SQL(四)視窗(Windows)

Flink基礎(十五):Table API 和 Flink SQL(四)視窗(Windows)

  時間語義,要配合視窗操作才能發揮作用。最主要的用途,當然就是開視窗、根據時間段做計算了。下面我們就來看看 Table API 和 SQL 中,怎麼利用時間欄位做視窗操作。 在 Table API 和 SQL 中,主要有兩種視窗:Group Windows 和 Over Windows

1 分組視窗(Group Windows)

  分組視窗(Group Windows)會根據時間或行計數間隔,將行聚合到有限的組(Group)中,並對每個組的資料執行一次聚合函式。   Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定義的,並且必須由 as 子句指定一個別名。為了按視窗對錶進行分組,視窗的別名必須在 group by 子句 中,像常規的分組欄位一樣引用。
val table = input
 .window([w: GroupWindow] as 'w) // 定義視窗,別名 w
 .groupBy('w, 'a) //
以屬性 a 和視窗 w 作為分組的 key .select('a, 'b.sum) // 聚合欄位 b 的值,求和
或者,還可以把視窗的相關資訊,作為欄位新增到結果表中:
val table = input
 .window([w: GroupWindow] as 'w) 
 .groupBy('w, 'a) 
 .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)
  Table API 提供了一組具有特定語義的預定義 Window 類,這些類會被轉換為底層DataStream 或 DataSet 的視窗操作。   Table API 支援的視窗定義,和我們熟悉的一樣,主要也是三種:滾動(Tumbling)、滑動(Sliding)和會話(Session)。 1.1 滾動視窗   滾動視窗(Tumbling windows)要用 Tumble 類來定義,另外還有三個方法:   ⚫ over:定義視窗長度   ⚫ on:用來分組(按時間間隔)或者排序(按行數)的時間欄位   ⚫ as:別名,必須出現在後面的 groupBy 中 程式碼如下:
//
Tumbling Event-time Window(事件時間欄位 rowtime) .window(Tumble over 10.minutes on 'rowtime as 'w) // Tumbling Processing-time Window(處理時間欄位 proctime) .window(Tumble over 10.minutes on 'proctime as 'w) // Tumbling Row-count Window (類似於計數視窗,按處理時間排序,10 行一組) .window(Tumble over 10.rows on 'proctime as 'w)
1.2 滑動視窗 滑動視窗(Sliding windows)要用 Slide 類來定義,另外還有四個方法: ⚫ over:定義視窗長度 ⚫ every:定義滑動步長 ⚫ on:用來分組(按時間間隔)或者排序(按行數)的時間欄位 ⚫ as:別名,必須出現在後面的 groupBy 中 程式碼如下:
//
Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) // Sliding Processing-time window .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) // Sliding Row-count window .window(Slide over 10.rows every 5.rows on 'proctime as 'w)
1.3 會話視窗 會話視窗(Session windows)要用 Session 類來定義,另外還有三個方法: ⚫ withGap:會話時間間隔 ⚫ on:用來分組(按時間間隔)或者排序(按行數)的時間欄位 ⚫ as:別名,必須出現在後面的 groupBy 中 程式碼如下:
// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
// Session Processing-time Window 
.window(Session withGap 10.minutes on 'proctime as 'w)

2 Over Windows

  Over window 聚合是標準 SQL 中已有的(Over 子句),可以在查詢的 SELECT 子句中定義。Over window 聚合,會針對每個輸入行,計算相鄰行範圍內的聚合。Over windows   使用.window(w:overwindows*)子句定義,並在 select()方法中通過別名來引用。   比如這樣:
val table = input
 .window([w: OverWindow] as 'w)
 .select('a, 'b.sum over 'w, 'c.min over 'w)
  Table API 提供了 Over 類,來配置 Over 視窗的屬性。可以在事件時間或處理時間,以及指定為時間間隔、或行計數的範圍內,定義 Over windows。   無界的over window是使用常量指定的。也就是說,時間間隔要指定UNBOUNDED_RANGE,或者行計數間隔要指定 UNBOUNDED_ROW。而有界的 over window 是用間隔的大小指定的。 實際程式碼應用如下: 1) 無界的 over window
// 無界的事件時間 over window (時間欄位 "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
//無界的處理時間 over window (時間欄位"proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// 無界的事件時間 Row-count over window (時間欄位 "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
//無界的處理時間 Row-count over window (時間欄位 "rowtime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
2) 有界的 over window
// 有界的事件時間 over window (時間欄位 "rowtime",之前 1 分鐘)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// 有界的處理時間 over window (時間欄位 "rowtime",之前 1 分鐘)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// 有界的事件時間 Row-count over window (時間欄位 "rowtime",之前 10 行)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
// 有界的處理時間 Row-count over window (時間欄位 "rowtime",之前 10 行)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

3 SQL 中視窗的定義

  我們已經瞭解了在 Table API 裡 window 的呼叫方式,同樣,我們也可以在 SQL 中直接加入視窗的定義和使用。 3.1 Group Windows   Group Windows 在 SQL 查詢的 Group BY 子句中定義。與使用常規 GROUP BY 子句的查詢一樣,使用 GROUP BY 子句的查詢會計算每個組的單個結果行。   SQL 支援以下 Group 視窗函式:     ⚫TUMBLE(time_attr, interval)定義一個滾動視窗,第一個引數是時間欄位,第二個引數是視窗長度。     ⚫HOP(time_attr, interval, interval)定義一個滑動視窗,第一個引數是時間欄位,第二個引數是視窗滑動步長,第三個是視窗長度。     ⚫SESSION(time_attr, interval)定義一個會話視窗,第一個引數是時間欄位,第二個引數是視窗間隔(Gap)。另外還有一些輔助函式,可以用來選擇 Group Window 的開始和結束時間戳,以及時間     屬性。這裡只寫 TUMBLE_*,滑動和會話視窗是類似的(HOP_*,SESSION_*)。     ⚫TUMBLE_START(time_attr, interval)     ⚫TUMBLE_END(time_attr, interval)     ⚫TUMBLE_ROWTIME(time_attr, interval)     ⚫ TUMBLE_PROCTIME(time_attr, interval) 3.2 Over Windows   由於 Over 本來就是 SQL 內建支援的語法,所以這在 SQL 中屬於基本的聚合操作。所有聚合必須在同一視窗上定義,也就是說,必須是相同的分割槽、排序和範圍。目前僅支援在當 前行範圍之前的視窗(無邊界和有邊界)。   注意,ORDER BY 必須在單一的時間屬性上指定。   程式碼如下:
SELECT COUNT(amount) OVER (
 PARTITION BY user
 ORDER BY proctime
 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders
// 也可以做多個聚合
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
 PARTITION BY user
 ORDER BY proctime
 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

4 程式碼練習(以分組滾動視窗為例)

我們可以綜合學習過的內容,用一段完整的程式碼實現一個具體的需求。例如,可以開一個滾動視窗,統計 10 秒內出現的每個 sensor 的個數。 程式碼如下:
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val streamFromFile: DataStream[String] = env.readTextFile("sensor.txt")
val dataStream: DataStream[SensorReading] = streamFromFile
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, 
dataArray(2).trim.toDouble)
} )
.assignTimestampsAndWatermarks( new 
BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) 
{
override def extractTimestamp(element: SensorReading): Long = 
element.timestamp * 1000L
} )
val settings: EnvironmentSettings = EnvironmentSettings
.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val tableEnv: StreamTableEnvironment = 
StreamTableEnvironment.create(env, settings)
val dataTable: Table = tableEnv
.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime)
val resultTable: Table = dataTable
.window(Tumble over 10.seconds on 'timestamp as 'tw)
.groupBy('id, 'tw)
.select('id, 'id.count)
val sqlDataTable: Table = dataTable
.select('id, 'temperature, 'timestamp as 'ts)
val resultSqlTable: Table = tableEnv
.sqlQuery("select id, count(id) from "
+ sqlDataTable 
+ " group by id,tumble(ts,interval '10' second)")
// 把 Table 轉化成資料流
val resultDstream: DataStream[(Boolean, (String, Long))] = resultSqlTable
.toRetractStream[(String, Long)]
resultDstream.filter(_._1).print()
env.execute()
}
View Code