1. 程式人生 > 實用技巧 >Flink基礎(二十八):FLINK SQL(四)CREATE 語句

Flink基礎(二十八):FLINK SQL(四)CREATE 語句

0 簡介

CREATE 語句用於向當前或指定的Catalog中登錄檔、檢視或函式。註冊後的表、檢視和函式可以在 SQL 查詢中使用。

目前 Flink SQL 支援下列 CREATE 語句:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

1 執行 CREATE 語句

可以使用TableEnvironment中的executeSql()方法執行 CREATE 語句,也可以在SQL CLI中執行 CREATE 語句。 若 CREATE 操作執行成功,executeSql()方法返回 ‘OK’,否則會丟擲異常。

以下的例子展示瞭如何在TableEnvironment

和 SQL CLI 中執行一個 CREATE 語句。

val settings = EnvironmentSettings.newInstance()...
val tableEnv = TableEnvironment.create(settings)

// 對已註冊的表進行 SQL 查詢
// 註冊名為 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上執行 SQL 查詢,並把得到的結果作為一個新的表
val result = tableEnv.sqlQuery(
  
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); // 對已註冊的表進行 INSERT 操作 // 註冊 TableSink tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)"); // 在表上執行 INSERT 語句並向 TableSink 發出結果 tableEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...

2 CREATE TABLE

CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

<column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

根據指定的表名建立一個表,如果同名表已經在 catalog 中存在了,則無法註冊。

COMPUTED COLUMN

計算列是一個使用 “column_name AS computed_column_expression” 語法生成的虛擬列。它由使用同一表中其他列的非查詢表示式生成,並且不會在表中進行物理儲存。例如,一個計算列可以使用cost AS price * quantity進行定義,這個表示式可以包含物理列、常量、函式或變數的任意組合,但這個表示式不能存在任何子查詢。

在 Flink 中計算列一般用於為 CREATE TABLE 語句定義時間屬性處理時間屬性可以簡單地通過使用了系統函式PROCTIME()proc AS PROCTIME()語句進行定義。 另一方面,由於事件時間列可能需要從現有的欄位中獲得,因此計算列可用於獲得事件時間列。例如,原始欄位的型別不是TIMESTAMP(3)或巢狀在 JSON 字串中。

注意:

  • 定義在一個數據源表( source table )上的計算列會在從資料來源讀取資料後被計算,它們可以在 SELECT 查詢語句中使用。
  • 計算列不可以作為 INSERT 語句的目標,在 INSERT 語句中,SELECT 語句的 schema 需要與目標表不帶有計算列的 schema 一致。

WATERMARK

WATERMARK定義了表的事件時間屬性,其形式為WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

rowtime_column_name把一個現有的列定義為一個為表標記事件時間的屬性。該列的型別必須為TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個計算列。

watermark_strategy_expression定義了 watermark 的生成策略。它允許使用包括計算列在內的任意非查詢表示式來計算 watermark ;表示式的返回型別必須是TIMESTAMP(3),表示了從 Epoch 以來的經過的時間。 返回的 watermark 只有當其不為空且其值大於之前發出的本地 watermark 時才會被髮出(以保證 watermark 遞增)。每條記錄的 watermark 生成表示式計算都會由框架完成。 框架會定期發出所生成的最大的 watermark ,如果當前 watermark 仍然與前一個 watermark 相同、為空、或返回的 watermark 的值小於最後一個發出的 watermark ,則新的 watermark 不會被髮出。 Watermark 根據pipeline.auto-watermark-interval中所配置的間隔發出。 若 watermark 的間隔是0ms,那麼每條記錄都會產生一個 watermark,且 watermark 會在不為空並大於上一個發出的 watermark 時發出。

使用事件時間語義時,表必須包含事件時間屬性和 watermark 策略。

Flink 提供了幾種常用的 watermark 策略。

  • 嚴格遞增時間戳:WATERMARK FOR rowtime_column AS rowtime_column

    發出到目前為止已觀察到的最大時間戳的 watermark ,時間戳大於最大時間戳的行被認為沒有遲到。

  • 遞增時間戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    發出到目前為止已觀察到的最大時間戳減 1 的 watermark ,時間戳大於或等於最大時間戳的行被認為沒有遲到。

  • 有界亂序時間戳:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

    發出到目前為止已觀察到的最大時間戳減去指定延遲的 watermark ,例如,WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND是一個 5 秒延遲的 watermark 策略。

CREATE TABLE Orders (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

PRIMARY KEY

主鍵用作 Flink 優化的一種提示資訊。主鍵限制表明一張表或檢視的某個(些)列是唯一的並且不包含 Null 值。 主鍵宣告的列都是非 nullable 的。因此主鍵可以被用作錶行級別的唯一標識。

主鍵可以和列的定義一起宣告,也可以獨立宣告為表的限制屬性,不管是哪種方式,主鍵都不可以重複定義,否則 Flink 會報錯。

有效性檢查

SQL 標準主鍵限制可以有兩種模式:ENFORCED或者NOT ENFORCED。 它申明瞭是否輸入/出資料會做合法性檢查(是否唯一)。Flink 不儲存資料因此只支援NOT ENFORCED模式,即不做檢查,使用者需要自己保證唯一性。

Flink 假設聲明瞭主鍵的列都是不包含 Null 值的,Connector 在處理資料時需要自己保證語義正確。

Notes:在 CREATE TABLE 語句中,建立主鍵會修改列的 nullable 屬性,主鍵宣告的列預設都是非 Nullable 的。

PARTITIONED BY

根據指定的列對已經建立的表進行分割槽。若表使用 filesystem sink ,則將會為每個分割槽建立一個目錄。

WITH OPTIONS

表屬性用於建立 table source/sink ,一般用於尋找和建立底層的聯結器。

表示式key1=val1的鍵和值必須為字串文字常量。請參考連線外部系統瞭解不同聯結器所支援的屬性。

注意:表名可以為以下三種格式 1.catalog_name.db_name.table_name2.db_name.table_name3.table_name。使用catalog_name.db_name.table_name的表將會與名為 “catalog_name” 的 catalog 和名為 “db_name” 的資料庫一起註冊到 metastore 中。使用db_name.table_name的表將會被註冊到當前執行的 table environment 中的 catalog 且資料庫會被命名為 “db_name”;對於table_name, 資料表將會被註冊到當前正在執行的catalog和資料庫中。

注意:使用CREATE TABLE語句註冊的表均可用作 table source 和 table sink。 在被 DML 語句引用前,我們無法決定其實際用於 source 抑或是 sink。

LIKE

LIKE子句來源於兩種 SQL 特性的變體/組合(Feature T171,“表定義中的 LIKE 語法” 和 Feature T173,“表定義中的 LIKE 語法擴充套件”)。LIKE 子句可以基於現有表的定義去建立新表,並且可以擴充套件或排除原始表中的某些部分。與 SQL 標準相反,LIKE 子句必須在 CREATE 語句中定義,並且是基於 CREATE 語句的更上層定義,這是因為 LIKE 子句可以用於定義表的多個部分,而不僅僅是 schema 部分。

你可以使用該子句,重用(或改寫)指定的聯結器配置屬性或者可以向外部表新增 watermark 定義,例如可以向 Apache Hive 中定義的表新增 watermark 定義。

示例如下:

CREATE TABLE Orders (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- 新增 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 改寫 startup-mode 屬性
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

結果表Orders_with_watermark等效於使用以下語句建立的表:

CREATE TABLE Orders_with_watermark (
    user BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

表屬性的合併邏輯可以用like options來控制。

可以控制合併的表屬性如下:

  • CONSTRAINTS - 主鍵和唯一鍵約束
  • GENERATED - 計算列
  • OPTIONS - 聯結器資訊、格式化方式等配置項
  • PARTITIONS - 表分割槽資訊
  • WATERMARKS - watermark 定義

並且有三種不同的表屬性合併策略:

  • INCLUDING - 新表包含源表(source table)所有的表屬性,如果和源表的表屬性重複則會直接失敗,例如新表和源表存在相同 key 的屬性。
  • EXCLUDING - 新表不包含源表指定的任何表屬性。
  • OVERWRITING - 新表包含源表的表屬性,但如果出現重複項,則會用新表的表屬性覆蓋源表中的重複表屬性,例如,兩個表中都存在相同 key 的屬性,則會使用當前語句中定義的 key 的屬性值。

並且你可以使用INCLUDING/EXCLUDING ALL這種宣告方式來指定使用怎樣的合併策略,例如使用EXCLUDING ALL INCLUDING WATERMARKS,那麼代表只有源表的 WATERMARKS 屬性才會被包含進新表。

示例如下:

-- 儲存在檔案系統的源表
CREATE TABLE Orders_in_file (
    user BIGINT,
    product STRING,
    order_time_string STRING,
    order_time AS to_timestamp(order_time)
    
)
PARTITIONED BY user 
WITH ( 
    'connector' = 'filesystem'
    'path' = '...'
);

-- 對應儲存在 kafka 的源表
CREATE TABLE Orders_in_kafka (
    -- 新增 watermark 定義
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector': 'kafka'
    ...
)
LIKE Orders_in_file (
    -- 排除需要生成 watermark 的計算列之外的所有內容。
    -- 去除不適用於 kafka 的所有分割槽和檔案系統的相關屬性。
    EXCLUDING ALL
    INCLUDING GENERATED
);

如果未提供 like 配置項(like options),預設將使用INCLUDING ALL OVERWRITING OPTIONS的合併策略。

注意:您無法選擇物理列的合併策略,當物理列進行合併時就如使用了INCLUDING策略。

注意:源表source_table可以是一個組合 ID。您可以指定不同 catalog 或者 DB 的表作為源表: 例如,my_catalog.my_db.MyTable指定了源表MyTable來源於名為MyCatalog的 catalog 和名為my_db的 DB ,my_db.MyTable指定了源表MyTable來源於當前 catalog 和名為my_db的 DB。

3 CREATE CATALOG

CREATE CATALOG catalog_name
  WITH (key1=val1, key2=val2, ...)

Create a catalog with the given catalog properties. If a catalog with the same name already exists, an exception is thrown.

WITH OPTIONS

Catalog properties used to store extra information related to this catalog. The key and value of expressionkey1=val1should both be string literal.

Check out more details atCatalogs.

4 CREATE DATABASE

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

根據給定的表屬性建立資料庫。若資料庫中已存在同名表會丟擲異常。

IF NOT EXISTS

若資料庫已經存在,則不會進行任何操作。

WITH OPTIONS

資料庫屬性一般用於儲存關於這個資料庫額外的資訊。 表示式key1=val1中的鍵和值都需要是字串文字常量。

5 CREATE VIEW

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [{columnName [, columnName ]* }] [COMMENT view_comment]
  AS query_expression

根據給定的 query 語句建立一個檢視。若資料庫中已經存在同名檢視會丟擲異常.

TEMPORARY

建立一個有 catalog 和資料庫名稱空間的臨時檢視,並覆蓋原有的檢視。

IF NOT EXISTS

若該檢視已經存在,則不會進行任何操作。

6 CREATE FUNCTION

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
  [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

建立一個有 catalog 和資料庫名稱空間的 catalog function ,需要指定一個 identifier ,可指定 language tag 。 若 catalog 中,已經有同名的函式註冊了,則無法註冊。

如果 language tag 是 JAVA 或者 SCALA ,則 identifier 是 UDF 實現類的全限定名。關於 JAVA/SCALA UDF 的實現,請參考自定義函式

如果 language tag 是 PYTHON ,則 identifier 是 UDF 物件的全限定名,例如pyflink.table.tests.test_udf.add。關於 PYTHON UDF 的實現,請參考Python UDFs

TEMPORARY

建立一個有 catalog 和資料庫名稱空間的臨時 catalog function ,並覆蓋原有的 catalog function 。

TEMPORARY SYSTEM

建立一個沒有資料庫名稱空間的臨時系統 catalog function ,並覆蓋系統內建的函式。

IF NOT EXISTS

若該函式已經存在,則不會進行任何操作。

LANGUAGE JAVA|SCALA|PYTHON

Language tag 用於指定 Flink runtime 如何執行這個函式。目前,只支援 JAVA, SCALA 和 PYTHON,且函式的預設語言為 JAVA。