1. 程式人生 > 實用技巧 >Flink基礎(三十一):FLINK SQL(七)INSERT 語句

Flink基礎(三十一):FLINK SQL(七)INSERT 語句

INSERT 語句用來向表中新增行。

1 執行 INSERT 語句

單條 INSERT 語句,可以使用TableEnvironment中的executeSql()方法執行,也可以在SQL CLI中執行 INSERT 語句。executeSql()方法執行 INSERT 語句時會立即提交一個 Flink 作業,並且返回一個 TableResult 物件,通過該物件可以獲取 JobClient 方便的操作提交的作業。 多條 INSERT 語句,使用TableEnvironment中的createStatementSet建立一個StatementSet物件,然後使用StatementSet中的addInsertSql()

方法新增多條 INSERT 語句,最後通過StatementSet中的execute()方法來執行。

以下的例子展示瞭如何在TableEnvironment和 SQL CLI 中執行一條 INSERT 語句,或者通過StatementSet執行多條 INSERT 語句。

val settings = EnvironmentSettings.newInstance()...
val tEnv = TableEnvironment.create(settings)

// 註冊一個 "Orders" 源表,和 "RubberOrders" 結果表
tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
tEnv.executeSql(
"CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)") // 執行一個 INSERT 語句,將源表的資料輸出到結果表中 val tableResult1 = tEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") // 通過 TableResult 來獲取作業狀態 println(tableResult1.getJobClient().get().getJobStatus())
//---------------------------------------------------------------------------- // 註冊一個 "GlassOrders" 結果表用於執行多 INSERT 語句 tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)"); // 執行多個 INSERT 語句,將原表資料輸出到多個結果表中 val stmtSet = tEnv.createStatementSet() // `addInsertSql` 方法每次只接收單條 INSERT 語句 stmtSet.addInsertSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") stmtSet.addInsertSql( "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'") // 執行剛剛新增的所有 INSERT 語句 val tableResult2 = stmtSet.execute() // 通過 TableResult 來獲取作業狀態 println(tableResult1.getJobClient().get().getJobStatus())

2 將 SELECT 查詢資料插入表中

通過 INSERT 語句,可以將查詢的結果插入到表中,

語法

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

OVERWRITE

INSERT OVERWRITE將會覆蓋表中或分割槽中的任何已存在的資料。否則,新資料會追加到表中或分割槽中。

PARTITION

PARTITION語句應該包含需要插入的靜態分割槽列與值。

示例

-- 建立一個分割槽表
CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING)
PARTITIONED BY (date, country)
WITH (...)

-- 追加行到該靜態分割槽中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 追加行到分割槽 (date, country) 中,其中 date 是靜態分割槽 '2019-8-30';country 是動態分割槽,其值由每一行動態決定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

-- 覆蓋行到靜態分割槽 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 覆蓋行到分割槽 (date, country) 中,其中 date 是靜態分割槽 '2019-8-30';country 是動態分割槽,其值由每一行動態決定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

3 將值插入表中

通過 INSERT 語句,也可以直接將值插入到表中,

語法

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]

values_row:
    : (val1 [, val2, ...])

OVERWRITE

INSERT OVERWRITE將會覆蓋表中的任何已存在的資料。否則,新資料會追加到表中。

示例

CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);

INSERT INTO students
  VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);