1. 程式人生 > 實用技巧 >二、Flink SQL & Table 程式設計和案例

二、Flink SQL & Table 程式設計和案例

Table API 和 SQL 處於最頂端,是 Flink 提供的高階 API 操作。Flink SQL 是 Flink 實時計算為簡化計算模型,降低使用者使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言.

一個完整的 Flink Table & SQL Job 也是由 Source、Transformation、Sink 構成:

  • Source 部分來源於外部資料來源,我們經常用的有 Kafka、MySQL 等;
  • Transformation 部分則是 Flink Table & SQL 支援的常用 SQL 運算元,比如簡單的 Select、Groupby 等,當然在這裡也有更為複雜的多流 Join、流與維表的 Join 等;
  • Sink 部分是指的結果儲存比如 MySQL、HBase 或 Kakfa 等。

動態表
與傳統的表 SQL 查詢相比,Flink Table & SQL 在處理流資料時會時時刻刻處於動態的資料變化中,所以便有了一個動態表的概念。
動態表的查詢與靜態表一樣,但是,在查詢動態表的時候,SQL 會做連續查詢,不會終止。

SELECT/AS/WHERE

SELECT、WHERE 和傳統 SQL 用法一樣,用於篩選和過濾資料,同時適用於 DataStream 和 DataSet。

也可以在 WHERE 條件中使用 =、<、>、<>、>=、<=,以及 AND、OR 等表示式的組合:

SELECT name,age FROM Table where name LIKE '%小明%';
SELECT * FROM Table WHERE age = 20;
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)

GROUP BY / DISTINCT/HAVING

GROUP BY 用於進行分組操作,DISTINCT 用於結果去重。HAVING 和傳統 SQL 一樣,可以用來在聚合函式之後進行篩選。

JOIN 可以用於把來自兩個表的資料聯合起來形成結果表,目前 Flink 的 Join 只支援等值連線。Flink 支援的 JOIN 型別包括:

WINDOW

根據視窗資料劃分的不同,目前 Apache Flink 有如下 3 種:

  • 滾動視窗,視窗資料有固定的大小,視窗中的資料不會疊加;

  • 滑動視窗,視窗資料有固定大小,並且有生成間隔;

  • 會話視窗,視窗資料沒有固定的大小,根據使用者傳入的引數進行劃分,視窗資料無疊加;

滾動視窗
SELECT
[gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ... aggn(colN) FROM Tab1 GROUP BY [gk], TUMBLE(timeCol, size)
#舉例
SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount)
FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;
滑動視窗
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
案例
SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

會話視窗
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
案例
SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart
, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount)
FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user