1. 程式人生 > >Flink 原理與實現:Table & SQL API

Flink 原理與實現:Table & SQL API

轉載來源:http://wuchong.me/blog/2017/03/30/flink-internals-table-and-sql-api/

Flink 已經擁有了強大的 DataStream/DataSet API,可以基本滿足流計算和批計算中的所有需求。為什麼還需要 Table & SQL API 呢?

首先 Table API 是一種關係型API,類 SQL 的API,使用者可以像操作表一樣地操作資料,非常的直觀和方便。使用者只需要說需要什麼東西,系統就會自動地幫你決定如何最高效地計算它,而不需要像 DataStream 一樣寫一大堆 Function,優化還得純靠手工調優。另外,SQL 作為一個“人所皆知”的語言,如果一個引擎提供 SQL,它將很容易被人們接受。這已經是業界很常見的現象了。值得學習的是,Flink 的 Table API 與 SQL API 的實現,有 80% 的程式碼是共用的。所以當我們討論 Table API 時,常常是指 Table & SQL API。

Table & SQL API 還有另一個職責,就是流處理和批處理統一的API層。Flink 在runtime層是統一的,因為Flink將批任務看做流的一種特例來執行,這也是 Flink 向外鼓吹的一點。然而在程式設計模型上,Flink 卻為批和流提供了兩套API (DataSet 和 DataStream)。為什麼 runtime 統一,而程式設計模型不統一呢? 在我看來,這是本末倒置的事情。使用者才不管你 runtime 層是否統一,使用者更關心的是寫一套程式碼。這也是為什麼現在 Apache Beam 能這麼火的原因。所以 Table & SQL API 就扛起了統一API的大旗,批上的查詢會隨著輸入資料的結束而結束並生成有限結果集,流上的查詢會一直執行並生成結果流。Table & SQL API 做到了批與流上的查詢具有同樣的語法,因此不用改程式碼就能同時在批和流上跑。

聊聊歷史

Table API 始於 Flink 0.9,Flink 0.9 是一個類庫百花齊放的版本,眾所周知的 Table API, Gelly, FlinkML 都是在這個版本加進去的。Flink 0.9 大概是在2015年6月正式釋出的,在 Flink 0.9 釋出之前,社群對 SQL 展開過好幾次爭論,不過當時社群認為應該首先完善 Table API 的功能,再去搞SQL,如果兩頭一起搞很容易什麼都做不好。而且在整個Hadoop生態圈中已經有大量的所謂 “SQL-on-Hadoop” 的解決方案,譬如 Apache HiveApache Drill

Apache Impala。”SQL-on-Flink”的事情也可以像 Hadoop 一樣丟給其他社群去搞。

不過,隨著 Flink 0.9 的釋出,意味著抽象語法樹、程式碼生成、執行時函式等都已經成熟,這為SQL的整合鋪好了前進道路。另一方面,使用者對 SQL 的呼聲越來越高。2015年下半年,Timo 大神也加入了 dataArtisans,於是對Table API的改造開始了。2016 年初的時候,改造基本上完成了。我們也是在這個時間點發現了 Table API 的潛力,並加入了社群。經過這一年的努力,Flink 已經發展成 Apache 中最火熱的專案之一,而 Flink 中最活躍的類庫目前非 Table API 莫屬。這其中離不開國內公司的支援,Table API 的貢獻者絕大多數都來自於阿里巴巴和華為,並且主導著 Table API 的發展方向,這是非常令國人自豪的。而我在社群貢獻了一年後,幸運地成為了 Flink Committer。

Table API & SQL 長什麼樣?

這裡不會詳細介紹 Table API & SQL 的使用,只是做一個展示。更多使用細節方面的問題請訪問官網文件

下面這個例子展示瞭如何用 Table API 處理溫度感測器資料。計算每天每個以room開頭的location的平均溫度。例子中涉及瞭如何使用window,event-time等。

 

val sensorData: DataStream[(String, Long, Double)] = ???

// convert DataSet into Table

val sensorTable: Table = sensorData

.toTable(tableEnv, 'location, 'time, 'tempF)

// define query on Table

val avgTempCTable: Table = sensorTable

.window(Tumble over 1.day on 'rowtime as 'w)

.groupBy('location, 'w)

.select('w.start as 'day, 'location, (('tempF.avg - 32) * 0.556) as 'avgTempC)

.where('location like "room%")

下面的例子是展示瞭如何用 SQL 來實現。

 

val sensorData: DataStream[(String, Long, Double)] = ???

// register DataStream

tableEnv.registerDataStream("sensorData", sensorData, 'location, ’time, 'tempF)

// query registered Table

val avgTempCTable: Table = tableEnv.sql("""

SELECT FLOOR(rowtime() TO DAY) AS day, location,

AVG((tempF - 32) * 0.556) AS avgTempC

FROM sensorData

WHERE location LIKE 'room%'

GROUP BY location, FLOOR(rowtime() TO DAY) """)

Table API & SQL 原理

Flink 非常明智,沒有像Spark那樣重複造輪子(Spark Catalyst),而是將 SQL 校驗、SQL 解析以及 SQL 優化交給了 Apache Calcite。Calcite 在其他很多開源專案裡也都應用到了,譬如Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架構中處於核心的地位,如下圖所示。

新的架構中,構建抽象語法樹的事情全部交給了 Calcite 去做。SQL query 會經過 Calcite 解析器轉變成 SQL 節點樹,通過驗證後構建成 Calcite 的抽象語法樹(也就是圖中的 Logical Plan)。另一邊,Table API 上的呼叫會構建成 Table API 的抽象語法樹,並通過 Calcite 提供的 RelBuilder 轉變成 Calcite 的抽象語法樹。

以上面的溫度計程式碼為樣例,Table API 和 SQL 的轉換流程如下,綠色的節點代表 Flink Table Nodes,藍色的節點代表 Calcite Logical Nodes。最終都轉化成了相同的 Logical Plan 表現形式。

之後會進入優化器,Calcite 會基於優化規則來優化這些 Logical Plan,根據執行環境的不同會應用不同的優化規則(Flink提供了批的優化規則,和流的優化規則)。這裡的優化規則分為兩類,一類是Calcite提供的內建優化規則(如條件下推,剪枝等),另一類是是將Logical Node轉變成 Flink Node 的規則。這兩類規則的應用體現為下圖中的①和②步驟,這兩步驟都屬於 Calcite 的優化階段。得到的 DataStream Plan 封裝瞭如何將節點翻譯成對應 DataStream/DataSet 程式的邏輯。步驟③就是將不同的 DataStream/DataSet Node 通過程式碼生成(CodeGen)翻譯成最終可執行的 DataStream/DataSet 程式。

程式碼生成是 Table API & SQL 中最核心的一塊內容。表示式、條件、內建函式等等是需要CodeGen出具體的Function 程式碼的,這部分跟Spark SQL的結構很相似。CodeGen 出的Function以字串的形式存在。在提交任務後會分發到各個 TaskManager 中執行,在執行時會使用 Janino 編譯器編譯程式碼後執行。

Table API & SQL 現狀

目前 Table API 對於批和流都已經支援了基本的Selection, Projection, Union,以及 Window 操作(包括固定視窗、滑動視窗、會話視窗)。SQL 的話由於 Calcite 在最近的版本中才支援 Window 語法,所以目前 Flink SQL 還不支援 Window 的語法。並且 Table API 和 SQL 都支援了UDF,UDTF,UDAF(開發中)。

Table API & SQL 未來

  1. Dynamic Tables

    Dynamic Table 就是傳統意義上的表,只不過表中的資料是會變化更新的。Flink 提出 Stream <–> Dynamic Table 之間是可以等價轉換的。不過這需要引入Retraction機制。有機會的話,我會專門寫一篇文章來介紹。

  2. Joins

    包括了支援流與流的 Join,以及流與表的 Join。

  3. SQL 客戶端

    目前 SQL 是需要內嵌到 Java/Scala 程式碼中執行的,不是純 SQL 的使用方式。未來需要支援 SQL 客戶端執行提交 SQL 純文字執行任務。

  4. 並行度設定

    目前 Table API & SQL 是無法設定並行度的,這使得 Table API 看起來仍像個玩具。

在我看來,Flink 的 Table & SQL API 是走在時代前沿的,在很多方面在做著定義業界標準的事情,比如 SQL 上Window的表達,時間語義的表達,流和批語義的統一等。在我看來,SQL 擁有更天然的流與批統一的特性,並且能夠自動幫使用者做很多SQL優化(下推、剪枝等),這是 Beam 所做不到的地方。當然,未來如果 Table & SQL API 發展成熟的話,剝離出來作為業界標準的流與批統一的API也不是不可能(叫BeamTable,BeamSQL ?),哈哈。這也是我非常看好 Table & SQL API,認為其大有潛力的一個原因。當然就目前來說,需要走的路還很長,Table API 現在還只是個玩具。