1. 程式人生 > 實用技巧 >Flink基礎(二十五):FLINK SQL(一)查詢語句(一)基本查詢

Flink基礎(二十五):FLINK SQL(一)查詢語句(一)基本查詢

來源:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html

0 簡介

SELECT 語句和 VALUES 語句需要使用TableEnvironmentsqlQuery()方法加以指定。這個方法會以Table的形式返回 SELECT (或 VALUE)的查詢結果。Table可以被用於隨後的SQL 與 Table API 查詢轉換為 DataSet 或 DataStream輸出到 TableSink。SQL 與 Table API 的查詢可以進行無縫融合、整體優化並翻譯為單一的程式。

為了可以在 SQL 查詢中訪問到表,你需要先

在 TableEnvironment 中登錄檔。表可以通過TableSourceTableCREATE TABLE 語句DataStream 或 DataSet註冊。 使用者也可以通過向 TableEnvironment 中註冊 catalog的方式指定資料來源的位置。

為方便起見Table.toString()將會在其TableEnvironment中自動使用一個唯一的名字登錄檔並返回表名。 因此,Table物件可以如下文所示樣例,直接內聯到 SQL 語句中。

注意:查詢若包括了不支援的 SQL 特性,將會丟擲TableException。批處理和流處理所支援的 SQL 特性將會在下述章節中列出。

1指定查詢

以下示例顯示如何在已註冊和內聯表上指定 SQL 查詢。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

//  從外部資料來源讀取 DataStream 
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// 使用 SQL 查詢內聯的(未註冊的)表
val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
val result 
= tableEnv.sqlQuery( s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'") // 使用名稱 "Orders" 註冊一個 DataStream tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount") // 在表上執行 SQL 查詢並得到以新表返回的結果 val result2 = tableEnv.sqlQuery( "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") // 建立並註冊一個 TableSink val schema = new Schema() .field("product", DataTypes.STRING()) .field("amount", DataTypes.INT()) tableEnv.connect(new FileSystem().path("/path/to/file")) .withFormat(...) .withSchema(schema) .createTemporaryTable("RubberOrders") // 在表上執行插入操作,並把結果發出到 TableSink tableEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

2執行查詢

SELECT 語句或者 VALUES 語句可以通過TableEnvironment.executeSql()方法來執行,將選擇的結果收集到本地。該方法返回TableResult物件用於包裝查詢的結果。和 SELECT 語句很像,一個Table物件可以通過Table.execute()方法執行從而將Table的內容收集到本地客戶端。TableResult.collect()方法返回一個可以關閉的行迭代器。除非所有的資料都被收集到本地,否則一個查詢作業永遠不會結束。所以我們應該通過CloseableIterator#close()方法主動地關閉作業以防止資源洩露。 我們還可以通過TableResult.print()方法將查詢結果列印到本地控制檯。TableResult中的結果資料只能被訪問一次,因此一個TableResult例項中,collect()方法和print()方法不能被同時使用。

對於流模式,TableResult.collect()方法或者TableResult.print方法保證端到端精確一次的資料交付。這就要求開啟 checkpointing。預設情況下 checkpointing 是禁止的,我們可以通過TableConfig設定 checkpointing 相關屬性(請參考checkpointing 配置)來開啟 checkpointing。 因此一條結果資料只有在其對應的 checkpointing 完成後才能在客戶端被訪問。

注意:對於流模式,當前僅支援追加模式的查詢語句,並且應該開啟 checkpoint。因為一條結果只有在其對應的 checkpoint 完成之後才能被客戶端訪問到。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tableEnv = StreamTableEnvironment.create(env, settings)
// enable checkpointing
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(
  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")

// execute SELECT statement
val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
val it = tableResult1.collect()
try while (it.hasNext) {
  val row = it.next
  // handle row
}
finally it.close() // close the iterator to avoid resource leak

// execute Table
val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
tableResult2.print()

3 語法

Flink 通過支援標準 ANSI SQL的Apache Calcite解析 SQL。

以下 BNF-語法 描述了批處理和流處理查詢中所支援的 SQL 特性的超集。其中操作符章節展示了所支援的特性的樣例,並指明瞭哪些特性僅適用於批處理或流處理。

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ dynamicTableOptions ]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

dynamicTableOptions:
  /*+ OPTIONS(key=val [, key=val]*) */

key:
  stringLiteral

val:
  stringLiteral

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'

measureColumn:
      expression AS alias

pattern:
      patternTerm [ '|' patternTerm ]*

patternTerm:
      patternFactor [ patternFactor ]*

patternFactor:
      variable [ patternQuantifier ]

patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'

Flink SQL 對於識別符號(表、屬性、函式名)有類似於 Java 的詞法約定:

  • 不管是否引用識別符號,都保留識別符號的大小寫。
  • 且識別符號需區分大小寫。
  • 與 Java 不一樣的地方在於,通過反引號,可以允許識別符號帶有非字母的字元(如:"SELECT a AS `my field` FROM t")。

字串文字常量需要被單引號包起來(如SELECT 'Hello World')。兩個單引號表示轉移(如SELECT 'It''s me.')。字串文字常量支援 Unicode 字元,如需明確使用 Unicode 編碼,請使用以下語法:

  • 使用反斜槓(\)作為轉義字元(預設):SELECT U&'\263A'
  • 使用自定義的轉義字元:SELECT U&'#263A' UESCAPE '#'