
Flink SQL UNION ALL and UNION (with Complete Code)


[Figure: sample data in table test_source_union1]

[Figure: sample data in table test_source_union2]

UNION ALL

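UNION ALL combines two tables into one. The two inputs must have exactly matching columns, including column types and column order. Semantically it corresponds to Union in relational algebra, except that the relational-algebra Union is a set operation and removes duplicates, whereas UNION ALL does not: every row from both inputs appears in the result.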

The corresponding SQL statement is:

select * from test_source_union1 UNION ALL SELECT * from test_source_union2
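To make the difference concrete without a Flink cluster, here is a minimal sketch of UNION ALL semantics using plain Scala collections. The row values and the names `UnionAllSketch`, `Row3`, and `unionAll` are hypothetical; they stand in for the two MySQL tables, whose actual rows are not reproduced in this text.

```scala
object UnionAllSketch {
  // Hypothetical row type mirroring the three string columns (a, b, c) of both tables
  type Row3 = (String, String, String)

  // Hypothetical sample rows; the real data lives in MySQL and is not reproduced here
  val t1: Seq[Row3] = Seq(("1", "x", "p"), ("2", "y", "q"))
  val t2: Seq[Row3] = Seq(("2", "y", "q"), ("3", "z", "r"))

  // UNION ALL: concatenate both inputs and keep every row, duplicates included
  def unionAll(left: Seq[Row3], right: Seq[Row3]): Seq[Row3] = left ++ right

  def main(args: Array[String]): Unit =
    unionAll(t1, t2).foreach(println) // the row (2,y,q) is printed twice
}
```

Because no deduplication happens, UNION ALL never needs to remember earlier rows, which is what makes it cheap on an unbounded stream.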


UNION

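UNION likewise merges two streams into one and has the same requirement: the columns must match exactly, including column types and column order. Unlike UNION ALL, however, UNION removes duplicate rows from the result, which matches the set semantics of Union in relational algebra.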

The corresponding SQL statement is:

SELECT * FROM test_source_union1 UNION SELECT * FROM test_source_union2
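The following sketch, again in plain Scala with hypothetical rows and names (`UnionSketch`, `union`, `streamingUnion`), shows UNION as UNION ALL followed by duplicate elimination. It also illustrates why a streaming UNION has to keep state: rows arrive one at a time, so the operator must remember every distinct row it has already emitted. This shows the idea only; it is not Flink's actual operator implementation.

```scala
import scala.collection.mutable

object UnionSketch {
  // Hypothetical row type mirroring the (a, b, c) string columns
  type Row3 = (String, String, String)

  // Hypothetical sample rows; note the duplicate ("1", "x", "p") inside t1 itself
  val t1: Seq[Row3] = Seq(("1", "x", "p"), ("2", "y", "q"), ("1", "x", "p"))
  val t2: Seq[Row3] = Seq(("2", "y", "q"), ("3", "z", "r"))

  // Batch view: UNION = UNION ALL followed by duplicate elimination
  def union(left: Seq[Row3], right: Seq[Row3]): Seq[Row3] = (left ++ right).distinct

  // Streaming view: emit a row only the first time it is seen.
  // The `seen` set is the state that grows with the number of distinct rows.
  def streamingUnion(arrivals: Iterator[Row3]): Iterator[Row3] = {
    val seen = mutable.HashSet.empty[Row3]
    arrivals.filter(row => seen.add(row)) // add returns true only for unseen rows
  }

  def main(args: Array[String]): Unit = {
    union(t1, t2).foreach(println)
    streamingUnion((t1 ++ t2).iterator).foreach(println)
  }
}
```

Duplicates within a single input are removed as well, not only duplicates across the two inputs.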


Complete code

```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row

object test001 {
  def main(args: Array[String]): Unit = {
    // Streaming execution environment plus a Blink-planner table environment in streaming mode
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val tEnv = StreamTableEnvironment.create(env, bsSettings)

    // Source table 1, read from MySQL through the JDBC connector
    tEnv.executeSql(
      """
        |CREATE TABLE test_source_union1 (
        |  a string,
        |  b string,
        |  c string
        |) WITH (
        |  'connector.type' = 'jdbc',
        |  'connector.url' = 'jdbc:mysql://node01:3306/test',
        |  'connector.table' = 'test_source_union1',
        |  'connector.driver' = 'com.mysql.cj.jdbc.Driver',
        |  'connector.username' = 'root',
        |  'connector.password' = 'rootroot'
        |)
        |""".stripMargin)

    // Source table 2, with exactly the same column names, types and order
    tEnv.executeSql(
      """
        |CREATE TABLE test_source_union2 (
        |  a string,
        |  b string,
        |  c string
        |) WITH (
        |  'connector.type' = 'jdbc',
        |  'connector.url' = 'jdbc:mysql://node01:3306/test',
        |  'connector.table' = 'test_source_union2',
        |  'connector.driver' = 'com.mysql.cj.jdbc.Driver',
        |  'connector.username' = 'root',
        |  'connector.password' = 'rootroot'
        |)
        |""".stripMargin)

    // UNION ALL variant: keeps duplicates
    // val table: Table = tEnv.sqlQuery(
    //   "select * from test_source_union1 UNION ALL SELECT * from test_source_union2"
    // )

    // UNION variant: removes duplicates
    val table: Table = tEnv.sqlQuery(
      "select * from test_source_union1 UNION SELECT * from test_source_union2"
    )

    // Convert the result into a retract stream of (Boolean, Row) pairs and print it
    val value = tEnv.toRetractStream(table, classOf[Row])
    value.print()

    // Submit the job
    env.execute("test")
  }
}
```
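The program prints a retract stream: `toRetractStream` produces `(Boolean, Row)` pairs, where `true` marks a row being added to the result and `false` marks a previously emitted row being retracted. The plain-Scala sketch below replays such messages to recover the current result. The message sequence and the names `RetractReplay`, `materialize`, and `sample` are hypothetical and are not output captured from the job; whether a given query actually emits retractions depends on its plan.

```scala
import scala.collection.mutable

object RetractReplay {
  // Hypothetical row type mirroring the (a, b, c) string columns
  type Row3 = (String, String, String)

  // Hypothetical retract messages: (true, row) = add, (false, row) = retract
  val sample: Seq[(Boolean, Row3)] = Seq(
    (true, ("1", "x", "p")),
    (true, ("2", "y", "q")),
    (false, ("1", "x", "p")),
    (true, ("3", "z", "r"))
  )

  // Apply the messages in order and return each remaining row with its multiplicity
  def materialize(messages: Seq[(Boolean, Row3)]): Map[Row3, Int] = {
    val counts = mutable.Map.empty[Row3, Int]
    messages.foreach { case (isAdd, row) =>
      val updated = counts.getOrElse(row, 0) + (if (isAdd) 1 else -1)
      if (updated == 0) counts.remove(row) else counts.update(row, updated)
    }
    counts.toMap
  }

  def main(args: Array[String]): Unit =
    materialize(sample).foreach { case (row, n) => println(s"$row x$n") }
}
```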