1. 程式人生 > 其它 >sql 查兩個表不重複的資料_Flink Table API & SQL程式設計指南(1)

sql 查兩個表不重複的資料_Flink Table API & SQL程式設計指南(1)

技術標籤:sql 查兩個表不重複的資料

768acd952970b8667436fdb6aedb8617.png點選藍字關注我們

Apache Flink提供了兩種頂層的關係型API,分別為Table API和SQL,Flink通過Table API&SQL實現了批流統一。其中Table API是用於Scala和Java的語言整合查詢API,它允許以非常直觀的方式組合關係運算符(例如select,where和join)的查詢。Flink SQL基於Apache Calcite 實現了標準的SQL,使用者可以使用標準的SQL處理資料集。Table API和SQL與Flink的DataStream和DataSet API緊密整合在一起,使用者可以實現相互轉化,比如可以將DataStream或者DataSet註冊為table進行操作資料。值得注意的是,Table API and SQL

目前尚未完全完善,還在積極的開發中,所以並不是所有的運算元操作都可以通過其實現。

依賴

從Flink1.9開始,Flink為Table & SQL API提供了兩種planner,分別為Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要區別如下:

尖叫提示:對於生產環境,目前推薦使用old planner.

  • flink-table-common: 通用模組,包含 Flink Planner 和 Blink Planner 一些共用的程式碼
  • flink-table-api-java: java語言的Table & SQL API,僅針對table(處於早期的開發階段,不推薦使用)
  • flink-table-api-scala: scala語言的Table & SQL API,僅針對table(處於早期的開發階段,不推薦使用)
  • flink-table-api-java-bridge: java語言的Table & SQL API,支援DataStream/DataSet API(推薦使用)
  • flink-table-api-scala-bridge: scala語言的Table & SQL API,支援DataStream/DataSet API(推薦使用)
  • flink-table-planner:planner 和runtime. planner為Flink1,9之前的old planner(推薦使用)
  • flink-table-planner-blink: 新的Blink planner.
  • flink-table-runtime-blink: 新的Blink runtime.
  • flink-table-uber: 將上述的API模組及old planner打成一個jar包,形如flink-table-*.jar,位與/lib目錄下
  • flink-table-uber-blink:將上述的API模組及Blink 模組打成一個jar包,形如fflink-table-blink-*.jar,位與/lib目錄下

Blink planner & old planner

Blink planner和old planner有許多不同的特點,具體列舉如下:

  • Blink planner將批處理作業看做是流處理作業的特例。所以,不支援Table 與DataSet之間的轉換,批處理的作業也不會被轉成DataSet程式,而是被轉為DataStream程式。
  • Blink planner不支援 BatchTableSource,使用的是有界的StreamTableSource。
  • Blink planner僅支援新的 Catalog,不支援ExternalCatalog (已過時)。
  • 對於FilterableTableSource的實現,兩種Planner是不同的。old planner會謂詞下推到PlannerExpression(未來會被移除),而Blink planner 會謂詞下推到 Expression(表示一個產生計算結果的邏輯樹)。
  • 僅僅Blink planner支援key-value形式的配置,即通過Configuration進行引數設定。
  • 關於PlannerConfig的實現,兩種planner有所不同。
  • Blink planner 會將多個sink優化成一個DAG(僅支援TableEnvironment,StreamTableEnvironment不支援),old planner總是將每一個sink優化成一個新的DAG,每一個DAG都是相互獨立的。
  • old planner不支援catalog統計,Blink planner支援catalog統計。

Flink Table & SQL程式的pom依賴

根據使用的語言不同,可以選擇下面的依賴,包括scala版和java版,如下:


org.apache.flinkflink-table-api-java-bridge_2.111.10.0provided

org.apache.flinkflink-table-api-scala-bridge_2.111.10.0provided

除此之外,如果需要在本地的IDE中執行Table API & SQL的程式,則需要新增下面的pom依賴:


org.apache.flinkflink-table-planner_2.111.10.0provided

org.apache.flinkflink-table-planner-blink_2.111.10.0provided

另外,如果需要實現自定義的格式(比如和kafka互動)或者使用者自定義函式,需要新增如下依賴:

org.apache.flinkflink-table-common1.10.0provided

Table API & SQL的程式設計模板

所有的Table API&SQL的程式(無論是批處理還是流處理)都有著相同的形式,下面將給出通用的程式設計結構形式:

//建立一個TableEnvironment物件,指定planner、處理模式(batch、streaming)
TableEnvironmenttableEnv=...;
//建立一個表
tableEnv.connect(...).createTemporaryTable("table1");
//註冊一個外部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
//通過TableAPI的查詢建立一個Table物件
TabletapiResult=tableEnv.from("table1").select(...);
//通過SQL查詢的查詢建立一個Table物件
TablesqlResult=tableEnv.sqlQuery("SELECT...FROMtable1...");
//將結果寫入TableSink
tapiResult.insertInto("outputTable");
//執行
tableEnv.execute("java_job");

注意:Table API & SQL的查詢可以相互整合,另外還可以在DataStream或者DataSet中使用Table API & SQL的API,實現DataStreams、 DataSet與Table之間的相互轉換。

建立TableEnvironment

TableEnvironment是Table API & SQL程式的一個入口,主要包括如下的功能:

  • 在內部的catalog中註冊Table
  • 註冊catalog
  • 載入可插拔模組
  • 執行SQL查詢
  • 註冊使用者定義函式
  • DataStreamDataSet與Table之間的相互轉換
  • 持有對ExecutionEnvironmentStreamExecutionEnvironment的引用

一個Table必定屬於一個具體的TableEnvironment,不可以將不同TableEnvironment的表放在一起使用(比如join,union等操作)。

TableEnvironment是通過呼叫 BatchTableEnvironment.create() 或者StreamTableEnvironment.create()的靜態方法進行建立的。另外,預設兩個planner的jar包都存在與classpath下,所有需要明確指定使用的planner。

//**********************
//FLINK流處理查詢
//**********************
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.EnvironmentSettings;
importorg.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettingsfsSettings=EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironmentfsEnv=StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironmentfsTableEnv=StreamTableEnvironment.create(fsEnv,fsSettings);
//或者TableEnvironmentfsTableEnv=TableEnvironment.create(fsSettings);

//******************
//FLINK批處理查詢
//******************
importorg.apache.flink.api.java.ExecutionEnvironment;
importorg.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironmentfbEnv=ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironmentfbTableEnv=BatchTableEnvironment.create(fbEnv);

//**********************
//BLINK流處理查詢
//**********************
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.EnvironmentSettings;
importorg.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironmentbsEnv=StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettingsbsSettings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironmentbsTableEnv=StreamTableEnvironment.create(bsEnv,bsSettings);
//或者TableEnvironmentbsTableEnv=TableEnvironment.create(bsSettings);

//******************
//BLINK批處理查詢
//******************
importorg.apache.flink.table.api.EnvironmentSettings;
importorg.apache.flink.table.api.TableEnvironment;

EnvironmentSettingsbbSettings=EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironmentbbTableEnv=TableEnvironment.create(bbSettings);

在catalog中建立表

臨時表與永久表

表可以分為臨時表和永久表兩種,其中永久表需要一個catalog(比如Hive的Metastore)倆維護表的元資料資訊,一旦永久表被建立,只要連線到該catalog就可以訪問該表,只有顯示刪除永久表,該表才可以被刪除。臨時表的生命週期是Flink Session,這些表不能夠被其他的Flink Session訪問,這些表不屬於任何的catalog或者資料庫,如果與臨時表相對應的資料庫被刪除了,該臨時表也不會被刪除。

建立表

虛表(Virtual Tables)

一個Table物件相當於SQL中的檢視(虛表),它封裝了一個邏輯執行計劃,可以通過一個catalog建立,具體如下:

//獲取一個TableEnvironment
TableEnvironmenttableEnv=...;
//table物件,查詢的結果集
TableprojTable=tableEnv.from("X").select(...);
//註冊一個表,名稱為"projectedTable"
tableEnv.createTemporaryView("projectedTable",projTable);

外部資料來源表(Connector Tables)

可以把外部的資料來源註冊成表,比如可以讀取MySQL資料庫資料、Kafka資料等

tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")

擴充套件建立表的標識屬性

表的註冊總是包含三部分標識屬性:catalog、資料庫、表名。使用者可以在內部設定一個catalog和一個數據庫作為當前的catalog和資料庫,所以對於catalog和資料庫這兩個標識屬性是可選的,即如果不指定,預設使用的是“current catalog”和 “current database”。

TableEnvironmenttEnv=...;
tEnv.useCatalog("custom_catalog");//設定catalog
tEnv.useDatabase("custom_database");//設定資料庫
Tabletable=...;
//註冊一個名為exampleView的檢視,catalog名為custom_catalog
//資料庫的名為custom_database
tableEnv.createTemporaryView("exampleView",table);

//註冊一個名為exampleView的檢視,catalog的名為custom_catalog
//資料庫的名為other_database
tableEnv.createTemporaryView("other_database.exampleView",table);

//註冊一個名為'View'的檢視,catalog的名稱為custom_catalog
//資料庫的名為custom_database,'View'是保留關鍵字,需要使用``(反引號)
tableEnv.createTemporaryView("`View`",table);

//註冊一個名為example.View的檢視,catalog的名為custom_catalog,
//資料庫名為custom_database
tableEnv.createTemporaryView("`example.View`",table);

//註冊一個名為'exampleView'的檢視,catalog的名為'other_catalog'
//資料庫名為other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView",table);

查詢表

Table API

Table API是一個整合Scala與Java語言的查詢API,與SQL相比,它的查詢不是一個標準的SQL語句,而是由一步一步的操作組成的。如下展示了一個使用Table API實現一個簡單的聚合查詢。

//獲取TableEnvironment
TableEnvironmenttableEnv=...;
//註冊Orders表

//查詢註冊的表
Tableorders=tableEnv.from("Orders");
//計算操作
Tablerevenue=orders
.filter("cCountry==='FRANCE'")
.groupBy("cID,cName")
.select("cID,cName,revenue.sumASrevSum");

SQL

Flink SQL依賴於Apache Calcite,其實現了標準的SQL語法,如下案例:

//獲取TableEnvironment
TableEnvironmenttableEnv=...;

//註冊Orders表

//計算邏輯同上面的TableAPI
Tablerevenue=tableEnv.sqlQuery(
"SELECTcID,cName,SUM(revenue)ASrevSum"+
"FROMOrders"+
"WHEREcCountry='FRANCE'"+
"GROUPBYcID,cName"
);

//註冊"RevenueFrance"外部輸出表
//計算結果插入"RevenueFrance"表
tableEnv.sqlUpdate(
"INSERTINTORevenueFrance"+
"SELECTcID,cName,SUM(revenue)ASrevSum"+
"FROMOrders"+
"WHEREcCountry='FRANCE'"+
"GROUPBYcID,cName"
);

輸出表

一個表通過將其寫入到TableSink,然後進行輸出。TableSink是一個通用的支援多種檔案格式(CSV、Parquet, Avro)和多種外部儲存系統(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多種訊息對列(Apache Kafka, RabbitMQ)的介面。

批處理的表只能被寫入到 BatchTableSink,流處理的表需要指明AppendStreamTableSink、RetractStreamTableSink或者 UpsertStreamTableSink

//獲取TableEnvironment
TableEnvironmenttableEnv=...;

//建立輸出表
finalSchemaschema=newSchema()
.field("a",DataTypes.INT())
.field("b",DataTypes.STRING())
.field("c",DataTypes.LONG());

tableEnv.connect(newFileSystem("/path/to/file"))
.withFormat(newCsv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");

//計算結果表
Tableresult=...
//輸出結果表到註冊的TableSink
result.insertInto("CsvSinkTable");

Table API & SQL底層的轉換與執行

上文提到了Flink提供了兩種planner,分別為old planner和Blink planner,對於不同的planner而言,Table API & SQL底層的執行與轉換是有所不同的。

Old planner

根據是流處理作業還是批處理作業,Table API &SQL會被轉換成DataStream或者DataSet程式。一個查詢在內部表示為一個邏輯查詢計劃,會被轉換為兩個階段:

  • 1.邏輯查詢計劃優化
  • 2.轉換成DataStream或者DataSet程式

上面的兩個階段只有下面的操作被執行時才會被執行:

  • 當一個表被輸出到TableSink時,比如呼叫了Table.insertInto()方法
  • 當執行更新查詢時,比如呼叫TableEnvironment.sqlUpdate()方法
  • 當一個表被轉換為DataStream或者DataSet時

一旦執行上述兩個階段,Table API & SQL的操作會被看做是普通的DataStream或者DataSet程式,所以當StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute() 被呼叫時,會執行轉換後的程式。

Blink planner

無論是批處理作業還是流處理作業,如果使用的是Blink planner,底層都會被轉換為DataStream程式。在一個查詢在內部表示為一個邏輯查詢計劃,會被轉換成兩個階段:

  • 1.邏輯查詢計劃優化
  • 2.轉換成DataStream程式

對於TableEnvironment and StreamTableEnvironment而言,一個查詢的轉換是不同的

首先對於TableEnvironment,當TableEnvironment.execute()方法執行時,Table API & SQL的查詢才會被轉換,因為TableEnvironment會將多個sink優化為一個DAG。

對於StreamTableEnvironment,轉換髮生的時間與old planner相同。

與DataStream & DataSet API整合

對於Old planner與Blink planner而言,只要是流處理的操作,都可以與DataStream API整合,僅僅只有Old planner才可以與DataSet API整合,由於Blink planner的批處理作業會被轉換成DataStream程式,所以不能夠與DataSet API整合。值得注意的是,下面提到的table與DataSet之間的轉換僅適用於Old planner。

Table API & SQL的查詢很容易與DataStream或者DataSet程式整合,並可以將Table API & SQL的查詢嵌入DataStream或者DataSet程式中。DataStream或者DataSet可以轉換成表,反之,表也可以被轉換成DataStream或者DataSet。

從DataStream或者DataSet中註冊臨時表(檢視)

**尖叫提示:**只能將DataStream或者DataSet轉換為臨時表(檢視)

下面演示DataStream的轉換,對於DataSet的轉換類似。

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
DataStream>stream=...//將DataStream註冊為一個名為myTable的檢視,其中欄位分別為"f0","f1"
tableEnv.createTemporaryView("myTable",stream);//將DataStream註冊為一個名為myTable2的檢視,其中欄位分別為"myLong","myString"
tableEnv.createTemporaryView("myTable2",stream,"myLong,myString");

將DataStream或者DataSet轉化為Table物件

可以直接將DataStream或者DataSet轉換為Table物件,之後可以使用Table API進行查詢操作。下面演示DataStream的轉換,對於DataSet的轉換類似。

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
DataStream>stream=...//將DataStream轉換為Table物件,預設的欄位為"f0","f1"
Tabletable1=tableEnv.fromDataStream(stream);//將DataStream轉換為Table物件,預設的欄位為"myLong","myString"
Tabletable2=tableEnv.fromDataStream(stream,"myLong,myString");

將錶轉換為DataStream或者DataSet

當將Table轉為DataStream或者DataSet時,需要指定DataStream或者DataSet的資料型別。通常最方便的資料型別是row型別,Flink提供了很多的資料型別供使用者選擇,具體包括Row、POJO、樣例類、Tuple和原子型別。

將錶轉換為DataStream

一個流處理查詢的結果是動態變化的,所以將錶轉為DataStream時需要指定一個更新模式,共有兩種模式:Append ModeRetract Mode

  • Append Mode

如果動態表僅只有Insert操作,即之前輸出的結果不會被更新,則使用該模式。如果更新或刪除操作使用追加模式會失敗報錯

  • Retract Mode

始終可以使用此模式。返回值是boolean型別。它用true或false來標記資料的插入和撤回,返回true代表資料插入,false代表資料的撤回。

//獲取StreamTableEnvironment.
StreamTableEnvironmenttableEnv=...;
//包含兩個欄位的表(Stringname,Integerage)
Tabletable=...
//將錶轉為DataStream,使用AppendMode追加模式,資料型別為Row
DataStreamdsRow=tableEnv.toAppendStream(table,Row.class);//將錶轉為DataStream,使用AppendMode追加模式,資料型別為定義好的TypeInformation
TupleTypeInfo>tupleType=newTupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream>dsTuple=
tableEnv.toAppendStream(table,tupleType);//將錶轉為DataStream,使用的模式為RetractMode撤回模式,型別為Row//對於轉換後的DataStream>,X表示流的資料型別,//boolean值表示資料改變的型別,其中INSERT返回true,DELETE返回的是false
DataStream>retractStream=
tableEnv.toRetractStream(table,Row.class);

將錶轉換為DataSet

//獲取BatchTableEnvironment
BatchTableEnvironmenttableEnv=BatchTableEnvironment.create(env);
//包含兩個欄位的表(Stringname,Integerage)
Tabletable=...
//將錶轉為DataSet資料型別為Row
DataSetdsRow=tableEnv.toDataSet(table,Row.class);//將錶轉為DataSet,通過TypeInformation定義Tuple2資料型別
TupleTypeInfo>tupleType=newTupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet>dsTuple=
tableEnv.toDataSet(table,tupleType);

表的Schema與資料型別之間的對映

表的Schema與資料型別之間的對映有兩種方式:分別是基於欄位下標位置的對映和基於欄位名稱的對映。

基於欄位下標位置的對映

該方式是按照欄位的順序進行一一對映,使用方式如下:

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
DataStream>stream=...//將DataStream轉為表,預設的欄位名為"f0"和"f1"
Tabletable=tableEnv.fromDataStream(stream);//將DataStream轉為表,選取tuple的第一個元素,指定一個名為"myLong"的欄位名
Tabletable=tableEnv.fromDataStream(stream,"myLong");//將DataStream轉為表,為tuple的第一個元素指定名為"myLong",為第二個元素指定myInt的欄位名
Tabletable=tableEnv.fromDataStream(stream,"myLong,myInt");

基於欄位名稱的對映

基於欄位名稱的對映方式支援任意的資料型別包括POJO型別,可以很靈活地定義表Schema對映,所有的欄位被對映成一個具體的欄位名稱,同時也可以使用"as"為欄位起一個別名。其中Tuple元素的第一個元素為f0,第二個元素為f1,以此類推。

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
DataStream>stream=...//將DataStream轉為表,預設的欄位名為"f0"和"f1"
Tabletable=tableEnv.fromDataStream(stream);//將DataStream轉為表,選擇tuple的第二個元素,指定一個名為"f1"的欄位名
Tabletable=tableEnv.fromDataStream(stream,"f1");//將DataStream轉為表,交換欄位的順序
Tabletable=tableEnv.fromDataStream(stream,"f1,f0");//將DataStream轉為表,交換欄位的順序,併為f1起別名為"myInt",為f0起別名為"myLong
Tabletable=tableEnv.fromDataStream(stream,"f1asmyInt,f0asmyLong");

原子型別

Flink將Integer, Double, String或者普通的型別稱之為原子型別,一個數據型別為原子型別的DataStream或者DataSet可以被轉成單個欄位屬性的表,這個欄位的型別與DataStream或者DataSet的資料型別一致,這個欄位的名稱可以進行指定。

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
//資料型別為原子型別Long
DataStreamstream=...//將DataStream轉為表,預設的欄位名為"f0"
Tabletable=tableEnv.fromDataStream(stream);//將DataStream轉為表,指定欄位名為myLong"
Tabletable=tableEnv.fromDataStream(stream,"myLong");

Tuple型別

Tuple型別的DataStream或者DataSet都可以轉為表,可以重新設定表的欄位名(即根據tuple元素的位置進行一一對映,轉為表之後,每個元素都有一個別名),如果不為欄位指定名稱,則使用預設的名稱(java語言預設的是f0,f1,scala預設的是_1),使用者也可以重新排列欄位的順序,併為每個欄位起一個別名。

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
//Tuple2型別的DataStream
DataStream>stream=...//將DataStream轉為表,預設的欄位名為"f0","f1"
Tabletable=tableEnv.fromDataStream(stream);//將DataStream轉為表,指定欄位名為"myLong","myString"(按照Tuple元素的順序位置)
Tabletable=tableEnv.fromDataStream(stream,"myLong,myString");//將DataStream轉為表,指定欄位名為"f0","f1",並且交換順序
Tabletable=tableEnv.fromDataStream(stream,"f1,f0");//將DataStream轉為表,只選擇Tuple的第二個元素,指定欄位名為"f1"
Tabletable=tableEnv.fromDataStream(stream,"f1");//將DataStream轉為表,為Tuple的第二個元素指定別名為myString,為第一個元素指定欄位名為myLong
Tabletable=tableEnv.fromDataStream(stream,"f1as'myString',f0as'myLong'");

POJO型別

當將POJO型別的DataStream或者DataSet轉為表時,如果不指定表名,則預設使用的是POJO欄位本身的名稱,原始欄位名稱的對映需要指定原始欄位的名稱,可以為其起一個別名,也可以調換欄位的順序,也可以只選擇部分的欄位。

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
//資料型別為Person的POJO型別,欄位包括"name"和"age"
DataStreamstream=...//將DataStream轉為表,預設的欄位名稱為"age","name"
Tabletable=tableEnv.fromDataStream(stream);//將DataStream轉為表,為"age"欄位指定別名myAge,為"name"欄位指定別名myName
Tabletable=tableEnv.fromDataStream(stream,"ageasmyAge,nameasmyName");//將DataStream轉為表,只選擇一個name欄位
Tabletable=tableEnv.fromDataStream(stream,"name");//將DataStream轉為表,只選擇一個name欄位,並起一個別名myName
Tabletable=tableEnv.fromDataStream(stream,"nameasmyName");

Row型別

Row型別的DataStream或者DataSet轉為表的過程中,可以根據欄位的位置或者欄位名稱進行對映,同時也可以為欄位起一個別名,或者只選擇部分欄位。

//獲取StreamTableEnvironment
StreamTableEnvironmenttableEnv=...;
//Row型別的DataStream,通過RowTypeInfo指定兩個欄位"name"和"age"
DataStreamstream=...//將DataStream轉為表,預設的欄位名為原始欄位名"name"和"age"
Tabletable=tableEnv.fromDataStream(stream);//將DataStream轉為表,根據位置對映,為第一個欄位指定myName別名,為第二個欄位指定myAge別名
Tabletable=tableEnv.fromDataStream(stream,"myName,myAge");//將DataStream轉為表,根據欄位名對映,為name欄位起別名myName,為age欄位起別名myAge
Tabletable=tableEnv.fromDataStream(stream,"nameasmyName,ageasmyAge");//將DataStream轉為表,根據欄位名對映,只選擇name欄位
Tabletable=tableEnv.fromDataStream(stream,"name");//將DataStream轉為表,根據欄位名對映,只選擇name欄位,並起一個別名"myName"
Tabletable=tableEnv.fromDataStream(stream,"nameasmyName");

查詢優化

Old planner

Apache Flink利用Apache Calcite來優化和轉換查詢。當前執行的優化包括投影和過濾器下推,去相關子查詢以及其他型別的查詢重寫。Old Planner目前不支援優化JOIN的順序,而是按照查詢中定義的順序執行它們。

通過提供一個CalciteConfig物件,可以調整在不同階段應用的優化規則集。這可通過呼叫CalciteConfig.createBuilder()方法來進行建立,並通過呼叫tableEnv.getConfig.setPlannerConfig(calciteConfig)方法將該物件傳遞給TableEnvironment。

Blink planner

Apache Flink利用並擴充套件了Apache Calcite來執行復雜的查詢優化。這包括一系列基於規則和基於成本的優化(cost_based),例如:

  • 基於Apache Calcite的去相關子查詢
  • 投影裁剪
  • 分割槽裁剪
  • 過濾器謂詞下推
  • 子計劃重複資料刪除以避免重複計算
  • 特殊的子查詢重寫,包括兩個部分:
    • 將IN和EXISTS轉換為左半聯接( left semi-join)
    • 將NOT IN和NOT EXISTS轉換為left anti-join
  • 調整join的順序,需要啟用 table.optimizer.join-reorder-enabled

注意: IN / EXISTS / NOT IN / NOT EXISTS當前僅在子查詢重寫的結合條件下受支援。

查詢優化器不僅基於計劃,而且還可以基於資料來源的統計資訊以及每個操作的細粒度開銷(例如io,cpu,網路和記憶體),從而做出更加明智且合理的優化決策。

高階使用者可以通過CalciteConfig物件提供自定義優化規則,通過呼叫tableEnv.getConfig.setPlannerConfig(calciteConfig),將引數傳遞給TableEnvironment。

檢視執行計劃

SQL語言支援通過explain來檢視某條SQL的執行計劃,Flink Table API也可以通過呼叫explain()方法來檢視具體的執行計劃。該方法返回一個字串用來描述三個部分計劃,分別為:

  1. 關係查詢的抽象語法樹,即未優化的邏輯查詢計劃,
  2. 優化的邏輯查詢計劃
  3. 實際執行計劃
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
DataStream>stream1=env.fromElements(newTuple2<>(1,"hello"));
DataStream>stream2=env.fromElements(newTuple2<>(1,"hello"));
Tabletable1=tEnv.fromDataStream(stream1,"count,word");
Tabletable2=tEnv.fromDataStream(stream2,"count,word");
Tabletable=table1
.where("LIKE(word,'F%')")
.unionAll(table2);//檢視執行計劃
Stringexplanation=tEnv.explain(table);
System.out.println(explanation);

執行計劃的結果為:

==抽象語法樹==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1,_UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1],fields=[count,word])
FlinkLogicalDataStreamScan(id=[2],fields=[count,word])

==優化的邏輯執行計劃==
DataStreamUnion(all=[true],unionall=[count,word])
DataStreamCalc(select=[count,word],where=[LIKE(word,_UTF-16LE'F%')])
DataStreamScan(id=[1],fields=[count,word])
DataStreamScan(id=[2],fields=[count,word])

==物理執行計劃==
Stage1:DataSource
content:collectelementswithCollectionInputFormat

Stage2:DataSource
content:collectelementswithCollectionInputFormat

Stage3:Operator
content:from:(count,word)
ship_strategy:REBALANCE

Stage4:Operator
content:where:(LIKE(word,_UTF-16LE'F%')),select:(count,word)
ship_strategy:FORWARD

Stage5:Operator
content:from:(count,word)
ship_strategy:REBALANCE

小結

本文主要介紹了Flink TableAPI &SQL,首先介紹了Flink Table API &SQL的基本概念 ,然後介紹了構建Flink Table API & SQL程式所需要的依賴,接著介紹了Flink的兩種planner,還介紹瞭如何登錄檔以及DataStream、DataSet與表的相互轉換,最後介紹了Flink的兩種planner對應的查詢優化並給出了一個檢視執行計劃的案例。

往期 精彩回顧Flink DataSet API程式設計指南
Flink DataStream API程式設計指南
Flink DataStream API 中的多面手——Process Function詳解透過視窗觀無限資料流——Flink的Window全面解析 我就知道你在看!