Flink SQL UNION ALL and UNION, with Complete Code
阿新 • Published: 2021-01-16
The examples use two source tables: test_source_union1 and test_source_union2.
UNION ALL
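UNION ALL merges two tables. Both tables must have exactly the same fields, including field types and field order. Its semantics correspond to Union in relational algebra, with one difference: relational algebra operates on sets and therefore removes duplicates, whereas UNION ALL does not deduplicate.
The corresponding SQL statement is: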
SELECT * FROM test_source_union1 UNION ALL SELECT * FROM test_source_union2
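To make the no-deduplication behaviour concrete, here is a minimal sketch using plain Scala collections rather than Flink itself. The `Rec` case class and the sample rows are hypothetical stand-ins for the three string columns a, b, c. Under that assumption, UNION ALL behaves like simple concatenation:

```scala
// Hypothetical row type mirroring the columns a, b, c (not a Flink class).
final case class Rec(a: String, b: String, c: String)

object UnionAllSketch {
  // UNION ALL keeps every row from both inputs, duplicates included.
  def unionAll(left: Seq[Rec], right: Seq[Rec]): Seq[Rec] = left ++ right

  // Made-up sample data, for illustration only.
  val t1: Seq[Rec] = Seq(Rec("1", "x", "y"), Rec("2", "x", "y"))
  val t2: Seq[Rec] = Seq(Rec("1", "x", "y"), Rec("3", "x", "y"))

  def main(args: Array[String]): Unit = {
    // All four rows are printed; Rec(1,x,y) appears twice.
    unionAll(t1, t2).foreach(println)
  }
}
```

The row that exists in both inputs survives twice, which is exactly what distinguishes UNION ALL from the set-based Union of relational algebra.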
UNION
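UNION merges two streams. Both streams must have exactly the same fields, including field types and field order. Unlike UNION ALL, UNION deduplicates the result, which matches the Union semantics of relational algebra.
The corresponding SQL statement is: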
SELECT * FROM test_source_union1 UNION SELECT * FROM test_source_union2
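The deduplicating behaviour can be sketched the same way with plain Scala collections (again not Flink API; `Rec` and the sample rows are hypothetical). Here UNION is modelled as concatenation followed by `distinct`:

```scala
// Hypothetical row type mirroring the columns a, b, c (not a Flink class).
final case class Rec(a: String, b: String, c: String)

object UnionSketch {
  // UNION concatenates both inputs and then drops duplicate rows.
  def union(left: Seq[Rec], right: Seq[Rec]): Seq[Rec] = (left ++ right).distinct

  // Made-up sample data, for illustration only.
  val t1: Seq[Rec] = Seq(Rec("1", "x", "y"), Rec("2", "x", "y"))
  val t2: Seq[Rec] = Seq(Rec("1", "x", "y"), Rec("3", "x", "y"))

  def main(args: Array[String]): Unit = {
    // Three rows are printed; the row shared by both inputs appears once.
    union(t1, t2).foreach(println)
  }
}
```

The sketch happens to keep first-occurrence order, but SQL gives no ordering guarantee for the result of a UNION.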
Complete code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row
object test001 {
  def main(args: Array[String]): Unit = {
    // Set up a streaming table environment on the Blink planner.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val tEnv = StreamTableEnvironment.create(env, bsSettings)

    // Register the first MySQL table through the JDBC connector.
    tEnv.executeSql(
      s"""
         |CREATE TABLE test_source_union1 (
         |  a string,
         |  b string,
         |  c string
         |) WITH (
         |  'connector.type' = 'jdbc',
         |  'connector.url' = 'jdbc:mysql://node01:3306/test',
         |  'connector.table' = 'test_source_union1',
         |  'connector.driver' = 'com.mysql.cj.jdbc.Driver',
         |  'connector.username' = 'root',
         |  'connector.password' = 'rootroot'
         |)
         |""".stripMargin)

    // Register the second table with the same schema (names, types, order).
    tEnv.executeSql(
      s"""
         |CREATE TABLE test_source_union2 (
         |  a string,
         |  b string,
         |  c string
         |) WITH (
         |  'connector.type' = 'jdbc',
         |  'connector.url' = 'jdbc:mysql://node01:3306/test',
         |  'connector.table' = 'test_source_union2',
         |  'connector.driver' = 'com.mysql.cj.jdbc.Driver',
         |  'connector.username' = 'root',
         |  'connector.password' = 'rootroot'
         |)
         |""".stripMargin)

    // UNION ALL variant (keeps duplicates); use it in place of the UNION query below.
    // val table: Table = tEnv.sqlQuery(
    //   "select * from test_source_union1 UNION ALL SELECT * from test_source_union2"
    // )

    // UNION variant (removes duplicates).
    val table: Table = tEnv.sqlQuery(
      "select * from test_source_union1 UNION SELECT * from test_source_union2"
    )

    // Each element is a (flag, row) pair: true marks an added row, false a retracted one.
    val value = tEnv.toRetractStream(table, classOf[Row])
    value.print()
    env.execute("test")
  }
}
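The program prints a retract stream: each element pairs a Boolean flag with a row, where true means the row is added to the result and false means a previously emitted row is withdrawn. As a rough illustration in plain Scala (not Flink API; `materialize` and the sample changes are hypothetical), replaying those pairs in order yields the current result:

```scala
object RetractSketch {
  // Replays (flag, row) changes in order and returns how many copies of each
  // row remain: true adds one copy of the row, false removes one copy.
  def materialize[A](changes: Seq[(Boolean, A)]): Map[A, Int] =
    changes.foldLeft(Map.empty[A, Int]) { case (state, (isAdd, row)) =>
      val count = state.getOrElse(row, 0) + (if (isAdd) 1 else -1)
      if (count <= 0) state - row else state.updated(row, count)
    }

  def main(args: Array[String]): Unit = {
    // Made-up change log: row "2,x,y" is added and later retracted.
    val changes = Seq(
      (true, "1,x,y"),
      (true, "2,x,y"),
      (false, "2,x,y"),
      (true, "3,x,y")
    )
    println(materialize(changes))
  }
}
```

Only rows whose additions outnumber their retractions remain in the materialized result.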