Flink Table API&SQL的概念和通用API
Table API和SQL通過join API整合在一起,這個join API的核心概念是Table,Table可以作為查詢的輸入和輸出。這篇文件展示了使用Table API和SQL查詢的程式的通用結構,如何註冊一個Table,如何查詢一個Table以及如何將資料發給Table。
Table API和SQL查詢程式的結構
所有批處理和流處理的Table API、SQL程式都有如下相同的模式,下面例子的程式碼展示了Table API和SQL程式的通用結構:
// 對於批處理程式來說使用 ExecutionEnvironment 來替換 StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 建立一個TableEnvironment // 對於批處理程式來說使用 BatchTableEnvironment 替換 StreamTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 註冊一個 Table tableEnv.registerTable("table1", ...) // 或者 tableEnv.registerTableSource("table2", ...); // 或者 tableEnv.registerExternalCatalog("extCat", ...); // 從Table API的查詢中建立一個Table Table tapiResult = tableEnv.scan("table1").select(...); // 從SQL查詢中建立一個Table Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... "); // 將Table API 種的結果 Table 發射到TableSink中 , SQL查詢也是一樣的 tapiResult.writeToSink(...); // 執行 env.execute();
注意:Table API 和 SQL查詢可以輕易地進行整合並嵌入到DataStream或者DataSet程式中,請參考Integration With DataStream and DataSet API部分來了解DataStream和DataSet如何轉換成Table及Table如何轉換成DataStream和DataSet。
建立一個TableEnvironment
TableEnvironment是Table API和SQL整合的核心概念,它主要負責:
1、在內部目錄中註冊一個Table
2、註冊一個外部目錄
3、執行SQL查詢
4、註冊一個使用者自定義函式(標量、表及聚合)
5、將DataStream或者DataSet轉換成Table
6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一個Table總是會繫結到一個指定的TableEnvironment中,相同的查詢不同的TableEnvironment是無法通過join、union合併在一起。
TableEnvironment可以通過呼叫帶有引數StreamExecutionEnvironment或者ExecutionEnvironment和一個可選引數TableConfig的靜態方法TableEnvironment.getTableEnvironment()
來建立。TableConf可以用來配置TableEnvironment或者自定義查詢優化器和翻譯過程(參考查詢優化器)
// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 為streaming查詢建立一個 TableEnvironment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);
// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// 為批查詢建立一個 TableEnvironment
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 為流查詢建立一個 TableEnvironment
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// 為批查詢建立一個 TableEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
在Catalog(目錄)中註冊一個Table
TableEnvironment有一個在內部通過表名組織起來的表目錄,Table API或者SQL查詢可以訪問註冊在目錄中的表,並通過名稱來引用它們。
TableEnvironment允許通過各種源來註冊一個表:
1、一個已存在的Table物件,通常是Table API或者SQL查詢的結果
2、TableSource,可以訪問外部資料如檔案、資料庫或者訊息系統
3、DataStream或者DataSet程式中的DataStream或者DataSet
將DataStream或者DataSet註冊為一個表將在Integration With DataStream and DataSet API中討論。
註冊一個Table
一個Table可以在TableEnvironment中按照下面程式註冊:
// 獲取一個 StreamTableEnvironment, BatchTableEnvironment也是同樣的方法
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table 是簡單的投影查詢的結果
Table projTable = tableEnv.scan("X").project(...);
// 將 Table projTable 註冊為表 "projectedX"
tableEnv.registerTable("projectedTable", projTable);
// 獲取一個TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table 是簡單的投影查詢的結果
val projTable: Table = tableEnv.scan("X").project(...)
// 將 Table projTable 註冊為表 "projectedX"
tableEnv.registerTable("projectedTable", projTable)
注意:一個註冊的Table被當做是與關係型資料庫中的檢視類似,即定義Table的查詢不會被優化,但是當其他查詢引用到已註冊的Table時會被內聯。如果多個查詢引用同一個已註冊的Table,這個Table會跟每個查詢內聯並進行多次執行,即:已註冊的Table的結果不會共享。
註冊一個TableSource
TableSource可以訪問儲存在外部儲存系統如資料庫系統(MySQL、HBase...),指定編碼格式的檔案(CSV, Apache [Parquet, Avro, ORC],...)或者訊息系統(Apache Kafka,RabbitMQ,...)中的資料。
Flink的目標是為通用的資料格式和儲存系統提供TableSource,請參考Table Sources和Sinks頁來了解Flink所支援的TableSource列表及如何自定義一個TableSource。
一個TableSource可以在TableEnvironment中按如下方式來定義:
// 獲取一個StreamTableEnvironment, 同樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 建立一個 TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
// 將TableSource註冊為表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 建立一個TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// 將 TableSource 註冊為表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
註冊一個外部Catalog(目錄)
一個外部目錄提供了關於外部資料庫和表的資訊如:它們的名稱、模式、統計及如何訪問儲存在外部資料庫、表和檔案中的資料。
一個外部目錄可以通過實現ExternalCatalog介面來建立並在TableEnvironment中註冊,如下:
// 獲取一個 StreamTableEnvironment, 同樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 建立一個外部catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();
// 註冊 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 建立一個 catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// 註冊 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
一旦在TableEnvironment中註冊之後,所有定義在ExternalCatalog中的表都可以通過指定全路徑如:catalog.database.table
在Table API或者SQL查詢來訪問。
目前,Flink提供InMemoryExternalCatalog來做demo或者測試。然而,ExternalCatalog介面還可以被用來連線HCatalog或者Metastore到Table API。
查詢一個Table
Table API
Table API是一個Scala和Java的語言整合查詢API,與SQL相反,查詢並不指定為字串而是根據主機語言一步一步的構建。
Table API是基於Table類來的,Table類代表了一個流或者批表,並且提供方法來使用關係型操作。這些方法返回一個新的Table物件,這個Table物件代表著輸入的Table應用關係型操作後的結果。一些關係型操作是由多個方法呼叫組成的如:table.groupBy(...).select()
, 其中groupBy(...)
指定了table的分組,而select(...)
則是table分組的對映。
Table API文件描述了streaming和batch表所支援的所有Table API操作。
下面的例子展示了一個簡單的Table API聚合查詢:
// 獲取一個 StreamTableEnvironment, 同樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 註冊一個名叫 Orders 的表
// 掃描註冊的 Orders 表
Table orders = tableEnv.scan("Orders");
// 計算所有來自法國的客戶的收入
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// 發射或者轉換一個 Table
// 執行查詢
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 註冊一個名叫 Orders 的表
// 掃描已註冊的 Orders 表
Table orders = tableEnv.scan("Orders")
// 計算所有來自法國偶的客戶的收入
Table revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// 發射或者轉換一個Table
// 執行查詢
注意:Scala Table API使用Scala的符號在引用表屬性時,以'`'開始,Table API使用Scala的隱式轉換,為了使用Scala的隱式轉換,請確保匯入org.apache.flink.api.scala._
和org.apache.flink.table.api.scala._
。
SQL
Flink的SQL整合是基於Apache Calcite的,Apache Calcite實現了標準的SQL,SQL查詢被指定為常規字串。
SQL文件描述了Flink對流和批表的SQL支援。
下面的例子展示瞭如何指定一個查詢並返回一個Table結果;
// 獲取一個 StreamTableEnvironment, 同樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 註冊一個名叫Orders 的表
// 計算所有來自法國的客戶的收入
Table revenue = tableEnv.sql(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// 發射或者轉換一個Table
// 執行查詢
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
//註冊一個名叫 Orders的表
// 計算所有來自法國的客戶的收入
Table revenue = tableEnv.sql("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 發射或者轉換 Table
// 執行查詢
混合使用Table API和SQL
Table API和SQL查詢可以很容易地合併因為它們都返回Table物件:
1、Table API查詢可以基於SQL查詢結果的Table來進行
2、SQL查詢可以基於Table API查詢的結果來定義
發射一個Table
為了發射一個Table,可以將其寫入一個TableSink中,TableSink 是支援各種檔案格式(如:CSV, Apache Parquet, Apache Avro)、儲存系統(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者訊息系統(如:Apache Kafka,RabbitMQ)的通用介面。
一個批Table只能寫入BatchTableSink中,而流Table需要一個AppendStreamTableSink
、RetractStreamTableSink
或者UpsertStreamTableSink
請參考Table Sources & Sinks文件來了解更多可用sink的資訊和如何實現一個自定義的TableSink。
// 獲取一個StreamTableEnvironment, 同樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 使用Table API和/或SQL查詢獲取一個 Table
Table result = ...
// 建立一個TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
// 將結果Table寫入TableSink中
result.writeToSink(sink);
// 執行程式
// 獲取一個TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 使用Table API和/或SQL查詢獲取一個 Table
val result: Table = ...
//建立一個 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// 將結果 Table寫入TableSink中
result.writeToSink(sink)
// 執行程式
翻譯和執行一個查詢
Table API和SQL查詢根據輸入是流還是批翻譯成DataStream或者DataSet,查詢內部表示為一個邏輯查詢計劃,並分兩個階段進行翻譯:
1、優化邏輯計劃
2、翻譯成一個DataStream或者DataSet程式
Table API或者SQL查詢會在下面情況下觸發:
當呼叫Table.writeToSink()
時,Table會發射到TableSink中
Table轉換DataStream或者DataSet時(參考與DataStream和DataSet API整合)
一旦翻譯,Table API或者SQL查詢就會像常規DataStream或DataSet處理一樣,並且當StreamExecutionEnvironment.execute()
或者ExecutionEnvironment.execute()
呼叫時執行。
與DataStream和DataSet API整合
Table API和SQL查詢可以很容易地進行整合並嵌入到DataStream和DataSet程式中。例如:我們可以查詢一個外部表(如:來自關係型資料庫的表)、做一些預處理,如過濾、對映、聚合或者與元資料關聯,然後使用DataStream或者DataSet API(及其他基於這些API的庫,如CEP或Gelly)進行進一步處理。同樣,Table API或者SQL查詢也可以應用於DataStream或者DataSet程式的結果中。
這種互動可以通過將DataStream或者DataSet轉換成一個Table及將Table轉換成DataStream或者DataSet來實現。在本節,我們將描述這些轉換是如何完成的。
Scala 隱式轉換
Scala Table API為DataSet、DataStream和Table類提供了隱式轉換功能。這些轉換可以通過匯入Scala DataStream API中的org.apache.flink.table.api.scala._
和org.apache.flink.api.scala._
包來啟用。
註冊一個DataStream或者DataSet為Table
一個DataStream或者DataSet可以在TableEnvironment中註冊為Table,表的結果模式根據註冊的DataStream或者DataSet的資料型別來定。請參考資料型別對映到表模式來了解更詳細的資訊。
// 獲取 StreamTableEnvironment
// 註冊一個DataSet 到BatchTableEnvironment也是等效的
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// 註冊DataStream 為表 "myTable" ,並有兩個欄位 "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// 註冊 DataStream 為表 "myTable2" 並有兩個欄位 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// 獲取 TableEnvironment
// 註冊一個 DataSet 是等價的
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 註冊 DataStream 為表 "myTable" 並有兩個欄位 "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// 註冊 DataStream 為 "myTable2" 並有兩個欄位 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
將Table轉換為DataStream或者DataSet
Table可以轉換為DataStream或者DataSet,這樣的話,自定義的DataStream或者DataSet程式就可以基於Table API或者SQL查詢的結果來執行了。
當將一個Table轉換為DataStream或者DataSet時,你需要指定生成的DataStream或者DataSet的資料型別,即需要轉換表的行的資料型別,通常最方便的轉換型別是Row,下面列表概述了不同選項的功能:
1、Row:欄位通過位置對映、可以是任意數量欄位,支援空值,非型別安全訪問
2、POJO:欄位通過名稱(POJO欄位作為Table欄位時,必須命名)對映,可以是任意數量欄位,支援空值,型別安全訪問
3、Case Class:欄位通過位置對映,不支援空值,型別安全訪問
4、Tuple:欄位通過位置對映,不得多於22(Scala)或者25(Java)個欄位,不支援空值,型別安全訪問
5、Atomic Type:Table必須有一個欄位,不支援空值,型別安全訪問。
將Table轉換為DataStream
流式查詢的結果Table會被動態地更新,即每個新的記錄到達輸入流時結果就會發生變化。因此,轉換此動態查詢的DataStream需要對錶的更新進行編碼。
有兩種模式來將Table轉換為DataStream:
1、Append Mode:這種模式只適用於當動態表僅由INSERT更改修改時,即僅附加,之前發射的結果不會被更新。
2、Retract Mode:始終都可以使用此模式,它使用一個boolean標識來編碼INSERT和DELETE更改。
// 獲取一個 StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 有兩個欄位(String name, Integer age)的Table
Table table = ...
// 通過指定類將Table轉換為Row的Append DataStream
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 通過一個TypeInformation將Table轉換為Tuple2<String, Integer> 型別的Append DataStream
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 將Table轉換為Row的react形式的DataStream
// 一個reactDataStream的型別X為 DataStream<Tuple2<Boolean, X>>.
// boolean欄位指定了更改的型別.
// True 是 INSERT, false 是 DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有關動態表及其屬性的詳細討論在Streaming Queries文件中給出。
將Table轉換為DataSet
Table可以按照如下方式轉換為DataSet:
// 獲取 BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 有兩個欄位(String name, Integer age)的Table
Table table = ...
// 通過指定類將Table轉換為Row型別的DataSet
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 通過TypeInformation 將Table轉換為Tuple2<String, Integer>型別的DataSet
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 獲取 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 有兩個欄位(String name, Integer age)的Table
val table: Table = ...
// 將Table轉換為Row型別的DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// 將Table轉換為Tuple2[String, Int]型別的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
資料型別對映到表模式
Flink的DataStream和DataSet API支援多種資料型別,如Tuple,POJO, case class及原始資料型別。接下來我們描述Table API如何將這些型別轉換為內部行表示及展示將DataStream轉換為Table的例子。
原子型別
Flink將原生型別(如:Integer, Double, String)或者通用型別(不能再被分析或者分解的型別)視為原子型別,一個原子型別的DataStream或者DataSet可以轉換為只有一個屬性的Table,屬性的型別根據原子型別推算,並且必須得指定屬性的名稱。
// 獲取一個 StreamTableEnvironment, 同樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Long> stream = ...
// 將 DataStream轉換為具有屬性"myLong"的Table
Table table = tableEnv.fromDataStream(stream, "myLong");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[Long] = ...
// 將 DataStream 轉換為具有屬性'myLong的Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple(Java和Scala都支援)和Case Class(僅Scala支援)
Flink支援Scala內建的Tuple和Flink為Java提供的Tuple,DataStream和DataSet型別的Tuple都可以被轉換為表。欄位可以通過為所有欄位(通過位置來對映)提供的名稱來重新命名,如果沒有為欄位指定名稱的話,就會採用預設的欄位名。
// 獲取一個 StreamTableEnvironment, 同樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// 將 DataStream為具有欄位名為"myLong", "myString"的Table
Table table1 = tableEnv.fromDataStream(stream, "myLong, myString");
// 將 DataStream 轉換為具有預設欄位名 "f0", "f1"的 Table
Table table2 = tableEnv.fromDataStream(stream);
//獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// 將 DataStream 轉換為具有欄位名 'myLong, 'myString' 的Table
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// 將 DataStream 轉換為具有預設欄位名 '_1, '_2的Table
val table2: Table = tableEnv.fromDataStream(stream)
// 定義一個 case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// 將 DataStream 轉換為具有預設欄位名 'name, 'age'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC)
// 將 DataStream 轉換為具有欄位名 'myName, 'myAge'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
POJO(Java 和 Scala)
Flink支援使用POJO作為複合型別,決定POJO規則的文件請參考這裡。
當將一個POJO型別的DataStream或者DataSet轉換為Table而不指定欄位名稱時,Table的欄位名稱將採用JOPO原生的欄位名稱作為欄位名稱。重新命名原始的POJO欄位需要關鍵字AS,因為POJO沒有固定的順序,名稱對映需要原始名稱並且不能通過位置來完成。
//獲取一個 StreamTableEnvironment, 同樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Person 是一個有兩個欄位"name" and "age" 的POJO
DataStream<Person> stream = ...
// 將 DataStream 轉換為有欄位 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);
// 將 DataStream 轉換為有欄位 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Person 是一個有欄位 "name" and "age" 的POJO
val stream: DataStream[Person] = ...
// 將 DataStream 轉換為具有欄位 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)
// 將 DataStream 轉換為具有欄位 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
Row
Row資料型別支援任意數量的欄位,並且欄位可以是null
值,欄位名稱可以通過RowTypeInformation來指定或者將一個Row DataStream或者DataSet轉換為Table時(根據位置)指定。
// 獲取一個 StreamTableEnvironment, 同樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// 在`RowTypeInfo`中指定欄位"name" and "age"的Row型別DataStream
DataStream<Row> stream = ...
// 將 DataStream 轉換為具有欄位 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);
// 將 DataStream 轉換為具有欄位 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 在`RowTypeInfo`中指定欄位"name" and "age"的Row型別DataStream
val stream: DataStream[Row] = ...
// 將 DataStream 轉換為具有欄位 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)
// 將 DataStream 轉換為具有欄位 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
查詢優化
Apache Flink使用Apache Calcite來優化和翻譯查詢,當前的查詢優化包括投影、過濾下推、子查詢去相關及各種形式的查詢重寫。Flink不去優化join的順序,但是會根據它們的順序去執行(FROM子句中表的順序或者WHERE子句中連線謂詞的順序)。
可以通過提供一個CalciteConfig物件來調整在不同階段應用的優化規則集,這個可以通過呼叫CalciteConfig.createBuilder())
獲得的builder來建立,並且可以通過呼叫tableEnv.getConfig.setCalciteConfig(calciteConfig)
來提供給TableEnvironment。
解析一個Table
Table API為計算一個Table提供了一個機制來解析邏輯和優化查詢計劃,這個可以通過呼叫TableEnvironment.explain(table)
方法來完成,它會返回描述三個計劃的字串:
1、關係查詢語法樹,即未優化的查詢計劃
2、優化後的邏輯查詢計劃
3、物理執行計劃
以下程式碼顯示了一個示例和相應的輸出:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
輸出如下:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, 'F%')])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])
== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, 'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE