1. 程式人生 > 其它 >Flink TableAPI&SQL(二)

Flink TableAPI&SQL(二)

2.6 表和流的轉換

一般用於測試時候的資料輸出,針對的是 流資料 。由於Table沒有提供print()方法,所有要將Table資料型別轉換成DataStream資料型別或者DataSet。

2.6.1 將表(Table)轉換成流(DataStream)

  • 呼叫 toDataStream() 方法
Table aggResult = tableEnv.sqlQuery("select user, `url` from clickTable");
tableEnv.toDataStream(aggResult).print("agg");

對於簡單的select查詢表來說,可以直接呼叫toDataStream()

方法來進行轉換輸出。但是在進行一些聚合、組合等操作的時候,需要呼叫toChangelogStream() 方法,否則會報如下錯誤:

Table aggResult = tableEnv.sqlQuery("select user, COUNT(url) as cnt from clickTable group by user");
tableEnv.toDataStream(aggResult).print("agg");

這表示當前的TableSink 並不支援表的更新(update)操作。

因為資料是一條一條來的,當前SQL語句所表示的是進行一個累加聚合操作,當第一條資料為(zhangsan,1)的時候,後面又來的一個zhansan

(按user分組聚合),此時zhangsan的資料就變成了(zhangsan,2),這其實是更改了表的資料,因此對於這樣有更新操作的表,我們不要試圖直接把它轉換成 DataStream 列印輸出,而是記錄一下它的“更新日誌”(change log)。這樣一來,對於表的所有更新操作,就變成了一條更新日誌的流,我們就可以轉換成流列印輸出了。

  • 呼叫 toChangelogStream() 方法
Table aggResult = tableEnv.sqlQuery("select user, COUNT(url) as cnt from clickTable group by user");
tableEnv.toChangelogStream(aggResult).print("agg");

總結:當對有更新操作的表進行流轉換的時候應當呼叫toChangelogStream()方法。

2.6.2 將流(DataStream)轉換成表(Table)

  • fromDataStream() 方法

想要將一個DataStream 轉換成表也很簡單,可以通過呼叫表環境的 fromDataStream()方法來實現,返回的就是一個 Table 物件。例如,我們可以直接將事件流 eventStream 轉換成一個表

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 獲取表環境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 讀取資料來源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)

// 將資料流轉換成表
Table eventTable = tableEnv.fromDataStream(eventStream);

由於流中的資料本身就是定義好的 POJO 型別 Event,所以我們將流轉換成表之後,每一行資料就對應著一個Event,而表中的列名就對應著Event 中的屬性。

話雖如此,但有時還會出現欄位被重新命名為f0...的問題。

因此我們還可以在 fromDataStream()方法中增加引數,用來指定提取哪些屬性作為表中的欄位名,並可以任意指定位置

// $表示提取對應欄位
Table eventTable2 = tableEnv.fromDataStream(eventDataStreamSource, $("timestamp").as("ts"),$("url"));

//如果表中只有一個欄位,那麼可以直接重新命名
tableEnv.fromDataStream(stream, $("myLong"))// 將裡面的欄位重新命名為myLong

// 如果不指定欄位,語句會按順序重新命名 
Table eventTable = tableEnv.fromDataStream(eventDataStreamSource).as("user","url", "timestamp");
  • createTemporaryView() 方法

如果我們想要在SQL語句中直接引用該表,那就應該用createTemporaryView()方法,建立一個臨時檢視。

tableEnv.createTemporaryView("EventTable", eventDataStreamSource, $("timestamp").as("ts"),$("url"));

傳入的兩個引數,第一個依然是註冊的表名,而第二個可以直接就是DataStream。之後仍舊可以傳入多個引數,用來指定表中的欄位。

  • fromChangelogStream()方法

2.6.3 支援的資料型別

整體來看,DataStream 中支援的資料型別,Table 中也是都支援的,只不過在進行轉換時需要注意一些細節。

  • 原子型別:基礎資料型別(Integer、Double、String)和通用資料型別(就是不可再拆分的資料型別)

  • Tuple型別

當原子型別不做重新命名的時候,預設欄位名就是f0,欄位還可以通過呼叫表示式的 as()方法來進行重新命名。

StreamTableEnvironment tableEnv = ...; DataStream<Tuple2<Long, Integer>> stream = ...;
// 將資料流轉換成只包含 f1 欄位的表
Table table = tableEnv.fromDataStream(stream, $("f1"));

// 將資料流轉換成包含 f0 和 f1 欄位的表,在表中 f0 和 f1 位置交換
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

// 將 f1 欄位命名為 myInt,f0 命名為 myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"),
$("f0").as("myLong"));

  • POJO型別

POJO也可以理解為Java中的Bean。POJO型別中的欄位也可以同樣被重新排序、提取和重新命名。

StreamTableEnvironment tableEnv = ...; DataStream<Event> stream = ...;
Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"),$("url").as("myUrl"));
  • Row型別

欄位按位置,任意數量的欄位對映,支援null值,無型別安全訪問。

?????????

3、 流處理中的表

3.1 動態表和持續查詢

  • 動態表

當流中有新資料到來,初始的表中會插入一行;而基於這個表定義的 SQL 查詢,就應該在之前的基礎上更新結果。這樣得到的表就會不斷地動態變化,被稱為“動態表”(Dynamic Tables)。

動態表是Flink 在Table API 和SQL 中的核心概念,它為流資料處理提供了表和SQL 支援。我們所熟悉的表一般用來做批處理,面向的是固定的資料集,可以認為是“靜態表”;而動態表則完全不同,它裡面的資料會隨時間變化。

其實動態表的概念,我們在傳統的關係型資料庫中已經有所接觸。資料庫中的表,其實是一系列 INSERT、UPDATE 和 DELETE 語句執行的結果;在關係型資料庫中,我們一般把它稱為更新日誌流(changelog stream)。如果我們儲存了表在某一時刻的快照(snapshot),那麼接下來只要讀取更新日誌流,就可以得到表之後的變化過程和最終結果了。在很多高階關係型資料庫(比如 Oracle、DB2)中都有“物化檢視”(Materialized Views)的概念,可以用來快取 SQL 查詢的結果;它的更新其實就是不停地處理更新日誌流的過程。

Flink 中的動態表,就借鑑了物化檢視的思想。

  • 持續查詢

動態表可以像靜態的批處理表一樣進行查詢操作。由於資料在不斷變化,因此基於它定義的 SQL 查詢也不可能執行一次就得到最終結果。這樣一來,我們對動態表的查詢也就永遠不會停止,一直在隨著新資料的到來而繼續執行。這樣的查詢就被稱作“持續查詢”(Continuous Query)。對動態表定義的查詢操作,都是持續查詢;而持續查詢的結果也會是一個動態表。

由於每次資料到來都會觸發查詢操作,因此可以認為一次查詢面對的資料集,就是當前輸入動態表中收到的所有資料。這相當於是對輸入動態表做了一個“快照”(snapshot),當作有限資料集進行批處理;流式資料的到來會觸發連續不斷的快照查詢,像動畫一樣連貫起來,就構成了“持續查詢”。

如下圖所示:描述了持續查詢的過程。這裡我們也可以清晰地看到流、動態表和持續查詢的關係:

持續查詢的步驟如下:

(1) 流(stream)被轉換為動態表(dynamic table);

(2) 對動態表進行持續查詢(continuous query),生成新的動態表;

(3) 生成的動態表被轉換成流。

這樣,只要API 將流和動態表的轉換封裝起來,我們就可以直接在資料流上執行 SQL 查詢,用處理表的方式來做流處理了。

package com.peng.dynamic_table;

import com.peng.Event;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO 動態表-持續查詢
 * @date 2022/11/22-19:42
 */
public class DynamicTableDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // source
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 1000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=7", 105 * 1000L)
        );

        // 將資料轉換成表,資料一條一條的來,表一條一條的插入改變
        tableEnv.createTemporaryView("eventTable", stream);
        // sql持續查詢表的狀態,進行聚合統計
        Table resultTable = tableEnv.sqlQuery("select user, count(url) as cnt from eventTable group by user");
        // 表中涉及到更新,用toChangelogStream()方法
        tableEnv.toChangelogStream(resultTable).print();
        /**列印結果:
         * +I[Alice, 1]         --- +I表示插入增加
         * +I[Bob, 1]
         * -U[Alice, 1]         --- -U表示撤回上次的更改 撤回/撤銷[Alice, 1]資料
         * +U[Alice, 2]         --- +U表示更新修改    資料更新為[Alice, 2]
         * +I[Cary, 1]
         * -U[Bob, 1]
         * +U[Bob, 2]
         * -U[Alice, 2]
         * +U[Alice, 3]
         */

        env.execute();
    }
}

動態表的變化如下圖:

而SQL語句就是擷取期間變化的狀態,持續查詢。

因此一個SQL語句可以引申出追加查詢更新查詢

  • 追加查詢:沒有涉及到聚合累加語句,不會更改原表中已有的資料
  • 更新查詢:涉及到聚合累加語句,會更改原表中已有的資料

3.2 將動態錶轉換成流

動態表也可以通過插入(Insert)、更新(Update)和刪除(Delete)操作,進行持續的更改。

  • 追加流
    • 只涉及到插入
  • 撤回流
    • 涉及到add和retract
  • 更新流
    • 需指定唯一的key值,涉及到update和delete

4、 時間屬性和視窗

基於時間的操作(比如時間視窗),需要定義相關的時間語義和時間資料來源的資訊。在 Table API 和 SQL 中,會給表單獨提供一個邏輯上的時間欄位,專門用來在表處理程式中指示時間。

所以所謂的時間屬性(time attributes),其實就是每個表模式結構(schema)的一部分。它可以在建立表的DDL 裡直接定義為一個欄位,也可以在 DataStream 轉換成表時定義。一旦定義了時間屬性,它就可以作為一個普通欄位引用,並且可以在基於時間的操作中使用。

時間屬性的資料型別TIMESTAMP,它的行為類似於常規時間戳,可以直接訪問並且進行計算。

4.1 事件時間

時間戳的資料型別為 TIMESTAMP

4.1.1 在建立表的DDL 中定義

在建立表的 DDL(CREATE TABLE 語句)中,可以增加一個欄位,通過 WATERMARK 語句來定義事件時間屬性。WATERMARK 語句主要用來定義水位線(watermark)的生成表示式,這個表示式會將帶有事件時間戳的欄位標記為事件時間屬性,並在它基礎上給出水位線的延遲時間。具體定義方式如下:

CREATE TABLE EventTable( 
	user STRING,
	url STRING,
	ts TIMESTAMP(3),
	WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
	) WITH (
	...
	);

這裡我們把 ts 欄位定義為事件時間屬性,而且基於 ts 設定了 5 秒的水位線延遲。這裡的“5 秒”是以“時間間隔”的形式定義的,格式是INTERVAL <數值> <時間單位>:

INTERVAL '5' SECOND     

這裡的數值必須用單引號引起來,而單位用 SECOND 和 SECONDS 是等效的。

Flink 中支援的事件時間屬性資料型別必須為TIMESTAMP 或者TIMESTAMP_LTZ。這裡TIMESTAMP_LTZ 是指帶有本地時區資訊的時間戳(TIMESTAMP WITH LOCAL TIME ZONE);一般情況下如果資料中的時間戳是“年-月-日-時-分-秒”的形式,那就是不帶時區資訊的,可以將事件時間屬性定義為TIMESTAMP 型別。

而如果原始的時間戳就是一個長整型的毫秒數,這時就需要另外定義一個欄位來表示事件時間屬性,型別定義為TIMESTAMP_LTZ 會更方便:

CREATE TABLE events ( 
    user STRING,
	url STRING, 
	ts BIGINT,
	ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),//將時間轉換成TIMESTAMP型別
	WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND//設定 5 秒的水位線延遲
	) WITH (
	...
	);

這裡我們另外定義了一個欄位ts_ltz,是把長整型的 ts 轉換為TIMESTAMP_LTZ 得到的;進而使用 WATERMARK 語句將它設為事件時間屬性,並設定 5 秒的水位線延遲。

可以簡單看下TO_TIMESTAMP_LTZ轉換後的結果:

4.1.2 在資料流轉換為表時定義

看例項吧

package com.peng.time;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO 在流轉表中建立事件時間
 * @date 2022/11/23-11:16
 */
public class EventTest02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // source
        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());

        // 想在流轉表中定義事件時間,就要先指定誰是事件時間
        SingleOutputStreamOperator<Event> clickStream = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        // '$' 進行欄位重新命名
        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());//宣告一個額外的邏輯欄位作為事件時間,因為上一步已經指定了事件時間,.rowtime()會自動進行轉換

        tableEnv.toDataStream(clickTable).print();

        env.execute();
    }
}

執行結果:

4.2 處理時間

4.2.1 在建立表的DDL 中定義

在建立表的 DDL(CREATE TABLE 語句)中,可以增加一個額外的欄位,通過呼叫系統內建的 PROCTIME() 函式來指定當前的處理時間屬性,返回的型別是TIMESTAMP_LTZ。

CREATE TABLE events ( 
    user STRING,
	url STRING, 
	ts BIGINT,
	ts_ltz AS PROCTIME()
	) WITH (
	...
	);

宣告一個而外欄位ts_ltz,用來儲存處理時間

4.2.2 在流轉表時定義

fromDataStream() 方法裡定義欄位名的時候,新增/指定欄位,呼叫 proctime() 方法,進行一個處理時間的定義。

綜合展示:

package com.peng.time;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO 在DDL中和流中建立處理時間
 * @date 2022/11/23-13:15
 */
public class ProcessTimeTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 在DDL中建立處理時間
        // 呼叫PROCTIME()函式
        String createDDL = "CREATE TABLE clickTable ( " +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " ts_ltz AS PROCTIME() " +
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                " )";

        // TODO 在流轉表中建立處理時間
        DataStreamSource<Event> stream = env.addSource(new ClickSource());
        Table clickTable = tableEnv.fromDataStream(stream, $("user"), $("url"), $("timestamp").as("ts"),
                $("ps").proctime());// 這裡重新聲明瞭一個欄位
        tableEnv.toDataStream(clickTable).print();
        env.execute();
    }
}

結果:

5、 視窗

FlinkTable & SQL中進行開窗有下面兩種方法

5.1 分組視窗(Group Window)

在SQL語句中,開窗只能在 group by 中完成

在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一組“分組視窗”(Group Window)函式,常用的時間視窗如滾動視窗、滑動視窗、會話視窗都有對應的實現;具體在 SQL 中就是呼叫 TUMBLE()、HOP()、SESSION(),傳入時間屬性欄位、視窗大小等引數就可以了。以滾動視窗為例:統計一小時內的使用者點選數

Table result = tableEnv.sqlQuery(
    "SELECT " +
		"user, " +
		"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " + //TUPMBLE_END()函式獲取滾動視窗的結束時間,重新命名為 endT 提取出來。
		"COUNT(url) AS cnt " + 
    "FROM EventTable " +
	"GROUP BY " +	// 使用視窗和使用者名稱進行分組
		"user, " +
		"TUMBLE(ts, INTERVAL '1' HOUR)" // 定義 1 小時滾動視窗
	);

可用看出,分組視窗在 GROUP BY SQL查詢語句中定義。

  • 官方文件解釋:
組視窗函式 描述
TUMBLE(time_attr, interval) 定義翻滾時間視窗。翻滾時間視窗將行分配給具有固定持續時間(interval)的非重疊連續視窗。例如,5分鐘的翻滾視窗以5分鐘為間隔對行進行分組。可以在事件時間(流+批處理)或處理時間(流)上定義翻滾視窗。
HOP(time_attr, interval, interval) 定義跳躍時間視窗(在 Table API中稱為滑動視窗)。跳躍時間視窗具有固定的持續時間(第二interval引數)並且按指定的跳躍間隔(第一interval引數)跳躍。如果跳躍間隔小於視窗大小,則跳躍視窗重疊。因此,可以將行分配給多個視窗。例如,15分鐘大小和5分鐘跳躍間隔的跳躍視窗將每行分配給3個不同的15分鐘大小的視窗,這些視窗以5分鐘的間隔進行評估。可以在事件時間(流+批處理)或處理時間(流)上定義跳躍視窗。
SESSION(time_attr, interval) 定義會話時間視窗。會話時間視窗沒有固定的持續時間,但它們的界限由interval不活動時間定義,即如果在定義的間隙期間沒有出現事件,則會話視窗關閉。例如,如果在30分鐘不活動後觀察到一行,則會開始一個30分鐘間隙的會話視窗(否則該行將被新增到現有視窗中),如果在30分鐘內未新增任何行,則會關閉。會話視窗可以在事件時間(流+批處理)或處理時間(流)上工作。

時間屬性

對於流表的SQL查詢,time_attr組視窗函式的引數必須引用指定行的處理時間或事件時間的有效時間屬性。

對於批處理表上的SQL,time_attr組視窗函式的引數必須是型別的屬性TIMESTAMP

選擇組視窗開始和結束時間戳

可以使用以下輔助函式選擇組視窗的開始和結束時間戳以及時間屬性:

輔助函式 描述
TUMBLE_START(time_attr, interval) 返回相應的翻滾,跳躍或會話視窗的包含下限的時間戳。
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)
TUMBLE_END(time_attr, interval) 返回相應的翻滾,跳躍或會話視窗的_獨佔_上限的時間戳。注意:獨佔上限時間戳_不能_在後續基於時間的 運算元操作中用作行時屬性,例如時間視窗連線和組視窗或視窗聚合。
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)
TUMBLE_ROWTIME(time_attr, interval) 返回相應的翻滾,跳躍或會話視窗的_包含_上限的時間戳。結果屬性是rowtime屬性,可用於後續基於時間的 運算元操作,例[時間視窗連線和組視窗或視窗聚合。
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)
TUMBLE_PROCTIME(time_attr, interval) 返回proctime屬性,該屬性可用於後續基於時間的 運算元操作,例如時間視窗連線和組視窗或視窗聚合。
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)

_注意:_必須使用與GROUP BY子句中的組視窗函式完全相同的引數呼叫輔助函式。

下面的示例演示如何在流表上使用組視窗指定SQL查詢。

CREATE TABLE Orders (
  user       BIGINT,
  product    STIRNG,
  amount     INT,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE
) WITH (...);

SELECT
  user,
  TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,
  SUM(amount) FROM Orders
GROUP BY
  TUMBLE(order_time, INTERVAL '1' DAY),
  user;

5.2 視窗表值函式(Windowing TVFs)

6、 聚合查詢

6.1 分組聚合

其實就是編寫對應的SQL語句。GROUP BY 分組, 然後呼叫聚合函式SUM()、MAX()、MIN()、AVG()以及 COUNT()...

6.2 視窗聚合

新增資料:

zhangsan, ./home, 1500
zhangsan, ./appliance, 3500
zhangsan, ./food, 4500
zhangsan, ./cart, 5500
lisi, ./shop, 2000
wangwu, ./school, 25000
zhangsan, ./food, 45000
zhangsan, ./cart, 55000
lisi, ./shop, 20000
wangwu, ./school, 25000

6.2.1 基於分組視窗

程式碼:

package com.peng.time_window;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO
 * @date 2022/11/23-10:04
 */
public class TimeAndWindowTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // DDL中建立事件時間
        String createDDL = "CREATE TABLE clickTable ( " +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP_LTZ(ts, 3), " + // 將時間戳轉換成TIMESTAMP型別
                " WATERMARK FOR et AS et - INTERVAL '5' SECOND" +
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                " )";

        tableEnv.executeSql(createDDL);
        
        DataStreamSource<Event> clickStream = env.addSource(new ClickSource());
        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = clickStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        Table clickTable = tableEnv.fromDataStream(eventSingleOutputStreamOperator, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        // TODO 分組聚合
        Table aggTable = tableEnv.sqlQuery("select user_name, count(url) from clickTable group by user_name");
        // TODO 分組視窗聚合
        Table groupByWindowTable = tableEnv.sqlQuery("select " +
                "user_name, count(url) as cnt, " +
                "TUMBLE_END(et, INTERVAL '10' SECOND) AS entT " +
                "from clickTable " +
                "group by " +
                "user_name, " +
                "TUMBLE(et, INTERVAL '10' SECOND)"//建立一個10秒的滾動視窗
        );
        //clickTable.printSchema();//列印模板

        tableEnv.toChangelogStream(aggTable).print("agg");
        tableEnv.toChangelogStream(groupByWindowTable).print("groupByWindowTable");
        env.execute();
    }
}

結果:

6.2.2 基於視窗表值函式(Windowing TVFs)

視窗本身返回的是就是一個表,所以視窗會出現在 FROM後面,GROUP BY 後面的則是視窗新增的欄位 window_start 和window_end。

Table result = tableEnv.sqlQuery(
	"SELECT " +
	"user, " +
	"window_end AS endT, " + 
	"COUNT(url) AS cnt " +
	"FROM TABLE( " +
		"TUMBLE( TABLE EventTable, " + // TABLE 要開窗的表名
		"DESCRIPTOR(ts), " + 	// 選定事件時間
		"INTERVAL '1' HOUR)) " + // 設定一小時間隔-每隔一小時滾動一次
	"GROUP BY user, window_start, window_end "
);

完整程式碼:

package com.peng.time_window;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO
 * @date 2022/11/23-10:04
 */
public class TimeAndWindowTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String createDDL = "CREATE TABLE clickTable ( " +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP_LTZ(ts, 3), " + // 將時間戳轉換成TIMESTAMP型別
                " WATERMARK FOR et AS et - INTERVAL '5' SECOND" +
                " ) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                " )";

        tableEnv.executeSql(createDDL);
        DataStreamSource<Event> clickStream = env.addSource(new ClickSource());
        SingleOutputStreamOperator<Event> eventSingleOutputStreamOperator = clickStream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        Table clickTable = tableEnv.fromDataStream(eventSingleOutputStreamOperator, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());

        // TODO 分組聚合
        Table aggTable = tableEnv.sqlQuery("select user_name, count(url) from clickTable group by user_name");
        // TODO 分組視窗聚合
        Table groupByWindowTable = tableEnv.sqlQuery("select " +
                "user_name, count(url) as cnt, " +
                "TUMBLE_END(et, INTERVAL '10' SECOND) AS entT " +
                "from clickTable " +
                "group by " +
                "user_name, " +
                "TUMBLE(et, INTERVAL '10' SECOND)"
        );

        // 基於視窗表值函式(Windowing TVFs)的視窗聚合
        Table tumbleWindowTable = tableEnv.sqlQuery("select user_name, count(url) as cnt, " +
                "window_end as endT " +
                "from TABLE( " +
                "TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND) " +
                ")" +
                "group by user_name, window_end, window_start");

        //clickTable.printSchema();//列印模板

        tableEnv.toChangelogStream(aggTable).print("agg");
        tableEnv.toChangelogStream(groupByWindowTable).print("groupByWindowTable");
        tableEnv.toChangelogStream(tumbleWindowTable).print("tumbleWindowTable");
        env.execute();
    }
}

結果一樣:

滑動視窗:

Table hopWindowTable = tableEnv.sqlQuery("select user_name, count(url) as cnt, " +
                "window_end as endT " +
                "from TABLE( " +
                "HOP(TABLE clickTable, DESCRIPTOR(et),INTERVAL '5' SECOND, INTERVAL '10' SECOND) " + // 設定每隔5秒,統計10秒內的資料
                ")" +
                "group by user_name, window_end, window_start");

累計視窗:

		// 累計視窗
        Table cumulateWindowTable = tableEnv.sqlQuery("select user_name, count(url) as cnt, " +
                "window_end as endT " +
                "from TABLE( " +
                "CUMULATE(TABLE clickTable, DESCRIPTOR(et),INTERVAL '5' SECOND, INTERVAL '10' SECOND) " + // 設定每隔5秒,統計10秒內的資料
                ")" +
                "group by user_name, window_end, window_start");

6.3 開窗(Over)聚合

就是以每一行資料為基準,開一個上下的視窗,有點像滑動視窗。只不過之前的滑動視窗是基於時間的,這個有點像DataStreamAPI裡基於資料量的滑動視窗思想。

基本語法:

SELECT
	<聚合函式> OVER (
	[PARTITION BY <欄位 1>[, <欄位 2>, ...]]
	ORDER BY <時間屬性欄位>
	<開窗範圍>),
	...
FROM ...

Top N基本語法

在 Flink SQL 中,是通過 OVER 聚合和一個條件篩選來實現 Top N 的。具體來說,是通過將一個特殊的聚合函式 ROW_NUMBER() 應用到OVER 視窗上,統計出每一行排序後的行號, 作為一個欄位提取出來;然後再用WHERE 子句篩選行號小於等於N 的那些行返回

Flink官方對Top N進行了專門的優化,使得OVER開窗函式後面的 ORDER BY 不在跟時間屬性欄位

SELECT ... 
FROM (
	SELECT ...,
		ROW_NUMBER() OVER (# 先將OVER開窗裡的資料進行一個排序,並給每行資料賦予一個行號
			[PARTITION BY <欄位 1>[, <欄位 1>...]]
			ORDER BY <排序欄位 1> [asc|desc][, <排序欄位 2> [asc|desc]...]
		) AS row_num 
    FROM ...)
WHERE row_num <= N [AND <其它條件>]

下面是一個具體的示例,我們統計每個使用者的訪問事件中,按照字元長度排序的前兩個url

SELECT user, url, ts, row_num 
FROM (
	SELECT *, 
    	ROW_NUMBER() OVER (
			PARTITION BY user
			ORDER BY CHAR_LENGTH(url) desc
		) AS row_num 
    FROM EventTable)
WHERE row_num <= 2

例項:

package com.peng.top_n;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO 實現一個簡單的TOPN排序
 * @date 2022/11/23-20:03
 */
public class TopNStatic {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> clickSource = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        Table clickTable = tableEnv.fromDataStream(clickSource, $("user").as("user_name"), $("url"), $("timestamp").as("ts"), $("et").rowtime());

        tableEnv.createTemporaryView("clickTable", clickTable);

        // Top N 統計被訪問頁面次數的前2名
        /*Table TopNUrl = tableEnv.sqlQuery("select user_name, count(url) as cnt, " +
                "TUMBLE_END(et, INTERVAL '15' SECOND) AS entT " +
                "from clickTable " +
                "GROUP BY user_name," +
                "TUMBLE(et, INTERVAL '15' SECOND) " +
                "ORDER BY cnt DESC " +
                "LIMIT 2 "
        );*/

        // TopN 統計前2名 user 訪問次數---針對的是全量
        Table topNResultTable = tableEnv.sqlQuery("select user_name, cnt, row_num " +
                "from (" +
                "   select *, ROW_NUMBER() OVER (" +
                "       ORDER BY cnt DESC" +
                "    ) AS row_num " +
                "   from (select user_name, count(url) as cnt from clickTable group by user_name )" +
                ") where row_num <= 2");

        //tableEnv.toChangelogStream(TopNUrl).print("TopN Url: ");
        tableEnv.toChangelogStream(topNResultTable).print("top 2: ");

        env.execute();
    }
}

topNResultTable的結果:

基於視窗的TopN

		String subQuery = "select user_name, count(url) as cnt, window_start, window_end " +
                "from table (" +
                "   tumble(table clickTable, descriptor(et), interval '15' second)" +
                " )" +
                "group by user_name, window_start, window_end";

        Table windowTopNResultTable = tableEnv.sqlQuery("select user_name, cnt, row_num " +
                "from (" +
                "   select *, row_number() over (" +
                "       partition by window_start, window_end " +
                "       order by cnt desc" +
                "       ) as row_num " +
                "   from (" + subQuery + " )" +
                ") where row_num <= 2");
		tableEnv.toChangelogStream(windowTopNResultTable).print("windowTop 2: ");

結果:

此時可以看到,只有 +I 操作,因為視窗隨時間滑動,每張表都在不停更新,相當於每張表都在執行插入操作

7、 函式

SQL 語言一樣,Flink TableAPI & SQL裡也有相應的函式。目前TableAPI裡的函式是少於SQL裡的,因此在實際開發中寫的最多的還是SQL。

字串、欄位轉大小寫

----------------轉大寫--------------
// SQL裡
UPPER(str)
// TableAPI 裡
str.upperCase()
----------------轉小寫--------------
LOWER(str)
str.lowerCase()

7.1 系統函式

1) 標量函式

  • 比較函式(=、<>、IS NOT)

    • value1 = value2 判斷兩個值相等;

    • value1 <> value2 判斷兩個值不相等

    • value IS NOT NULL 判斷value 不為空

  • 邏輯函式(OR、IS、NOT)

    • boolean1 OR boolean2 布林值boolean1 與布林值 boolean2 取邏輯或
    • boolean IS FALSE 判斷布林值 boolean 是否為 false
    • NOT boolean 布林值 boolean 取邏輯非
  • 算術函式(+-*/、POWER()、RAND())

    • numeric1 + numeric2 兩數相加
    • POWER(numeric1, numeric2) 冪運算,取數numeric1 的 numeric2 次方
    • RAND() 返回(0.0, 1.0)區間內的一個double 型別的偽隨機數
  • 字串函式

    • string1 || string2 兩個字串的連線
    • UPPER(string) 將字串 string 轉為全部大寫
    • CHAR_LENGTH(string) 計算字串 string 的長度
  • 時間函式

    • DATE string 按格式"yyyy-MM-dd"解析字串 string,返回型別為 SQL Date
    • TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回型別為 SQL timestamp
    • CURRENT_TIME 返回本地時區的當前時間,型別為 SQL time(與 LOCALTIME等價)
    • INTERVAL string range 返回一個時間間隔。string 表示數值;range 可以是DAY, MINUTE,DAT TO HOUR 等單位,也可以是YEAR TO MONTH 這樣的複合單位。如“2 年10 個月”可以寫成:INTERVAL '2-10' YEAR TO MONTH

2) 聚合函式

在SQL語言裡面的聚合函式,Flink SQL裡都是支援的。具體可查SQL函式。

例如:

  • COUNT(*) 返回所有行的數量,統計個數

  • SUM([ ALL | DISTINCT ] expression) 對某個欄位進行求和操作。預設情況下省略了關鍵字 ALL,表示對所有行求和;如果指定 DISTINCT,則會對資料進行去重,每個值只疊加一次。

  • RANK() 返回當前值在一組值中的排名

  • ROW_NUMBER() 對一組值排序後,返回當前值的行號。與RANK()的功能相似

其中,RANK()和ROW_NUMBER()一般用在 OVER 視窗中,進行一個TopN排序。

7.2 自定義函式

如果需要自定義函式,就需要用自定義函式(UserDefinedFunction)——UDF

Flink 的Table API 和SQL 提供了多種自定義函式的介面,以抽象類的形式定義。當前 UDF主要有以下幾類:

  • 標量函式(Scalar Functions):將輸入的標量值轉換成一個新的標量值;

  • 表函式(Table Functions):將標量值轉換成一個或多個新的行資料,也就是擴充套件成一個表;

  • 聚合函式(Aggregate Functions):將多行資料裡的標量值轉換成一個新的標量值;

  • 表聚合函式(Table Aggregate Functions):將多行資料裡的標量值轉換成一個或多個新的行資料。

7.2.1 整體呼叫流程

要想在程式碼中使用自定義的函式,我們需要首先自定義對應UDF 抽象類的實現,並在表環境中註冊這個函式,然後就可以在 Table API 和SQL 中呼叫了。

  1. 註冊函式

註冊函式時需要呼叫表環境的 createTemporarySystemFunction()方法,傳入註冊的函式名以及UDF 類的Class 物件:

// 註冊全域性函式
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
// 註冊目錄函式
tableEnv.createTemporaryFunction("MyFunction2", MyFunction.class);

我們自定義的 UDF 類叫作 MyFunction,它應該是上面四種 UDF 抽象類中某一個的具體實現;在環境中將它註冊為名叫 MyFunction 的函式。

  1. 使用TableAPI呼叫函式

在 Table API 中,需要使用 call()方法來呼叫自定義函式:

tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
// 或者
tableEnv.from("MyTable").select(call(MyFunction.class, $("myField")));

這裡 call()方法有兩個引數,一個是註冊好的函式名 MyFunction另一個則是 函式呼叫時本身的引數。這裡我們定義 MyFunction 在呼叫時,需要傳入的引數是 myField 欄位。

  1. 在SQL中呼叫函式

當我們將函式註冊為系統函式之後,在 SQL 中的呼叫就與內建系統函式完全一樣了:

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

7.2.2 標量函式

Flink對標量函式提供了ScalarFunction抽象類,從下圖可以看到ScalarFunction抽象類繼承的是UserDefinedFunction抽象類

目前Flink自定義函式還不完善,所以使用起來又點不方便:

想要實現自定義的標量函式,我們需要自定義一個類來繼承抽象類 ScalarFunction,並實現叫作eval() 的求值方法。標量函式的行為就取決於求值方法的定義,它必須是公有的(public),而且名字必須eval。求值方法 eval 可以過載多次,任何資料型別都可作為求值方法的引數和返回值型別。

這裡需要特別說明的是,ScalarFunction 抽象類中並沒有定義 eval()方法,所以我們不能直接在程式碼中重寫(override);但 Table API 的框架底層又要求了求值方法必須名字為 eval()。

下面來實踐下:

package com.peng.defined_function;

import com.peng.top_n.ClickSource;
import com.peng.top_n.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO 自定義標量函式
 * @date 2022/11/25-10:28
 */
public class DUFTest_ScalarFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> clickSource = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        Table clickTable = tableEnv.fromDataStream(clickSource, $("user").as("user_name"), $("url"), $("timestamp").as("ts"), $("et").rowtime());

        tableEnv.createTemporaryView("clickTable", clickTable);

        // TODO 2. 註冊自定義函式
        tableEnv.createTemporarySystemFunction("MyHash", MyHashFunction.class);

        // TODO 3. 呼叫UDF進行查詢轉換
        Table resultTable = tableEnv.sqlQuery("select user_name, MyHash(user_name) from clickTable");

        // TODO 4. 轉換成流列印輸出
        tableEnv.toDataStream(resultTable).print();

        env.execute();
    }

    // 自定義標量函式-->求Hash值
    public static class MyHashFunction extends ScalarFunction{
        public int eval(String str){
            return str.hashCode();
        }
        /*
        // 接受任意型別輸入,返回 INT 型輸出
		public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) { 
            return o.hashCode();
		}*/

    }
}

結果:

7.2.3 表函式

package com.peng.defined_function;

import com.peng.top_n.ClickSource;
import com.peng.top_n.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO 自定義表函式
 * @date 2022/11/25-11:08
 */
public class DUFTest_TableFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> clickSource = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<com.peng.top_n.Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        Table clickTable = tableEnv.fromDataStream(clickSource, $("user").as("user_name"), $("url"), $("timestamp").as("ts"), $("et").rowtime());

        tableEnv.createTemporaryView("clickTable", clickTable);

        // TODO 2. 註冊自定義函式
        tableEnv.createTemporarySystemFunction("MyTableFunction", MyTableFunction.class);

        // TODO 3. 呼叫UDF進行查詢轉換
        Table resultTable = tableEnv.sqlQuery("select url, world, length " +
                "from clickTable, LATERAL TABLE(MyTableFunction(user_name)) AS T(world, length)");// 注意:重新命名時不能跟原表重名

        // TODO 4. 轉換成流列印輸出
        tableEnv.toDataStream(resultTable).print();

        env.execute();
    }
    // 自定義表函式-->統計欄位的長度
    public static class MyTableFunction extends TableFunction<Tuple2<String, Integer>>{
        //該方法自己書寫,型別必須是public,方法名必須是eval
        public void eval(String str){
            collect(Tuple2.of(str, str.length()));//表函式是通過collect()方法進行輸出的
        }
    }
}

結果:

7.2.4 聚合函式

package com.peng.defined_function;

import com.peng.top_n.ClickSource;
import com.peng.top_n.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author 海綿先生
 * @Description TODO 自定義聚合函式
 * @date 2022/11/25-14:21
 */
public class UDFTest_AggFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> clickSource = streamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<com.peng.top_n.Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));

        Table clickTable = tableEnv.fromDataStream(clickSource, $("user").as("user_name"), $("url"), $("timestamp").as("ts"), $("et").rowtime());

        tableEnv.createTemporaryView("clickTable", clickTable);

        // TODO 2. 註冊自定義函式
        tableEnv.createTemporarySystemFunction("MyAggFunction", MyAggFunction.class);

        // TODO 3. 呼叫UDF進行查詢轉換
        Table resultTable = tableEnv.sqlQuery("select user_name, MyAggFunction(ts, 1) as w_avg " + //MyAggFunction(ts, 1)第二個引數傳的是1
                "from clickTable group by user_name");

        // TODO 4. 轉換成流列印輸出
        tableEnv.toChangelogStream(resultTable).print();

        env.execute();

    }

    // 單獨定義個累加器型別
    public static class WeightedAvgAccumulator{
        public long sum = 0;
        public int count = 0;
    }
    // 實現自定義聚合函式-->計算加權平均值
    public static class MyAggFunction extends AggregateFunction<Long,WeightedAvgAccumulator>{//AggregateFunction<IN, ACC>

        @Override
        public Long getValue(WeightedAvgAccumulator accumulator) {
            if (accumulator.count ==0)
                return null;
            else
                return accumulator.sum / accumulator.count;
        }

        @Override
        public WeightedAvgAccumulator createAccumulator() {
            return new WeightedAvgAccumulator();
        }
        // 實現累加計算方法,該方法必須時public屬性,方法名必須為accumulate
        public void accumulate(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight){
            accumulator.sum += iValue + iWeight;
            accumulator.count += iWeight;
        }
    }
}

7.2.5 表聚合函式

8、SQL客戶端