1. 程式人生 > >Flink Table API&SQL的概念和通用API

Flink Table API&SQL的概念和通用API

開發十年,就只剩下這套架構體系了! >>>   

Table API和SQL通過join API整合在一起,這個join API的核心概念是Table,Table可以作為查詢的輸入和輸出。這篇文件展示了使用Table API和SQL查詢的程式的通用結構,如何註冊一個Table,如何查詢一個Table以及如何將資料發給Table。

Table API和SQL查詢程式的結構

所有批處理和流處理的Table API、SQL程式都有如下相同的模式,下面例子的程式碼展示了Table API和SQL程式的通用結構:

// 對於批處理程式來說使用 ExecutionEnvironment 來替換 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 建立一個TableEnvironment
// 對於批處理程式來說使用 BatchTableEnvironment 替換 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 註冊一個 Table
tableEnv.registerTable("table1", ...)            // 或者
tableEnv.registerTableSource("table2", ...);     // 或者
tableEnv.registerExternalCatalog("extCat", ...);

// 從Table API的查詢中建立一個Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 從SQL查詢中建立一個Table
Table sqlResult  = tableEnv.sql("SELECT ... FROM table2 ... ");

// 將Table API 種的結果 Table 發射到TableSink中 , SQL查詢也是一樣的
tapiResult.writeToSink(...);

// 執行
env.execute();

注意:Table API 和 SQL查詢可以輕易地進行整合並嵌入到DataStream或者DataSet程式中,請參考Integration With DataStream and DataSet API部分來了解DataStream和DataSet如何轉換成Table及Table如何轉換成DataStream和DataSet。

建立一個TableEnvironment

TableEnvironment是Table API和SQL整合的核心概念,它主要負責:
  1、在內部目錄中註冊一個Table
  2、註冊一個外部目錄
  3、執行SQL查詢
  4、註冊一個使用者自定義函式(標量、表及聚合)
  5、將DataStream或者DataSet轉換成Table
  6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一個Table總是會繫結到一個指定的TableEnvironment中,相同的查詢不同的TableEnvironment是無法通過join、union合併在一起。

TableEnvironment可以通過呼叫帶有引數StreamExecutionEnvironment或者ExecutionEnvironment和一個可選引數TableConfig的靜態方法TableEnvironment.getTableEnvironment()來建立。TableConf可以用來配置TableEnvironment或者自定義查詢優化器和翻譯過程(參考查詢優化器)

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 為streaming查詢建立一個 TableEnvironment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// 為批查詢建立一個 TableEnvironment
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 為流查詢建立一個 TableEnvironment
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// 為批查詢建立一個 TableEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

在Catalog(目錄)中註冊一個Table

TableEnvironment有一個在內部通過表名組織起來的表目錄,Table API或者SQL查詢可以訪問註冊在目錄中的表,並通過名稱來引用它們。
TableEnvironment允許通過各種源來註冊一個表:
  1、一個已存在的Table物件,通常是Table API或者SQL查詢的結果
  2、TableSource,可以訪問外部資料如檔案、資料庫或者訊息系統
  3、DataStream或者DataSet程式中的DataStream或者DataSet

將DataStream或者DataSet註冊為一個表將在Integration With DataStream and DataSet API中討論。

註冊一個Table

一個Table可以在TableEnvironment中按照下面程式註冊:

// 獲取一個 StreamTableEnvironment,  BatchTableEnvironment也是同樣的方法
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table 是簡單的投影查詢的結果 
Table projTable = tableEnv.scan("X").project(...);

// 將 Table projTable 註冊為表 "projectedX"
tableEnv.registerTable("projectedTable", projTable);
// 獲取一個TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table 是簡單的投影查詢的結果 
val projTable: Table = tableEnv.scan("X").project(...)

// 將 Table projTable 註冊為表 "projectedX"
tableEnv.registerTable("projectedTable", projTable)

注意:一個註冊的Table被當做是與關係型資料庫中的檢視類似,即定義Table的查詢不會被優化,但是當其他查詢引用到已註冊的Table時會被內聯。如果多個查詢引用同一個已註冊的Table,這個Table會跟每個查詢內聯並進行多次執行,即:已註冊的Table的結果不會共享。

註冊一個TableSource

TableSource可以訪問儲存在外部儲存系統如資料庫系統(MySQL、HBase...),指定編碼格式的檔案(CSV, Apache [Parquet, Avro, ORC],...)或者訊息系統(Apache Kafka,RabbitMQ,...)中的資料。

Flink的目標是為通用的資料格式和儲存系統提供TableSource,請參考Table Sources和Sinks頁來了解Flink所支援的TableSource列表及如何自定義一個TableSource。

一個TableSource可以在TableEnvironment中按如下方式來定義:

// 獲取一個StreamTableEnvironment, 同樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 建立一個 TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);

// 將TableSource註冊為表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 建立一個TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// 將 TableSource 註冊為表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

註冊一個外部Catalog(目錄)

一個外部目錄提供了關於外部資料庫和表的資訊如:它們的名稱、模式、統計及如何訪問儲存在外部資料庫、表和檔案中的資料。

一個外部目錄可以通過實現ExternalCatalog介面來建立並在TableEnvironment中註冊,如下:

// 獲取一個 StreamTableEnvironment, 同樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 建立一個外部catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();

// 註冊 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 建立一個 catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// 註冊 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦在TableEnvironment中註冊之後,所有定義在ExternalCatalog中的表都可以通過指定全路徑如:catalog.database.table 在Table API或者SQL查詢來訪問。

目前,Flink提供InMemoryExternalCatalog來做demo或者測試。然而,ExternalCatalog介面還可以被用來連線HCatalog或者Metastore到Table API。

查詢一個Table

Table API

Table API是一個Scala和Java的語言整合查詢API,與SQL相反,查詢並不指定為字串而是根據主機語言一步一步的構建。

Table API是基於Table類來的,Table類代表了一個流或者批表,並且提供方法來使用關係型操作。這些方法返回一個新的Table物件,這個Table物件代表著輸入的Table應用關係型操作後的結果。一些關係型操作是由多個方法呼叫組成的如:table.groupBy(...).select(), 其中groupBy(...)指定了table的分組,而select(...)則是table分組的對映。

Table API文件描述了streaming和batch表所支援的所有Table API操作。

下面的例子展示了一個簡單的Table API聚合查詢:

// 獲取一個 StreamTableEnvironment, 同樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 註冊一個名叫 Orders 的表

// 掃描註冊的 Orders 表
Table orders = tableEnv.scan("Orders");
// 計算所有來自法國的客戶的收入
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// 發射或者轉換一個 Table
// 執行查詢
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 註冊一個名叫 Orders 的表

// 掃描已註冊的 Orders 表
Table orders = tableEnv.scan("Orders")
// 計算所有來自法國偶的客戶的收入
Table revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// 發射或者轉換一個Table
// 執行查詢

注意:Scala Table API使用Scala的符號在引用表屬性時,以'`'開始,Table API使用Scala的隱式轉換,為了使用Scala的隱式轉換,請確保匯入org.apache.flink.api.scala._org.apache.flink.table.api.scala._

SQL

Flink的SQL整合是基於Apache Calcite的,Apache Calcite實現了標準的SQL,SQL查詢被指定為常規字串。

SQL文件描述了Flink對流和批表的SQL支援。
下面的例子展示瞭如何指定一個查詢並返回一個Table結果;

// 獲取一個 StreamTableEnvironment, 同樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 註冊一個名叫Orders 的表

// 計算所有來自法國的客戶的收入
Table revenue = tableEnv.sql(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 發射或者轉換一個Table
// 執行查詢
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

//註冊一個名叫 Orders的表

// 計算所有來自法國的客戶的收入
Table revenue = tableEnv.sql(""" 
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 發射或者轉換 Table
// 執行查詢

混合使用Table API和SQL

Table API和SQL查詢可以很容易地合併因為它們都返回Table物件:
  1、Table API查詢可以基於SQL查詢結果的Table來進行
  2、SQL查詢可以基於Table API查詢的結果來定義

發射一個Table

為了發射一個Table,可以將其寫入一個TableSink中,TableSink 是支援各種檔案格式(如:CSV, Apache Parquet, Apache Avro)、儲存系統(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者訊息系統(如:Apache Kafka,RabbitMQ)的通用介面。

一個批Table只能寫入BatchTableSink中,而流Table需要一個AppendStreamTableSinkRetractStreamTableSink或者UpsertStreamTableSink

請參考Table Sources & Sinks文件來了解更多可用sink的資訊和如何實現一個自定義的TableSink。

// 獲取一個StreamTableEnvironment, 同樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 使用Table API和/或SQL查詢獲取一個 Table
Table result = ...

// 建立一個TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// 將結果Table寫入TableSink中
result.writeToSink(sink);

// 執行程式
// 獲取一個TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 使用Table API和/或SQL查詢獲取一個 Table
val result: Table = ...

//建立一個 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// 將結果 Table寫入TableSink中
result.writeToSink(sink)

// 執行程式

翻譯和執行一個查詢

Table API和SQL查詢根據輸入是流還是批翻譯成DataStream或者DataSet,查詢內部表示為一個邏輯查詢計劃,並分兩個階段進行翻譯:
  1、優化邏輯計劃
  2、翻譯成一個DataStream或者DataSet程式

Table API或者SQL查詢會在下面情況下觸發:
  當呼叫Table.writeToSink()時,Table會發射到TableSink中
  Table轉換DataStream或者DataSet時(參考與DataStream和DataSet API整合)
一旦翻譯,Table API或者SQL查詢就會像常規DataStream或DataSet處理一樣,並且當StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute()呼叫時執行。

與DataStream和DataSet API整合

Table API和SQL查詢可以很容易地進行整合並嵌入到DataStreamDataSet程式中。例如:我們可以查詢一個外部表(如:來自關係型資料庫的表)、做一些預處理,如過濾、對映、聚合或者與元資料關聯,然後使用DataStream或者DataSet API(及其他基於這些API的庫,如CEP或Gelly)進行進一步處理。同樣,Table API或者SQL查詢也可以應用於DataStream或者DataSet程式的結果中。

這種互動可以通過將DataStream或者DataSet轉換成一個Table及將Table轉換成DataStream或者DataSet來實現。在本節,我們將描述這些轉換是如何完成的。

Scala 隱式轉換

Scala Table API為DataSet、DataStream和Table類提供了隱式轉換功能。這些轉換可以通過匯入Scala DataStream API中的org.apache.flink.table.api.scala._org.apache.flink.api.scala._包來啟用。

註冊一個DataStream或者DataSet為Table

一個DataStream或者DataSet可以在TableEnvironment中註冊為Table,表的結果模式根據註冊的DataStream或者DataSet的資料型別來定。請參考資料型別對映到表模式來了解更詳細的資訊。

// 獲取 StreamTableEnvironment
// 註冊一個DataSet 到BatchTableEnvironment也是等效的
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// 註冊DataStream 為表  "myTable" ,並有兩個欄位 "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// 註冊 DataStream 為表 "myTable2" 並有兩個欄位 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// 獲取 TableEnvironment 
// 註冊一個 DataSet 是等價的
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 註冊 DataStream 為表 "myTable" 並有兩個欄位 "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// 註冊 DataStream 為 "myTable2" 並有兩個欄位 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

將Table轉換為DataStream或者DataSet

Table可以轉換為DataStream或者DataSet,這樣的話,自定義的DataStream或者DataSet程式就可以基於Table API或者SQL查詢的結果來執行了。

當將一個Table轉換為DataStream或者DataSet時,你需要指定生成的DataStream或者DataSet的資料型別,即需要轉換表的行的資料型別,通常最方便的轉換型別是Row,下面列表概述了不同選項的功能:
  1、Row:欄位通過位置對映、可以是任意數量欄位,支援空值,非型別安全訪問
  2、POJO:欄位通過名稱(POJO欄位作為Table欄位時,必須命名)對映,可以是任意數量欄位,支援空值,型別安全訪問
  3、Case Class:欄位通過位置對映,不支援空值,型別安全訪問
  4、Tuple:欄位通過位置對映,不得多於22(Scala)或者25(Java)個欄位,不支援空值,型別安全訪問
  5、Atomic Type:Table必須有一個欄位,不支援空值,型別安全訪問。

將Table轉換為DataStream

流式查詢的結果Table會被動態地更新,即每個新的記錄到達輸入流時結果就會發生變化。因此,轉換此動態查詢的DataStream需要對錶的更新進行編碼。

有兩種模式來將Table轉換為DataStream:
  1、Append Mode:這種模式只適用於當動態表僅由INSERT更改修改時,即僅附加,之前發射的結果不會被更新。
  2、Retract Mode:始終都可以使用此模式,它使用一個boolean標識來編碼INSERT和DELETE更改。

// 獲取一個 StreamTableEnvironment. 
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//  有兩個欄位(String name, Integer age)的Table
Table table = ...

// 通過指定類將Table轉換為Row的Append DataStream
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// 通過一個TypeInformation將Table轉換為Tuple2<String, Integer> 型別的Append DataStream  
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);

// 將Table轉換為Row的react形式的DataStream
//   一個reactDataStream的型別X為 DataStream<Tuple2<Boolean, X>>. 
//   boolean欄位指定了更改的型別. 
//   True 是 INSERT, false 是 DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:有關動態表及其屬性的詳細討論在Streaming Queries文件中給出。

將Table轉換為DataSet

Table可以按照如下方式轉換為DataSet:

// 獲取 BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 有兩個欄位(String name, Integer age)的Table
Table table = ...

// 通過指定類將Table轉換為Row型別的DataSet
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// 通過TypeInformation 將Table轉換為Tuple2<String, Integer>型別的DataSet
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// 獲取 TableEnvironment 
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 有兩個欄位(String name, Integer age)的Table
val table: Table = ...

// 將Table轉換為Row型別的DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// 將Table轉換為Tuple2[String, Int]型別的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

資料型別對映到表模式

Flink的DataStream和DataSet API支援多種資料型別,如Tuple,POJO, case class及原始資料型別。接下來我們描述Table API如何將這些型別轉換為內部行表示及展示將DataStream轉換為Table的例子。

原子型別

Flink將原生型別(如:Integer, Double, String)或者通用型別(不能再被分析或者分解的型別)視為原子型別,一個原子型別的DataStream或者DataSet可以轉換為只有一個屬性的Table,屬性的型別根據原子型別推算,並且必須得指定屬性的名稱。

// 獲取一個 StreamTableEnvironment, 同樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...
// 將 DataStream轉換為具有屬性"myLong"的Table
Table table = tableEnv.fromDataStream(stream, "myLong");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...
// 將 DataStream 轉換為具有屬性'myLong的Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuple(Java和Scala都支援)和Case Class(僅Scala支援)

Flink支援Scala內建的Tuple和Flink為Java提供的Tuple,DataStream和DataSet型別的Tuple都可以被轉換為表。欄位可以通過為所有欄位(通過位置來對映)提供的名稱來重新命名,如果沒有為欄位指定名稱的話,就會採用預設的欄位名。

// 獲取一個 StreamTableEnvironment, 同樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// 將 DataStream為具有欄位名為"myLong", "myString"的Table
Table table1 = tableEnv.fromDataStream(stream, "myLong, myString");

// 將 DataStream 轉換為具有預設欄位名 "f0", "f1"的 Table
Table table2 = tableEnv.fromDataStream(stream);
//獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 將 DataStream 轉換為具有欄位名 'myLong, 'myString' 的Table
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// 將 DataStream 轉換為具有預設欄位名 '_1, '_2的Table
val table2: Table = tableEnv.fromDataStream(stream)

// 定義一個 case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// 將 DataStream 轉換為具有預設欄位名 'name, 'age'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC)

// 將 DataStream 轉換為具有欄位名 'myName, 'myAge'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

POJO(Java 和 Scala)

Flink支援使用POJO作為複合型別,決定POJO規則的文件請參考這裡

當將一個POJO型別的DataStream或者DataSet轉換為Table而不指定欄位名稱時,Table的欄位名稱將採用JOPO原生的欄位名稱作為欄位名稱。重新命名原始的POJO欄位需要關鍵字AS,因為POJO沒有固定的順序,名稱對映需要原始名稱並且不能通過位置來完成。

//獲取一個 StreamTableEnvironment, 同樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Person 是一個有兩個欄位"name" and "age" 的POJO
DataStream<Person> stream = ...

// 將 DataStream 轉換為有欄位 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);

// 將 DataStream 轉換為有欄位 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person 是一個有欄位 "name" and "age" 的POJO
val stream: DataStream[Person] = ...

// 將 DataStream 轉換為具有欄位 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)

// 將 DataStream 轉換為具有欄位 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

Row

Row資料型別支援任意數量的欄位,並且欄位可以是null值,欄位名稱可以通過RowTypeInformation來指定或者將一個Row DataStream或者DataSet轉換為Table時(根據位置)指定。

// 獲取一個 StreamTableEnvironment, 同樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 在`RowTypeInfo`中指定欄位"name" and "age"的Row型別DataStream
DataStream<Row> stream = ...

// 將 DataStream 轉換為具有欄位 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);

// 將 DataStream 轉換為具有欄位 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 在`RowTypeInfo`中指定欄位"name" and "age"的Row型別DataStream
val stream: DataStream[Row] = ...

// 將 DataStream 轉換為具有欄位 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)

// 將 DataStream 轉換為具有欄位 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

查詢優化

Apache Flink使用Apache Calcite來優化和翻譯查詢,當前的查詢優化包括投影、過濾下推、子查詢去相關及各種形式的查詢重寫。Flink不去優化join的順序,但是會根據它們的順序去執行(FROM子句中表的順序或者WHERE子句中連線謂詞的順序)。

可以通過提供一個CalciteConfig物件來調整在不同階段應用的優化規則集,這個可以通過呼叫CalciteConfig.createBuilder())獲得的builder來建立,並且可以通過呼叫tableEnv.getConfig.setCalciteConfig(calciteConfig)來提供給TableEnvironment。

解析一個Table

Table API為計算一個Table提供了一個機制來解析邏輯和優化查詢計劃,這個可以通過呼叫TableEnvironment.explain(table)方法來完成,它會返回描述三個計劃的字串:
  1、關係查詢語法樹,即未優化的查詢計劃
  2、優化後的邏輯查詢計劃
  3、物理執行計劃

以下程式碼顯示了一個示例和相應的輸出:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)

輸出如下:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, 'F%')])
    LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
    DataStreamScan(table=[[_DataStreamTable_0]])
  DataStreamScan(table=[[_DataStreamTable_1]])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

  Stage 3 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

    Stage 4 : Operator
      content : where: (LIKE(word, 'F%')), select: (count, word)
      ship_strategy : FORWARD

      Stage 5 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE