Flink編碼:FlinkSQL全面指南
文章目錄
1. FlinkSQL定位
通過SQL開發人員可以只關注業務邏輯,學習成本低,容易理解,而且內建了很多的優化規則,可以簡化開發複雜度,通過SQL還能在高層應用上實現真正的批流一體。
Hive SQL,Spark SQL,Flink SQL給開發人員帶來了極大便捷,讓開發人員只需關注業務場景,而無需關注複雜的API編寫。
2. 流與表的對偶性
以下是利用FlinkSQL做CDC的場景,mysql表可以轉成CDC流,CDC流又可以落盤成mysql表。
表的重要屬性:schema,data,DML操作時間/時間欄位
流的重要屬性:schema(debezium、canal、ogg),data,processTime/eventTime
參考:https://developer.aliyun.com/article/667566?spm=a2c6h.13262185.0.0.36a07e18Wn3kay
3. 持續查詢/增量計算
流上的資料來源源不斷的流入,我們既不能等所有事件流入結束(永遠不會結束)再計算,也不會每次來一條事件就像傳統資料庫一樣將全部事件集合重新整體計算一次。
在持續查詢的計算過程中,Apache Flink採用增量計算的方式,也就是每次計算都會將計算結果儲存到state中,下一條事件到來的時候利用上次計算的結果和當前的事件進行聚合計算。
如以下案例:
// 求訂單總數和所有訂單的總金額
select count(id) as cnt,sum(amount)as sumAmount from order_tab;
將count和sum更新到在state中;當最新一條資料到來時,count+1,sum+amount。
4. 回撤流
Flink中,Kafka Source/Sink是非回撤流,Group By是回撤流。所謂回撤流,就是可以更新歷史資料的流,更新歷史資料並不是將發往下游的歷史資料進行更改,要知道,已經發往下游的訊息是追不回來的。更新歷史資料的含義是,在得知某個Key(接在Key BY / Group By後的欄位)對應資料已經存在的情況下,如果該Key對應的資料再次到來,會生成一條delete訊息和一條新的insert訊息發往下游。
聚合運算元和Sink運算元都有回撤的概念,但是又不盡相同。聚合運算元的回撤用於聚合狀態的更新,保證了FlinkSQL持續查詢/增量查詢的正確語義;Sink運算元的回撤則更多的是應用於CDC場景,保證了CDC場景下的append、upsert、retract等語義的正確性。
詳情見上一篇文章
5. Flink 1.11關於SQL的增強
5.1 DDL寫法
對 DDL 的 WITH 引數相對於 1.10 版本做了簡化,從使用者視角看上就是簡化和規範了引數
Old Key (Flink 1.10) | New Key (Flink 1.11) |
---|---|
connector.type | connector |
connector.url | url |
connector.table | table-name |
connector.driver | driver |
connector.username | username |
connector.password | password |
connector.read.partition.column | scan.partition.column |
connector.read.partition.num | scan.partition.num |
connector.read.partition.lower-bound | scan.partition.lower-bound |
connector.read.partition.upper-bound | scan.partition.upper-bound |
connector.read.fetch-size | scan.fetch-size |
connector.lookup.cache.max-rows | lookup.cache.max-rows |
connector.lookup.cache.ttl | lookup.cache.ttl |
connector.lookup.max-retries | lookup.max-retries |
connector.write.flush.max-rows | sink.buffer-flush.max-rows |
connector.write.flush.interval | sink.buffer-flush.interval |
connector.write.max-retries | sink.max-retries |
5.2 主鍵
Upsert操作需要主鍵約束來進行更新,Flink 1.10之前通過Group By語句推斷主鍵,這種方式有一些情況是推斷不出主鍵的,比如Group By UDF(id)
,Flink1.11引入了主鍵約束語法
-- Flink 1.10
create talbe MyUserTable(
id BIGINT, name STRING, age INT
) WITH (
'connector.type'='jdbc',
'connector.url'='jdbc:mysql://localhost:3306/mydb',
'connector.table'='users'
);
-- upsert write
insert into MyUserTable
select id, max(name), max(age) from T group by id;
-- Flink 1.11
create talbe MyUserTable(
id BIGINT, name STRING, age INT,
-- 設定主鍵,ENFORCED表示主鍵由使用者確保正確,flink不去資料來源做校驗(測了一下,這裡填錯的也沒關係,似乎只是一個啟用upsert的開關而已)
PRIMARY KEY key(id) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:330 6/mydb',
'table-name'='users'
);
-- upsert write
insert into MyUserTable
select id, max(name), max(age) from T group by id;
5.3 Catalog
Flinksql和Table API在calcite validate階段會對sql進行語法校驗,此時需要用到catalog中維護的庫、表、UDF、欄位及型別等元資料。
使用者需要手動創造DDL語句,如果表schema發生變化,則需要使用者停止任務並修改DDL語句,比較繁瑣。JDBC catalog通過JDBC協議連線關係型資料庫,Flink可以自動檢索表,不需要使用者手動輸入和修改。
Flink任務預設會創造一個記憶體中的catalog名為default_catalog,使用catalog自動更新元資料的場景還不多見,大部分場景還是通過ddl來定義表的元資料並存入default_catalog。因此目前flink1.11只支援postgresql這個唯一的JDBC catalog實現,且僅支援以下方法
// The supported methods by Postgres Catalog.
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)
參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
https://www.jianshu.com/p/aef22cf8e33f
https://blog.csdn.net/qq_31975963/article/details/109401740
6. JOIN運算元
參考:https://www.cnblogs.com/cnki/p/10274532.html
6.1 雙流JOIN原理
JOIN運算元會開闢左右兩個State進行資料儲存,左右兩邊的資料到來時候,進行如下操作:
- LeftEvent到來儲存到LState,RightEvent到來的時候儲存到RState;
- LeftEvent會去RightState進行JOIN,併發出所有JOIN之後的Event到下游;
- RightEvent會去LeftState進行JOIN,併發出所有JOIN之後的Event到下游。
6.1.1 Inner Join
- 右流比左流快,當資料1、2、3到來時,存入Rstate,且發現Lstate中沒有可以join的資料
- 資料4到達,存入Lstate,並與Rstate中的所有資料進行join
- 資料5到達,存入Rstate,並與Lstate中的所有資料進行join
6.1.2 Left Join
left join與inner join原理基本類似,都是維護了左右兩個state。
如果左流第一條資料先到達,發現Rstate中沒有資料時,會將右流欄位補充null值往下游傳送,當右流第一次且僅在第一次發現Lstate中有可以join的資料時,會發送回撤訊息,撤回含有null值的記錄;
如果右流第一條資料先到達,則存入Rstate,不往下游傳送資料。
注意:
雙流left join的回撤僅發生在左流第一條資料優先到達且右流第一次發現Lstate中有資料可以join的情況下,接下來並不會產生任何回撤訊息,這裡與聚合運算元的回撤有些區別
6.1.3 State資料結構
Map<JoinKey, Map<rowData, count>>
- 第一級MAP的key是Join key,比如示例中的P001, value是流上面的所有完整事件;
- 第二級MAP的key是行資料,比如示例中的P001, 2,value是相同事件值的個數。
資料結構的利用:
- 記錄重複記錄 - 利用第二級MAP的value記錄重複記錄的個數,這樣大大減少儲存和讀取
- 正向記錄和撤回記錄 - 利用第二級MAP的value記錄,當count=0時候刪除該元素
- 判斷右邊是否產生撤回記錄 - 根據第一級MAP的value的size來判斷是否產生撤回,只有size由0變成1的時候(右流中第一條和左可以JOIN的事件)才產生撤回
參考: https://developer.aliyun.com/article/672760
7. 視窗
flink中有兩種window,一種是OverWindow,即傳統資料庫的標準開窗,每一個元素都對應一個視窗。一種是GroupWindow,目前在SQL中GroupWindow都是基於時間進行視窗劃分的(datastream api中既可以基於時間,又可以基於資料條目)。
7.1 OverWindow
每來一條資料,就建一個視窗,具體有分為基於資料條目的和基於時間兩種OverWindow。
7.1.1 基於資料條目的overwindow
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
ROWS
BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1
SELECT
itemID,
itemType,
onSellTime,
price,
MAX(price) OVER (
PARTITION BY itemType
ORDER BY onSellTime
ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
FROM item_tab
7.1.2 基於時間的overwindow
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
RANGE
BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1
SELECT
itemID,
itemType,
onSellTime,
price,
MAX(price) OVER (
PARTITION BY itemType
ORDER BY rowtime
RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
FROM item_tab
7.2 GroupWindow
與OverWindow每條資料對應一個視窗不同,GroupWindow通過Window中的assigner元件進行視窗的劃分以及資料落入視窗的選擇。詳情請參看
與DataStream API不同,目前Flink SQL只支援基於時間的GroupWindow,不支援基於資料條目的GroupWindow。
7.2.1 滾動視窗
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
SELECT
region,
TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS winStart,
TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS winEnd,
COUNT(region) AS pv
FROM pageAccess_tab
GROUP BY region, TUMBLE(rowtime, INTERVAL '2' MINUTE)
7.2.2 滑動視窗
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
SELECT
HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winStart,
HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winEnd,
SUM(accessCount) AS accessCount
FROM pageAccessCount_tab
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
7.2.3 Session視窗
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
SELECT
region,
SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,
SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd,
COUNT(region) AS pv
FROM pageAccessSession_tab
GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)
參考:
https://developer.aliyun.com/article/670202?spm=a2c6h.13262185.0.0.91027e18Q8W1oA
https://www.cnblogs.com/woodytu/p/4709020.html