Flink基礎(三十一):FLINK SQL(七)INSERT 語句
阿新 • • 發佈:2020-11-28
INSERT 語句用來向表中新增行。
1 執行 INSERT 語句
單條 INSERT 語句,可以使用TableEnvironment
中的executeSql()
方法執行,也可以在SQL CLI中執行 INSERT 語句。executeSql()
方法執行 INSERT 語句時會立即提交一個 Flink 作業,並且返回一個 TableResult 物件,通過該物件可以獲取 JobClient 方便的操作提交的作業。 多條 INSERT 語句,使用TableEnvironment
中的createStatementSet
建立一個StatementSet
物件,然後使用StatementSet
中的addInsertSql()
StatementSet
中的execute()
方法來執行。
以下的例子展示瞭如何在TableEnvironment
和 SQL CLI 中執行一條 INSERT 語句,或者通過StatementSet
執行多條 INSERT 語句。
val settings = EnvironmentSettings.newInstance()... val tEnv = TableEnvironment.create(settings) // 註冊一個 "Orders" 源表,和 "RubberOrders" 結果表 tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)") tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)") // 執行一個 INSERT 語句,將源表的資料輸出到結果表中 val tableResult1 = tEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") // 通過 TableResult 來獲取作業狀態 println(tableResult1.getJobClient().get().getJobStatus())//---------------------------------------------------------------------------- // 註冊一個 "GlassOrders" 結果表用於執行多 INSERT 語句 tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)"); // 執行多個 INSERT 語句,將原表資料輸出到多個結果表中 val stmtSet = tEnv.createStatementSet() // `addInsertSql` 方法每次只接收單條 INSERT 語句 stmtSet.addInsertSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") stmtSet.addInsertSql( "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'") // 執行剛剛新增的所有 INSERT 語句 val tableResult2 = stmtSet.execute() // 通過 TableResult 來獲取作業狀態 println(tableResult1.getJobClient().get().getJobStatus())
2 將 SELECT 查詢資料插入表中
通過 INSERT 語句,可以將查詢的結果插入到表中,
語法
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
part_spec:
(part_col_name1=val1 [, part_col_name2=val2, ...])
OVERWRITE
INSERT OVERWRITE
將會覆蓋表中或分割槽中的任何已存在的資料。否則,新資料會追加到表中或分割槽中。
PARTITION
PARTITION
語句應該包含需要插入的靜態分割槽列與值。
示例
-- 建立一個分割槽表 CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING) PARTITIONED BY (date, country) WITH (...) -- 追加行到該靜態分割槽中 (date='2019-8-30', country='China') INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 追加行到分割槽 (date, country) 中,其中 date 是靜態分割槽 '2019-8-30';country 是動態分割槽,其值由每一行動態決定 INSERT INTO country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source; -- 覆蓋行到靜態分割槽 (date='2019-8-30', country='China') INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 覆蓋行到分割槽 (date, country) 中,其中 date 是靜態分割槽 '2019-8-30';country 是動態分割槽,其值由每一行動態決定 INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source;
3 將值插入表中
通過 INSERT 語句,也可以直接將值插入到表中,
語法
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
values_row:
: (val1 [, val2, ...])
OVERWRITE
INSERT OVERWRITE
將會覆蓋表中的任何已存在的資料。否則,新資料會追加到表中。
示例
CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
INSERT INTO students
VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);