Flink SQL和Table程式設計和案例
宣告:本系列部落格為原創,最先發表在拉勾教育,其中一部分為免費閱讀部分。被讀者各種搬運至各大網站。所有其他的來源均為抄襲。
一、概述
1、背景
Flink自身提供了不同級別的抽象來支援開發者進行流式或者批量處理程式,Flink支援4種不同級別的抽象。
Table API 和SQL處於最頂端,是Flink提供的高階API操作。Flink SQL是Flink 實時計算為簡化計算模型,降低使用者使用實時計算門檻而設計的一套符合標準SQL語義的開發語言。
Flink在程式設計模型上提供了DataSet 和 DataStream兩套API,並沒有做到事實上的批流一體。
2、原理
在離線計算領域Hive幾行扛起了半壁江山,它的底層對SQL的解析用到了Apache Calcite,Flink同樣把SQL的解析、優化和執行交給了Calcite。
下圖是一張經典的Flink Table & SQL 實現原理圖
無論是批查詢SQL還是流式查詢SQL,都會經過對應的轉換器Parser轉換成節點數SQLNode tree,然後生成邏輯執行計劃Logical Plan,邏輯執行計劃在經過優化後生成真正可以執行的物理執行計劃,交給DataSet或者DataStream的API去執行。
一個完成的Flink Table & SQL Job也是由Source、Transformations、Sink構成:
- source:來源於外部資料,常用的有Kafka、Mysql等
- Transformations:是Flink Table & SQL 支援的常用SQL運算元,比如簡單的Select、GroupBy等,然後在這裡也有更為複雜的多流Join、流與維表的Join等。
3、動態表
與傳統的表SQL查詢相比,Flink Table & SQL 在處理流資料時會時時刻刻處於動態的資料變化中,所以便又了一個動態表的概念。 動態表的查詢和靜態表一樣,在查詢動態表的時候,SQL會連續查詢,不會終止。舉個簡單的例子,Kafka流作為輸入:
Kafka訊息會源源不斷的解析成一張不斷增長的動態表,我們在動態表上執行的SQL會不斷生成新的動態表作為結果表。
4、Flink Table & SQL運算元和內建函式
Flink Table & SQL的開發一直在進行中,並沒有支援所有場景下的計算邏輯。
常用運算元
目前Flink SQL支援的語法主要如下:
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
Flink SQL和傳統的SQL一樣,支援包含查詢、連線、聚合等場景,另外還支援視窗、排序等場景。
SELECT/AS/WHERE
SELECT、WHERE和傳統的SQL用法一樣,用於篩選和過濾資料,同時適用於DataStream和DataSet。
SELECT * FROM Table;
SELECT name,age FROM Table;
我們也可以在WHERE條件中使用=、<、>、<>、>=、<=,以及 AND、OR 等表示式的組合:
SELECT name,age FROM Table where name LIKE '%小明%';
SELECT * FROM Table WHERE age = 20;
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)
GROUP BY / DISTINCT/HAVING
GROUP BY 用於進行分組操作,DISTINCT 用於結果去重。HAVING 和傳統 SQL 一樣,可以用來在聚合函式之後進行篩選。
SELECT DISTINCT name FROM Table;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name HAVING
SUM(score) > 300;
JOIN
JOIN 可以用於把來自兩個表的資料聯合起來形成結果表,目前 Flink 的 Join 只支援等值連線。Flink 支援的 JOIN 型別包括:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
例如:
SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer
SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer
SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer
LEFT JOIN、RIGHT JOIN 、FULL JOIN 相與我們傳統 SQL 中含義一樣。
WINDOW
根據視窗資料劃分的不同,目前 Apache Flink 有如下 3 種:
- 滾動視窗,視窗資料有固定的大小,視窗中的資料不會疊加;
- 滑動視窗,視窗資料有固定大小,並且有生成間隔;
- 會話視窗,視窗資料沒有固定的大小,根據使用者傳入的引數進行劃分,視窗資料無疊加;
滾動視窗:滾動視窗的特點是:有固定大小、視窗中的資料不會重疊,如下圖所示:
滾動視窗常用的語法:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
舉例說明,我們需要計算每個使用者每天的訂單資料:
SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;
其中,TUMBLE_START 和 TUMBLE_END 代表視窗的開始時間和視窗的結束時間,TUMBLE (timeLine, INTERVAL '1' DAY) 中的 timeLine 代表時間欄位所在的列,INTERVAL '1' DAY 表示時間間隔為一天。
滑動視窗:滑動視窗有固定的大小,與滾動視窗不同的是滑動視窗可以通過 slide 引數控制滑動視窗的建立頻率。需要注意的是,多個滑動視窗可能會發生資料重疊,具體語義如下:
滑動視窗的語法與滾動視窗相比,只多了一個 slide 引數:
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
例如,我們要每間隔一小時計算一次過去 24 小時內每個商品的銷量:
SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
上述案例中的 INTERVAL '1' HOUR 代表滑動視窗生成的時間間隔。
會話視窗:會話視窗定義了一個非活動時間,假如在指定的時間間隔內沒有出現事件或訊息,則會話視窗關閉。
會話視窗的語法如下:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
舉例,我們需要計算每個使用者過去 1 小時內的訂單量:
SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user
內建函式
Flink 中還有大量的內建函式,我們可以直接使用,將內建函式分類如下:
- 比較函式
- 邏輯函式
- 算術函式
- 字串處理函式
- 時間函式
比較函式
邏輯函式
算術函式
字串處理函式
時間函式
二、Flink Table & SQL案例
public class MyStreamingSource implements SourceFunction<Item> {
private boolean isRunning = true;
/**
* 重寫run方法產生一個源源不斷的資料傳送源
* @param ctx
* @throws Exception
*/
public void run(SourceContext<Item> ctx) throws Exception {
while(isRunning){
Item item = generateItem();
ctx.collect(item);
//每秒產生一條資料
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
//隨機產生一條商品資料
private Item generateItem(){
int i = new Random().nextInt(100);
ArrayList<String> list = new ArrayList();
list.add("HAT");
list.add("TIE");
list.add("SHOE");
Item item = new Item();
item.setName(list.get(new Random().nextInt(3)));
item.setId(i);
return item;
}
}
我們把實時的商品資料流進行分流,分成 even 和 odd 兩個流進行 JOIN,條件是名稱相同,最後,把兩個流的 JOIN 結果輸出。
class StreamingDemo {
public static void main(String[] args) throws Exception {
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStreamingSource()).map(new MapFunction<Item, Item>() {
@Override
public Item map(Item item) throws Exception {
return item;
}
});
DataStream<Item> evenSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("even");
DataStream<Item> oddSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("odd");
bsTableEnv.createTemporaryView("evenTable", evenSelect, "name,id");
bsTableEnv.createTemporaryView("oddTable", oddSelect, "name,id");
Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
queryTable.printSchema();
bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){})).print();
bsEnv.execute("streaming sql job");
}
}
結果如下: