Flink實戰(六) - Table API & SQL程式設計
1 意義
1.1 分層的 APIs & 抽象層次
Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,並針對不同的用例。
而且Flink提供不同級別的抽象來開發流/批處理應用程式
- 最低階抽象只提供有狀態流。它通過Process Function嵌入到DataStream API中。它允許使用者自由處理來自一個或多個流的事件,並使用一致的容錯狀態。此外,使用者可以註冊事件時間和處理時間回撥,允許程式實現複雜的計算。
- 實際上,大多數應用程式不需要上述低階抽象,而是針對Core API程式設計, 如DataStream API(有界/無界流)和DataSet API (有界資料集)。這些流暢的API提供了用於資料處理的通用構建塊,例如各種形式的使用者指定的轉換,連線,聚合,視窗,狀態等。在這些API中處理的資料型別在相應的程式語言中表示為類。
低階Process Function與DataStream API整合,因此只能對某些 運算元操作進行低階抽象。該資料集API提供的有限資料集的其他原語,如迴圈/迭代。 - 該 Table API 是為中心的宣告性DSL 表,其可被動態地改變的表(表示流時)。該 Table API遵循(擴充套件)關係模型:表有一個模式連線(類似於在關係資料庫中的表)和API提供可比的 運算元操作,如選擇,專案,連線,分組依據,聚合等 Table API程式以宣告方式定義應該執行的邏輯 運算元操作,而不是準確指定 運算元操作程式碼的外觀。雖然 Table API可以通過各種型別的使用者定義函式進行擴充套件,但它的表現力不如Core API,但使用更簡潔(編寫的程式碼更少)。此外, Table API程式還會通過優化程式,在執行之前應用優化規則。
可以在表和DataStream / DataSet之間無縫轉換,允許程式混合 Table API以及DataStream 和DataSet API。 - Flink提供的最高階抽象是SQL。這種抽象在語義和表達方面類似於 Table API,但是將程式表示為SQL查詢表示式。在SQL抽象與 Table API緊密地相互作用,和SQL查詢可以通過定義表來執行 Table API。1.2 模型類比MapReduce ==> Hive SQL
Spark ==> Spark SQL
Flink ==> SQL
2 總覽
2.1 簡介
Apache Flink具有兩個關係型API
- Table API
- SQL
用於統一流和批處理
Table API是Scala和Java語言整合查詢API,可以非常直觀的方式組合來自關係運算元的查詢(e.g. 選擇,過濾和連線).
Flink的SQL支援基於實現SQL標準的Apache Calcite。無論輸入是批輸入(DataSet)還是流輸入(DataStream),任一介面中指定的查詢都具有相同的語義並指定相同的結果。
Table API和SQL介面彼此緊密整合,就如Flink的DataStream和DataSet API。我們可以輕鬆地在基於API構建的所有API和庫之間切換。例如,可以使用CEP庫從DataStream中提取模式,然後使用 Table API分析模式,或者可以在預處理上執行Gelly圖演算法之前使用SQL查詢掃描,過濾和聚合批處理表資料。
Table API和SQL尚未完成並且正在積極開發中。並非 Table API,SQL和stream,batch輸入的每種組合都支援所有運算元操作
2.2 依賴結構
所有Table API和SQL元件都捆綁在flink-table Maven工件中。
以下依賴項與大多數專案相關:
- flink-table-common
通過自定義函式,格式等擴充套件表生態系統的通用模組。 - flink-table-api-java
使用Java程式語言的純表程式的表和SQL API(在早期開發階段,不推薦!)。 - flink-table-api-scala
使用Scala程式語言的純表程式的表和SQL API(在早期開發階段,不推薦!)。 - flink-table-api-java-bridge
使用Java程式語言支援DataStream / DataSet API的Table&SQL API。 - flink-table-api-scala-bridge
使用Scala程式語言支援DataStream / DataSet API的Table&SQL API。 - flink-table-planner
表程式規劃器和執行時。 - flink-table-uber
將上述模組打包成大多數Table&SQL API用例的發行版。 uber JAR檔案flink-table * .jar位於Flink版本的/ opt目錄中,如果需要可以移動到/ lib。
2.3 專案依賴
必須將以下依賴項新增到專案中才能使用Table API和SQL來定義管道:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.8.0</version>
</dependency>
此外,根據目標程式語言,您需要新增Java或Scala API。
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<!-- or... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.8.0</version>
</dependency>
在內部,表生態系統的一部分是在Scala中實現的。 因此,請確保為批處理和流應用程式新增以下依賴項:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
</dependency>
2.4 擴充套件依賴
如果要實現與Kafka或一組使用者定義函式互動的自定義格式,以下依賴關係就足夠了,可用於SQL客戶端的JAR檔案:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.8.0</version>
</dependency>
目前,該模組包括以下擴充套件點:
- SerializationSchemaFactory
- DeserializationSchemaFactory
- ScalarFunction
- TableFunction
- AggregateFunction
3 概念和通用API
Table API和SQL整合在一個聯合API中。此API的核心概念是Table用作查詢的輸入和輸出。本文件顯示了具有 Table API和SQL查詢的程式的常見結構,如何註冊Table,如何查詢Table以及如何發出Table。
3.1 Table API和SQL程式的結構
批處理和流式傳輸的所有 Table API和SQL程式都遵循相同的模式。以下程式碼示例顯示了 Table API和SQL程式的常見結構。
// 對於批處理程式,使用ExecutionEnvironment而不是StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 建立一個TableEnvironment
// 對於批處理程式使用BatchTableEnvironment而不是StreamTableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 註冊一個 Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...); // 或者
tableEnv.registerExternalCatalog("extCat", ...);
// 註冊一個輸出 Table
tableEnv.registerTableSink("outputTable", ...);
/ 從 Table API query 建立一個Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 從 SQL query 建立一個Table
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// 將表API結果表傳送到TableSink,對於SQL結果也是如此
tapiResult.insertInto("outputTable");
// 執行
env.execute();
3.2 將DataStream或DataSet轉換為表
它也可以直接轉換為a 而不是註冊a DataStream或DataSetin 。如果要在 Table API查詢中使用Table,這很方便。TableEnvironmentTable
// 獲取StreamTableEnvironment
//在BatchTableEnvironment中註冊DataSet是等效的
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉換為預設欄位為“f0”,“f1”的表
Table table1 = tableEnv.fromDataStream(stream);
// 將DataStream轉換為包含欄位“myLong”,“myString”的表
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
- sale.csv檔案
- Scala
- Java
還不完善,等日後Flink該模組開發完畢再深入研究!
參考
Table API & SQL