1. 程式人生 > 其它 >Flink:Table Api 和 Flink SQL

Flink:Table Api 和 Flink SQL

簡介

Flink 對批處理和流處理,提供了統一的上層 API

Table API 是一套內嵌在 Java 和 Scala 語言中的查詢API,它允許以非常直觀的方式組合來自一些關係運算符的查詢

Flink 的 SQL 支援基於實現了 SQL 標準的 Apache Calcite

示例:

先引入pom依賴:

        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-table-planner_2.12</artifactid>
            <version>1.10.1</version>
        </dependency>

測試程式碼·:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<string> inputStream = env.readTextFile("D:\\project\\flink-demo\\src\\main\\resources\\sensor.txt");
        DataStream<sensorreading> dataStream = inputStream.map((str) -> {
            String[] split = str.split(" ");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        //建立表環境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //基於流建立一張表
        Table dataTab = tableEnv.fromDataStream(dataStream);
        //呼叫tableApi進行轉換操作
        Table resultTab = dataTab.select("id, temperature")
                .where("id = 'sensor_1'");
        //執行SQL
        tableEnv.createTemporaryView("sensor", dataTab);
        String sql = "select id, temperature from sensor where id = 'sensor_1'";
        Table resultSqlTab = tableEnv.sqlQuery(sql);
        //獲取結果
        tableEnv.toAppendStream(resultTab, Row.class).print("resultTab");
        tableEnv.toAppendStream(resultSqlTab, Row.class).print("resultSqlTab");

        env.execute();

執行結果:

基本程式結構

Table API 和 SQL 的程式結構,與流式處理的程式結構十分類似

StreamTableEnvironment tableEnv = ... // 建立表的執行環境
// 建立一張表,用於讀取資料
tableEnv.connect(...).createTemporaryTable("inputTable");
// 註冊一張表,用於把計算結果輸出
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通過 Table API 查詢運算元,得到一張結果表
Table result = tableEnv.from("inputTable").select(...);
// 通過 SQL查詢語句,得到一張結果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");
// 將結果表寫入輸出表中
result.insertInto("outputTable");

表環境配置

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //基於老版本planner的流處理
        //表環境配置
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useOldPlanner() //基於老版本planner的流處理
                .inStreamingMode()//流環境
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

        //基於老版本planner的批處理
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment oldBatchTableEnvironment = BatchTableEnvironment.create(batchEnv);

        //基於Blink的流處理
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkStreamEnv = StreamTableEnvironment.create(env, blinkStreamSettings);

        //基於Blink的批處理
        EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment blinkBatchEnv = TableEnvironment.create(blinkBatchSettings);

TableEnvironment 可以註冊目錄 Catalog,並可以基於 Catalog 登錄檔。

表(Table)是由一個“識別符號”(identifier)來指定的,由3部分組成: Catalog名、資料庫(database)名和物件名。

表可以是常規的,也可以是虛擬的(檢視,View)。

常規表(Table)一般可以用來描述外部資料,比如檔案、資料庫表或訊息佇列的資料,也可以直接從 DataStream轉換而來。

檢視(View)可以從現有的表中建立,通常是 table API 或者 SQL 查詢的 一個結果集。

建立表

TableEnvironment 可以呼叫 .connect() 方法,連線外部系統,並呼叫 .createTemporaryTable() 方法,在 Catalog 中登錄檔。

tableEnv
.connect(...) // 定義表的資料來源,和外部系統建立連線
.withFormat(...) // 定義資料格式化方法
.withSchema(...) // 定義表結構
.createTemporaryTable("MyTable"); // 建立臨時表

引入csv依賴:

        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-csv</artifactid>
            <version>1.10.1</version>
        </dependency>

建立表:從檔案讀取資料

        //表的建立:讀取檔案登錄檔
        String file = "D:\\project\\flink-demo\\src\\main\\resources\\sensor2.txt";

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("timestamp", DataTypes.BIGINT())
                .field("temp", DataTypes.DOUBLE());
        tabEnv.connect(new FileSystem().path(file))
            .withFormat(new Csv()) //檔案預設的字元分隔符號是逗號
            .withSchema(schema)
            .createTemporaryTable("inputTable");

        Table inputTable = tabEnv.from("inputTable");
        inputTable.printSchema();
        tabEnv.toAppendStream(inputTable, Row.class).print();

        env.execute();

執行結果:

表的查詢-Table API

Table API 是整合在 Scala 和 Java 語言內的查詢 API

Table API 基於代表“表”的 Table 類,並提供一整套操作處理的方法 API;這些方法會返回一個新的 Table 物件,表示對輸入表應用轉換操作的結果

有些關係型轉換操作,可以由多個方法呼叫組成,構成鏈式呼叫結構

測試:

        Table inputTable = tabEnv.from("inputTable");
//        inputTable.printSchema();
//        tabEnv.toAppendStream(inputTable, Row.class).print();
        //查詢轉換,聚合統計
        inputTable.select("id, temp")
                .filter("id = 'sensor_1'");
        Table aggTable = inputTable.groupBy("id")
                .select("id, id.count as idCount, temp.avg as avgTemp");

        Table aggTableSql = tabEnv.sqlQuery("select id, count(id) as idCount, avg(temp) as avgTemp from inputTable group by id");

        tabEnv.toRetractStream(aggTable, Row.class).print("aggTable");
        tabEnv.toRetractStream(aggTableSql, Row.class).print("aggTableSql");

輸出結果:

表的輸出-輸出到檔案

測試:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useOldPlanner() //基於老版本planner的流處理
                .inStreamingMode()//流環境
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

        //表的建立:讀取檔案登錄檔
        String file = "D:\\project\\flink-demo\\src\\main\\resources\\sensor2.txt";

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("timestamp", DataTypes.BIGINT())
                .field("temp", DataTypes.DOUBLE());
        tabEnv.connect(new FileSystem().path(file))
                .withFormat(new Csv())
                .withSchema(schema)
                .createTemporaryTable("inputTable");
        //基於老版本planner的流處理
        //表環境配置
        Table aggTableSql = tabEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_1'");

        //輸出到檔案
        String out = "D:\\project\\flink-demo\\src\\main\\resources\\out.txt";
        Schema outSchema = new Schema()
                .field("id", DataTypes.STRING())
                .field("temp", DataTypes.DOUBLE());
        tabEnv.connect(new FileSystem().path(out))
                .withFormat(new Csv())
                .withSchema(outSchema)
                .createTemporaryTable("outputTable");

        aggTableSql.insertInto("outputTable");

        env.execute();

sensor2.txt檔案內容:

sensor_1,1547718199,35.8
sensor_2,1547718199,16.8
sensor_3,1547718199,26.9
sensor_1,1547718199,17.8
sensor_2,1547718199,38.8
sensor_3,1547718199,39.8

生成的out.txt內容:

sensor_1,35.8
sensor_1,17.8

連線kafka讀取資料

測試程式碼:

        //前面程式碼省略
		//連線kafka,並讀取資料
        Kafka kafka = new Kafka()
                .version("0.11")
                .topic("sensor")
                .property("zookeeper.connect", "192.168.1.77:2181")
                .property("bootstrap.servers", "192.168.1.77:9092");
        tabEnv.connect(kafka)
                .withFormat(new Csv())
                .withSchema(schema)
                .createTemporaryTable("inputTable");

        //表環境配置
        Table aggTableSql = tabEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_1'");

        tabEnv.toAppendStream(aggTableSql, Row.class).print();

測試效果:

kafka執行:

./kafka-console-producer.sh --broker-list 192.168.1.77:9092 --topic sensor

更新模式

對於流式查詢,需要宣告如何在表和外部聯結器之間執行轉換。

與外部系統交換的訊息型別,由更新模式(Update Mode)指定。

  • 追加(Append)模式
    • 表只做插入操作,和外部聯結器只交換插入(Insert)訊息
  • 撤回(Retract)模式
    • 表和外部聯結器交換新增(Add)和撤回(Retract)訊息
    • 插入操作(Insert)編碼為 Add 訊息;刪除(Delete)編碼為 Retract 訊息;更新(Update)編碼為上一條的 Retract 和下一條的 Add 訊息
  • 更新插入(Upsert)模式
    • 更新和插入都被編碼為 Upsert 訊息;刪除編碼為 Delete 訊息

輸出到外部系統

輸出到Kafka

可以建立 Table 來描述 kafka 中的資料,作為輸入或輸出的 TableSink

tableEnv.connect(
    new Kafka()
    .version("0.11")
    .topic("sinkTest")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
)
.withFormat( new Csv() )
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
).createTemporaryTable("kafkaOutputTable");
resultTable.insertInto("kafkaOutputTable");

輸出到 ES

可以建立 Table 來描述 ES 中的資料,作為輸出的 TableSink

tableEnv.connect(
    new Elasticsearch()
    .version("6")
    .host("localhost", 9200, "http")
    .index("sensor")
    .documentType("temp")
)
.inUpsertMode()
.withFormat(new Json())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
)
.createTemporaryTable("esOutputTable");
aggResultTable.insertInto("esOutputTable");

輸出到 MySql

可以建立 Table 來描述 MySql 中的資料,作為輸入和輸出

需要先匯入依賴:

        <dependency>
            <groupid>org.apache.flink</groupid>
            <artifactid>flink-jdbc_2.12</artifactid>
            <version>1.10.1</version>
        </dependency>
String sinkDDL=
"create table jdbcOutputTable (" +
" id varchar(20) not null, " +
" cnt bigint not null " +
") with (" +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
" 'connector.table' = 'sensor_count', " +
" 'connector.driver' = 'com.mysql.jdbc.Driver', " +
" 'connector.username' = 'root', " +
" 'connector.password' = '123456' )";
tableEnv.sqlUpdate(sinkDDL) // 執行 DDL建立表
aggResultSqlTable.insertInto("jdbcOutputTable");

表和流的轉換

Table 轉換成 DataStream

表可以轉換為 DataStream 或 DataSet ,這樣自定義流處理或批處理程式就可以繼續在 Table API 或 SQL 查詢的結果上運行了

將錶轉換為 DataStream 或 DataSet 時,需要指定生成的資料型別,即要將 表的每一行轉換成的資料型別

表作為流式查詢的結果,是動態更新的

轉換有兩種轉換模式:追加(Append)模式和撤回(Retract)模式

  • 追加模式(Append Mode)

    DataStream<row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
    
  • 撤回模式(Retract Mode)

    • 用於任何場景。有些類似於更新模式中 Retract 模式,它只有 Insert 和 Delete 兩類操作。
    • 得到的資料會增加一個 Boolean 型別的標識位(返回的第一個欄位),用它來表示到底是新增的資料(Insert),還是被刪除的資料(Delete)
    DataStream<tuple2<boolean, row="">> aggResultStream = tableEnv
    .toRetractStream(aggResultTable , Row.class);
    

DataStream 轉換成Table

  • 對於一個 DataStream,可以直接轉換成 Table,進而方便地呼叫 Table API 做轉換操作

    DataStream<sensorreading> dataStream = ...
    Table sensorTable = tableEnv.fromDataStream(dataStream);
    
  • 預設轉換後的 Table schema 和 DataStream 中的欄位定義一一對應,也可以單獨指定出來

    DataStream<sensorreading> dataStream = ...
    Table sensorTable = tableEnv.fromDataStream(dataStream,
    "id, timestamp as ts, temperature");
    

建立臨時檢視

基於 DataStream 建立臨時檢視:

tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView",  dataStream, "id, temperature, timestamp as ts");

基於 Table 建立臨時檢視:

tableEnv.createTemporaryView("sensorView", sensorTable);

檢視執行計劃

Table API 提供了一種機制來解釋計算表的邏輯和優化查詢計劃

檢視執行計劃,可以通過 TableEnvironment.explain(table) 方法或TableEnvironment.explain() 方法完成,返回一個字串,描述三個計劃

  • 優化的邏輯查詢計劃
  • 優化後的邏輯查詢計劃
  • 優化後的邏輯查詢計劃
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

動態表

動態表是 Flink 對流資料的 Table API 和 SQL 支援的核心概念

與表示批處理資料的靜態表不同,動態表是隨時間變化的

持續查詢(Continuous Query):動態表可以像靜態的批處理表一樣進行查詢,查詢一個動態表會產生持續查詢(Continuous Query),連續查詢永遠不會終止,並會生成另一個動態表,查詢會不斷更新其動態結果表,以反映其動態輸入表上的更改

流式表查詢的處理過程:

  1. 流被轉換為動態表
  2. 對動態表計算連續查詢,生成新的動態表
  3. 生成的動態表被轉換回流

將流轉換成動態表:

持續查詢:持續查詢會在動態表上做計算處理,並作為結果生成新的動態表

將動態錶轉換成 DataStream:

時間特性

Table 可以提供一個邏輯上的時間欄位,用於在表處理程式中,指示時間和訪問相應的時間戳。時間屬性,可以是每個表schema的一部分。一旦定義了時間屬性,它就可以作為一個欄位引用,並且可以在基於時間的操作中使用。時間屬性的行為類似於常規時間戳,可以訪問,並且進行計算。

處理時間

定義處理時間:由 DataStream 轉換成表時指定

  • 在定義Schema期間,可以使用.proctime,指定欄位名定義處理時間欄位。

  • 這個proctime屬性只能通過附加邏輯欄位,來擴充套件物理schema。因此,只能在schema定義的末尾定義它

    Table sensorTable = tableEnv.fromDataStream(dataStream, 
    "id, temperature, timestamp, pt.proctime");
    

程式碼測試:(部分程式碼省略)

        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
        Table table = tabEnv.fromDataStream(dataStream, "id, temperature, timestamp, pt.proctime");
        tabEnv.toAppendStream(table, Row.class).print("row");
        env.execute();

測試結果:

  • 定義 Table Schema 時指定(謹慎使用)

    .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
    .field("pt", DataTypes.TIMESTAMP(3))
    .proctime()
    )
    
  • 在建立表的 DDL 中定義

    String sinkDDL =
    "create table dataTable (" +
    " id varchar(20) not null, " +
    " ts bigint, " +
    " temperature double, " +
    " pt AS PROCTIME() " +
    ") with (" +
    " 'connector.type' = 'filesystem', " +
    " 'connector.path' = '/sensor.txt', " +
    " 'format.type' = 'csv')";
    tableEnv.sqlUpdate(sinkDDL);
    

事件時間

事件時間語義,允許表處理程式根據每個記錄中包含的時間生成結果。這樣即使在有亂序事件或者延遲事件時,也可以獲得正確的結果。為了處理無序事件,並區分流中的準時和遲到事件;Flink 需要從事件資料中,提取時間戳,並用來推進事件時間的進展。

事件時間由 DataStream 轉換成表時指定。

  • 在 DataStream 轉換成 Table,使用 .rowtime 可以定義事件時間屬性

    //別忘記設定事件時間語義
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    // 將 DataStream轉換為 Table,並指定時間欄位
    Table sensorTable = tableEnv.fromDataStream(dataStream, 
    "id, timestamp.rowtime, temperature");
    // 或者,直接追加時間欄位
    Table sensorTable = tableEnv.fromDataStream(dataStream, 
    " id, temperature, timestamp, rt.rowtime");
    

    程式碼:

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            DataStream<string> inputStream = env.readTextFile("D:\\project\\flink-demo\\src\\main\\resources\\sensor.txt");
            DataStream<sensorreading> dataStream = inputStream.map((str) -> {
                String[] split = str.split(" ");
                return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<sensorreading>(Time.seconds(2)) {
                @Override
                public long extractTimestamp(SensorReading element) {
                    return element.getTimestamp() * 1000;
                }
            });
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useOldPlanner() //基於老版本planner的流處理
                    .inStreamingMode()//流環境
                    .build();
            StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
    
            Table sensorTable = tabEnv.fromDataStream(dataStream,
                    "id, timestamp as ts, temperature, timestamp.rowtime as rt");
    //        Table table = tabEnv.fromDataStream(dataStream, "id, temperature, timestamp, pt.proctime");
            tabEnv.toAppendStream(sensorTable, Row.class).print("row");
            sensorTable.printSchema();
            env.execute();
    

    測試:

  • 定義 Table Schema 時指定

    .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("timestamp", DataTypes.BIGINT())
                .rowtime(
                    new Rowtime()
                    .timestampsFromField("timestamp") // 從欄位中提取時間戳
                    .watermarksPeriodicBounded(1000) // watermark延遲1秒
    )
    .field("temperature", DataTypes.DOUBLE())
    )
    
  • 在建立表的 DDL 中定義

    String sinkDDL=
    "create table dataTable (" +
    " id varchar(20) not null, " +
    " ts bigint, " +
    " temperature double, " +
    " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +
    " watermark for rt as rt - interval '1' second" +
    ") with (" +
    " 'connector.type' = 'filesystem', " +
    " 'connector.path' = '/sensor.txt', " +
    " 'format.type' = 'csv')";
    tableEnv.sqlUpdate(sinkDDL);
    

視窗

時間語義,要配合視窗操作才能發揮作用。

在 Table API 和 SQL 中,主要有兩種視窗:

  • Group Windows(分組視窗):根據時間或行計數間隔,將行聚合到有限的組(Group)中,並對每個組的資料
  • Over Windows:針對每個輸入行,計算相鄰行範圍內的聚合

分組視窗

Group Windows 是使用 window(w:GroupWindow)子句定義的,並且必須由as子句指定一個別名。

為了按視窗對錶進行分組,視窗的別名必須在 group by 子句中,像常規的分組欄位一樣引用。

Table table = input
    .window([w: GroupWindow] as "w") // 定義視窗,別名為 w
    .groupBy("w, a") // 按照欄位 a和視窗 w分組
    .select("a, b.sum"); // 聚合

Table API 提供了一組具有特定語義的預定義 Window 類,這些類會被轉換為底層 DataStream 或 DataSet 的視窗操作。

滾動視窗(Tumbling windows)

滾動視窗要用 Tumble 類來定義。

// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").on("proctime").as("w"))
// Tumbling Row-count Window
.window(Tumble.over("10.rows").on("proctime").as("w"))

滑動視窗(Sliding windows)

滑動視窗要用 Slide 類來定義

// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
// Sliding Processing-time window 
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
// Sliding Row-count window
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))

會話視窗(Session windows)

會話視窗要用 Session 類來定義。

// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"))
// Session Processing-time Window
.window(Session.withGap("10.minutes").on(“proctime").as("w"))

SQL 中的 Group Windows

Group Windows 定義在 SQL 查詢的 Group By 子句中.

  • TUMBLE(time_attr, interval): 定義一個滾動視窗,第一個引數是時間欄位,第二個引數是視窗長度
  • HOP(time_attr, interval, interval):定義一個滑動視窗,第一個引數是時間欄位,第二個引數是視窗滑動步長,第三個是視窗長度
  • SESSION(time_attr, interval):定義一個會話視窗,第一個引數是時間欄位,第二個引數是視窗間隔

舉例:

Table resultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp, tumble_end(rt, interval '10' second) " +
                                         "from sensor group by id, tumble(rt, interval '10' second)");

Over Windows

Over window 聚合是標準 SQL 中已有的(over 子句),可以在查詢的 SELECT 子句中定義。Over window 聚合,會針對每個輸入行,計算相鄰行範圍內的聚合。Over windows 使用 window(w:overwindows*)子句定義,並在 select()方法中通過別名來引用。

Table table = input
.window([w: OverWindow] as "w")
.select("a, b.sum over w, c.min over w");

Table API 提供了 Over 類,來配置 Over 視窗的屬性。

無界 Over Windows

可以在事件時間或處理時間,以及指定為時間間隔、或行計數的範圍內,定義 Over windows。無界的 over window 是使用常量指定的。

// 無界的事件時間 over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_RANGE).as("w"))
//無界的處理時間 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w"))
// 無界的事件時間 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding(UNBOUNDED_ROW).as("w"))
//無界的處理時間 Row-count over window
.window(Over.partitionBy("a").orderBy("proctime").preceding(UNBOUNDED_ROW).as("w"))

有界 Over Windows

有界的 over window 是用間隔的大小指定的

// 有界的事件時間 over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// 有界的處理時間 over window
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
// 有界的事件時間 Row-count over window
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
// 有界的處理時間 Row-count over window
.window(Over.partitionBy("a").orderBy("procime").preceding("10.rows").as("w"))

SQL 中的 Over Windows

用 Over 做視窗聚合時,所有聚合必須在同一視窗上定義,也就是說必須是相同的分割槽、排序和範圍。目前僅支援在當前行範圍之前的視窗。ORDER BY 必須在單一的時間屬性上指定。

SELECT COUNT(amount) OVER (
    PARTITION BY user
    ORDER BY proctime
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders

舉例:

        Table overSqlResult = tableEnv.sqlQuery("select id, rt, count(id) over ow, avg(temp) over ow " +
                " from sensor " +
                " window ow as (partition by id order by rt rows between 2 preceding and current row)");

函式

系統內建函式

Flink Table API 和 SQL 為使用者提供了一組用於資料轉換的內建函式。SQL 中支援的很多函式,Table API 和 SQL 都已經做了實現。

使用者自定義函式(UDF)

使用者定義函式(User-defined Functions,UDF)是一個重要的特性,它們顯著地擴充套件了查詢的表達能力。

在大多數情況下,使用者定義的函式必須先註冊,然後才能在查詢中使用。

函式通過呼叫 registerFunction()方法在 TableEnvironment 中註冊。當用戶定義的函式被註冊時,它被插入到 TableEnvironment 的函式目錄中,這樣Table API 或 SQL 解析器就可以識別並正確地解釋它。

標量函式(Scalar Functions)

使用者定義的標量函式,可以將0、1或多個標量值,對映到新的標量值。

為了定義標量函式,必須在 org.apache.flink.table.functions 中擴充套件基類 Scalar Function,並實現(一個或多個)求值(eval)方法。

標量函式的行為由求值方法決定,求值方法必須公開宣告並命名為 eval。

舉例:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<string> inputStream = env.readTextFile("D:\\project\\flink-demo\\src\\main\\resources\\sensor.txt");
        DataStream<sensorreading> dataStream = inputStream.map((str) -> {
            String[] split = str.split(" ");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useOldPlanner() //基於老版本planner的流處理
                .inStreamingMode()//流環境
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

        MyFunction myFunction = new MyFunction(10);
        tabEnv.registerFunction("myFun", myFunction);

        Table sensorTable = tabEnv.fromDataStream(dataStream,"id, timestamp as ts, temperature");

        Table select = sensorTable.select("myFun(id) as fun");

        tabEnv.toAppendStream(select, Row.class).print();
        env.execute();
    }

    //實現自定義的ScalarFunction
    public static class MyFunction extends ScalarFunction {
        private int factor = 13;

        public MyFunction(int factor) {
            this.factor = factor;
        }
        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }

結果:

表函式(Table Functions)

使用者定義的表函式,也可以將0、1或多個標量值作為輸入引數;與標量函式不 的是,它可以返回任意數量的行作為輸出,而不是單個值。

為了定義一個表函式,必須擴充套件 org.apache.flink.table.functions 中的基類 TableFunction 並實現(一個或多個)求值方法

表函式的行為由其求值方法決定,求值方法必須是 public 的,並命名為 eval。

    public static class Split extends TableFunction<tuple2<string, integer="">> {
        private String separator = ",";
        public Split(String separator) {
            this.separator = separator;
        }
        public void eval(String str) {
            for (String s : str.split(separator)) {
                collect(new Tuple2<>(s, s.length()));
            }
        }
    }
Split split = new Split("_");
tableEnv.registerFunction("split", split);
Table resultTable = sensorTable
    .joinLateral("split(id) as (word, length)")
    .select("id, ts, word, length");

聚合函式(Aggregate Functions)

使用者自定義聚合函式(User-Defined Aggregate Functions,UDAGGs)可以把一個表中的資料,聚合成一個標量值。

使用者定義的聚合函式,是通過繼承 AggregateFunction 抽象類實現的。

AggregateFunction 的工作原理如下:

  • 首先,它需要一個累加器(Accumulator),用來儲存聚合中間結果的資料結構;可以通過呼叫 createAccumulator() 方法建立空累加器。
  • 隨後,對每個輸入行呼叫函式的 accumulate() 方法來更新累加器。
  • 處理完所有行後,將呼叫函式的 getValue() 方法來計算並返回最終結果。

測試:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<string> inputStream = env.readTextFile("D:\\project\\flink-demo\\src\\main\\resources\\sensor.txt");
        DataStream<sensorreading> dataStream = inputStream.map((str) -> {
            String[] split = str.split(" ");
            return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 3. 將流轉換成表
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

        // 4. 自定義聚合函式,求當前感測器的平均溫度值
        // 4.1 table API
        AvgTemp avgTemp = new AvgTemp();

        // 需要在環境中註冊UDF
        tableEnv.registerFunction("avgTemp", avgTemp);
        Table resultTable = sensorTable
                .groupBy("id")
                .aggregate( "avgTemp(temp) as avgtemp" )
                .select("id, avgtemp");

        // 4.2 SQL
        tableEnv.createTemporaryView("sensor", sensorTable);
        Table resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) " +
                " from sensor group by id");

        // 列印輸出
        tableEnv.toRetractStream(resultTable, Row.class).print("result");
        tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");

        env.execute();
    }


    // 實現自定義的AggregateFunction
    public static class AvgTemp extends AggregateFunction<double, tuple2<double,="" integer="">>{
        @Override
        public Double getValue(Tuple2<double, integer=""> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<double, integer=""> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }

        // 必須實現一個accumulate方法,來資料之後更新狀態
        public void accumulate( Tuple2<double, integer=""> accumulator, Double temp ){
            accumulator.f0 += temp;
            accumulator.f1 += 1;
        }
    }

結果:

表聚合函式(Table Aggregate Functions)

使用者定義的表聚合函式(User-Defined Table Aggregate Functions, UDTAGGs),可以把一個表中資料,聚合為具有多行和多列的結果表。

使用者定義表聚合函式,是通過繼承 TableAggregateFunction 抽象類來實現的。

TableAggregateFunction 的工作原理如下:

  • 首先,它同樣需要一個累加器(Accumulator),它是儲存聚合中間結果的資料結構。通過呼叫 createAccumulator() 方法可以建立空累加器。
  • 隨後,對每個輸入行呼叫函式的 accumulate() 方法來更新累加器。
  • 處理完所有行後,將呼叫函式的 emitValue() 方法來計算並返回最終結果。
    </double,></double,></double,></double,></tuple2<string,></tuple2<boolean,>