Flink基礎(二十五):FLINK SQL(一)查詢語句(一)基本查詢
來源:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html
0 簡介
SELECT 語句和 VALUES 語句需要使用TableEnvironment
的sqlQuery()
方法加以指定。這個方法會以Table
的形式返回 SELECT (或 VALUE)的查詢結果。Table
可以被用於隨後的SQL 與 Table API 查詢、轉換為 DataSet 或 DataStream或輸出到 TableSink。SQL 與 Table API 的查詢可以進行無縫融合、整體優化並翻譯為單一的程式。
為了可以在 SQL 查詢中訪問到表,你需要先
為方便起見Table.toString()
將會在其TableEnvironment
中自動使用一個唯一的名字登錄檔並返回表名。 因此,Table
物件可以如下文所示樣例,直接內聯到 SQL 語句中。
注意:查詢若包括了不支援的 SQL 特性,將會丟擲TableException
。批處理和流處理所支援的 SQL 特性將會在下述章節中列出。
1指定查詢
以下示例顯示如何在已註冊和內聯表上指定 SQL 查詢。
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) // 從外部資料來源讀取 DataStream val ds: DataStream[(Long, String, Integer)] = env.addSource(...) // 使用 SQL 查詢內聯的(未註冊的)表 val table = ds.toTable(tableEnv, $"user", $"product", $"amount") val result= tableEnv.sqlQuery( s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'") // 使用名稱 "Orders" 註冊一個 DataStream tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount") // 在表上執行 SQL 查詢並得到以新表返回的結果 val result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") // 建立並註冊一個 TableSink val schema = new Schema() .field("product", DataTypes.STRING()) .field("amount", DataTypes.INT()) tableEnv.connect(new FileSystem().path("/path/to/file")) .withFormat(...) .withSchema(schema) .createTemporaryTable("RubberOrders") // 在表上執行插入操作,並把結果發出到 TableSink tableEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
2執行查詢
SELECT 語句或者 VALUES 語句可以通過TableEnvironment.executeSql()
方法來執行,將選擇的結果收集到本地。該方法返回TableResult
物件用於包裝查詢的結果。和 SELECT 語句很像,一個Table
物件可以通過Table.execute()
方法執行從而將Table
的內容收集到本地客戶端。TableResult.collect()
方法返回一個可以關閉的行迭代器。除非所有的資料都被收集到本地,否則一個查詢作業永遠不會結束。所以我們應該通過CloseableIterator#close()
方法主動地關閉作業以防止資源洩露。 我們還可以通過TableResult.print()
方法將查詢結果列印到本地控制檯。TableResult
中的結果資料只能被訪問一次,因此一個TableResult
例項中,collect()
方法和print()
方法不能被同時使用。
對於流模式,TableResult.collect()
方法或者TableResult.print
方法保證端到端精確一次的資料交付。這就要求開啟 checkpointing。預設情況下 checkpointing 是禁止的,我們可以通過TableConfig
設定 checkpointing 相關屬性(請參考checkpointing 配置)來開啟 checkpointing。 因此一條結果資料只有在其對應的 checkpointing 完成後才能在客戶端被訪問。
注意:對於流模式,當前僅支援追加模式的查詢語句,並且應該開啟 checkpoint。因為一條結果只有在其對應的 checkpoint 完成之後才能被客戶端訪問到。
val env = StreamExecutionEnvironment.getExecutionEnvironment() val tableEnv = StreamTableEnvironment.create(env, settings) // enable checkpointing tableEnv.getConfig.getConfiguration.set( ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) tableEnv.getConfig.getConfiguration.set( ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10)) tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)") // execute SELECT statement val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders") val it = tableResult1.collect() try while (it.hasNext) { val row = it.next // handle row } finally it.close() // close the iterator to avoid resource leak // execute Table val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute() tableResult2.print()
3 語法
Flink 通過支援標準 ANSI SQL的Apache Calcite解析 SQL。
以下 BNF-語法 描述了批處理和流處理查詢中所支援的 SQL 特性的超集。其中操作符章節展示了所支援的特性的樣例,並指明瞭哪些特性僅適用於批處理或流處理。
query: values | { select | selectWithoutFrom | query UNION [ ALL ] query | query EXCEPT query | query INTERSECT query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY] orderItem: expression [ ASC | DESC ] select: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } projectItem: expression [ [ AS ] columnAlias ] | tableAlias . * tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ] joinCondition: ON booleanExpression | USING '(' column [, column ]* ')' tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ] tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ] | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' | UNNEST '(' expression ')' dynamicTableOptions: /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral values: VALUES expression [, expression ]* groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' windowRef: windowName | windowSpec windowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression {PRECEDING} | ROWS numericExpression {PRECEDING} ] ')' matchRecognize: MATCH_RECOGNIZE '(' [ PARTITION BY expression [, expression ]* ] [ ORDER BY orderItem [, orderItem ]* ] [ MEASURES measureColumn [, measureColumn ]* ] [ ONE ROW PER MATCH ] [ AFTER MATCH ( SKIP TO NEXT ROW | SKIP PAST LAST ROW | SKIP TO FIRST variable | SKIP TO LAST variable | SKIP TO variable ) ] PATTERN '(' pattern ')' [ WITHIN intervalLiteral ] DEFINE variable AS condition [, variable AS condition ]* ')' measureColumn: expression AS alias pattern: patternTerm [ '|' patternTerm ]* patternTerm: patternFactor [ patternFactor ]* patternFactor: variable [ patternQuantifier ] patternQuantifier: '*' | '*?' | '+' | '+?' | '?' | '??' | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?'] | '{' repeat '}'
Flink SQL 對於識別符號(表、屬性、函式名)有類似於 Java 的詞法約定:
- 不管是否引用識別符號,都保留識別符號的大小寫。
- 且識別符號需區分大小寫。
- 與 Java 不一樣的地方在於,通過反引號,可以允許識別符號帶有非字母的字元(如:
"SELECT a AS `my field` FROM t"
)。
字串文字常量需要被單引號包起來(如SELECT 'Hello World'
)。兩個單引號表示轉移(如SELECT 'It''s me.'
)。字串文字常量支援 Unicode 字元,如需明確使用 Unicode 編碼,請使用以下語法:
- 使用反斜槓(
\
)作為轉義字元(預設):SELECT U&'\263A'
- 使用自定義的轉義字元:
SELECT U&'#263A' UESCAPE '#'