1. 程式人生 > 其它 >深入解讀 Flink SQL 1.13

深入解讀 Flink SQL 1.13

簡介:Apache Flink 社群 5 月 22 日北京站 Meetup 分享內容整理,深入解讀 Flink SQL 1.13 中 5 個 FLIP 的實用更新和重要改進。

本文由社群志願者陳政羽整理,Apache Flink 社群在 5 月份釋出了 1.13 版本,帶來了很多新的變化。文章整理自徐榜江(雪盡) 5 月 22 日在北京的 Flink Meetup 分享的《深入解讀 Flink SQL 1.13》,內容包括:

  1. Flink SQL 1.13 概覽
  2. 核心 feature 解讀
  3. 重要改進解讀
  4. Flink SQL 1.14 未來規劃
  5. 總結

GitHub 地址
https://github.com/apache/flink


歡迎大家給 Flink 點贊送 star~

一、Flink SQL 1.13 概覽

Flink 1.13 是一個社群大版本,解決的 issue 在 1000 個以上,通過上圖我們可以看到,解決的問題大部分是關於 Table/SQL 模組,一共 400 多個 issue 佔了總體的 37% 左右。這些 issue 主要圍繞了 5 個 FLIP 展開,在本文中我們也會根據這 5 個方面進行介紹,它們分別是:

下面我們對這些 FLIP 進行詳細解讀。

二、 核心 feature 解讀

1. FLIP-145:支援 Window TVF

社群的小夥伴應該瞭解,在騰訊、阿里巴巴、位元組跳動等公司的內部分支已經開發了這個功能的基礎版本。這次 Flink 社群也在 Flink 1.13 推出了 TVF 的相關支援和優化。下面將從 Window TVF 語法、近實時累計計算場景、 Window 效能優化、多維資料分析,來分析這個新功能。

1.1 Window TVF 語法

在 1.13 版本前,window 的實現是通過一個特殊的 SqlGroupedWindowFunction:

SELECT 
    TUMBLE_START(bidtime,INTERVAL '10' MINUTE),
  TUMBLE_END(bidtime,INTERVAL '10' MINUTE),
  TUMBLE_ROWTIME(bidtime,INTERVAL '10' MINUTE),
  SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime,INTERVAL '10' MINUTE)

在 1.13 版本中,我們對它進行了 Table-Valued Function 的語法標準化:

SELECT WINDOW_start,WINDOW_end,WINDOW_time,SUM(price) 
FROM Table(TUMBLE(Table myTable,DESCRIPTOR(biztime),INTERVAL '10' MINUTE))
GROUP BY WINDOW_start,WINDOW_end

通過對比兩種語法,我們可以發現:TVF 語法更加靈活,不需要必須跟在 GROUP BY 關鍵字後面,同時 Window TVF 基於關係代數,使得其更加標準。在只需要劃分視窗場景時,可以只用 TVF,無需用 GROUP BY 做聚合,這使得 TVF 擴充套件性和表達能力更強,支援自定義 TVF(例如實現 TOP-N 的 TVF)。

上圖中的示例就是利用 TVF 做的滾動視窗的劃分,只需要把資料劃分到視窗,無需聚合;如果後續需要聚合,再進行 GROP BY 即可。同時,對於熟悉批 SQL 的使用者來說,這種操作是非常自然的,我們不再需要像 1.13 版本之前那樣必須要用特殊的 SqlGroupedWindowFunction 將視窗劃分和聚合繫結在一起。

目前 Window TVF 支援 tumble window,hop window,新增了 cumulate window;session window 預計在 1.14 版本也會支援。

1.2 Cumulate Window

Cumulate window 就是累計視窗,簡單來說,以上圖裡面時間軸上的一個區間為視窗步長。

  • 第一個 window 統計的是一個區間的資料;
  • 第二個 window 統計的是第一區間和第二個區間的資料;
  • 第三個 window 統計的是第一區間,第二個區間和第三個區間的資料。

累積計算在業務場景中非常常見,如累積 UV 場景。在 UV 大盤曲線中:我們每隔 10 分鐘統計一次當天累積使用者 UV。

在 1.13 版本之前,當需要做這種計算時,我們一般的 SQL 寫法如下:

INSERT INTO cumulative_UV
SELECT date_str,MAX(time_str),COUNT(DISTINCT user_id) as UV
FROM (
    SELECT
      DATE_FORMAT(ts,'yyyy-MM-dd') as date_str,
      SUBSTR(DATE_FORMAT(ts,'HH:mm'),1,4) || '0' as time_str,
      user_id
  FROM user_behavior
)
GROUP BY date_str

先將每條記錄所屬的時間視窗欄位拼接好,然後再對所有記錄按照拼接好的時間視窗欄位,通過 GROUP BY 做聚合,從而達到近似累積計算的效果。

  • 1.13 版本前的寫法有很多缺點,首先這個聚合操作是每條記錄都會計算一次。其次,在追逆資料的時候,消費堆積的資料時,UV 大盤的曲線就會跳變。
  • 在 1.13 版本支援了 TVF 寫法,基於 cumulate window,我們可以修改為下面的寫法,將每條資料按照 Event Time 精確地分到每個 Window 裡面, 每個視窗的計算通過 watermark 觸發,即使在追資料場景中也不會跳變。
INSERT INTO cumulative_UV
SELECT WINDOW_end,COUNT(DISTINCT user_id) as UV
FROM Table(
    CUMULATE(Table user_behavior,DESCRIPTOR(ts),INTERVAL '10' MINUTES,INTERVAL '1' DAY))
)
GROUP BY WINDOW_start,WINDOW_end

UV 大盤曲線效果如下圖所示:

1.3 Window 效能優化

Flink 1.13 社群開發者們對 Window TVF 進行了一系列的效能優化,包括:

  • 記憶體優化:通過記憶體預分配,快取 window 的資料,通過 window watermark 觸發計算,通過申請一些記憶體 buffer 避免高頻的訪問 state;
  • 切片優化:將 window 切片,儘可能複用已計算結果,如 hop window,cumulate window。計算過的分片資料無需再次計算,只需對切片的計算結果進行復用;
  • 運算元優化:window 運算元支援 local-global 優化;同時支援 count(distinct) 自動解熱點優化;
  • 遲到資料:支援將遲到資料計算到後續分片,保證資料準確性。

基於這些優化,我們通過開源 Benchmark (Nexmark) 進行效能測試。結果顯示 window 的普適性能有 2x 提升,且在 count(distinct) 場景會有更好的效能提升。

1.4 多維資料分析

語法的標準化帶來了更多的靈活性和擴充套件性,使用者可以直接在 window 視窗函式上進行多維分析。如下圖所示,可以直接進行 GROUPING SETS、ROLLUP、CUBE 的分析計算。如果是在 1.13 之前的版本,我們可能需要對這些分組進行單獨的 SQL 聚合,再對聚合結果做 union 操作才能達到類似的效果。而現在,類似這種多維分析的場景,可以直接在 window TVF 上支援。

支援 Window Top-N

除了多維分析,Window TVF 也支援 Top-N 語法,使得在 Window 上取 Top-N 的寫法更加簡單。

2. FLIP-162:時區和時間函式

2.1 時區問題分析

大家在使用 Flink SQL 時反饋了很多時區相關的問題,造成時區問題的原因可以歸納為 3 個:

  • PROCTIME() 函式應該考慮時區,但未考慮時區;
  • CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() 函式未考慮時區;
  • Flink 的時間屬性,只支援定義在 TIMESTAMP 這種資料型別上面,這個型別是無時區的,TIMESTAMP 型別不考慮時區,但使用者希望是本地時區的時間。

針對 TIMESTAMP 型別沒有考慮時區的問題,我們提議通過TIMESTAMP_LTZ型別支援 (TIMESTAMP_LTZ 是 timestamp with local time zone 的縮寫)。可以通過下面的表格來進行和 TIMESTAMP 的對比:

TIMESTAMP_LTZ 區別於之前我們使用的 TIMESTAMP,它表示絕對時間的含義。通過對比我們可以發現:

  • 如果我們配置使用 TIMESTAMP,它可以是字串型別的。使用者不管是從英國還是中國時區來觀察,這個值都是一樣的;
  • 但是對於 TIMSTAMP_TLZ 來說,它的來源就是一個 Long 值,表示從時間原點流逝過的時間。同一時刻,從時間原點流逝的時間在所有時區都是相同的,所以這個 Long 值是絕對時間的概念。當我們在不同的時區去觀察這個值,我們會用本地的時區去解釋成 “年-月-日-時-分-秒” 的可讀格式,這就是 TIMSTAMP_TLZ 型別,TIMESTAMP_LTZ 型別也更加符合使用者在不同時區下的使用習慣。

下面的例子展示了 TIMESTAMP 和 TIMESTAMP_LTZ 兩個型別的區別。

2.2 時間函式糾正

訂正 PROCTIME() 函式

當我們有了 TIMESTAMP_LTZ 這個型別的時候,我們對 PROCTIME() 型別做了糾正:在 1.13 版本之前,它總是返回 UTC 的 TIMESTAMP;而現在,我們把返回型別變為了 TIMESTAMP_LTZ。PROCTIME 除了表示函式之外,也可以表示時間屬性的標記。

訂正 CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() 函式

這些函式在不同時區下出來的值是會發生變化的。例如在英國 UTC 時區時候是凌晨 2 點;但是如果你設定了時區是 UTC+8,時間就是在早上的 10 點。不同時區的實際時間會發生變化,效果如下圖:

解決 processing time Window 時區問題

大家都知道 proctime 可以表示一個時間屬性,對 proctime 的 window 操作:

  • 在 1.13 版本之前,如果我們需要做按天的 window 操作,你需要手動解決時區問題,去做一些 8 小時的偏移然後再減回去;
  • 在 FLIP-162 中我們解決了這個問題,現在使用者使用的時候十分簡單,只需要宣告 proctime 屬性,因為 PROCTIME() 函式的返回值是TIMESTAMP_LTZ,所以結果是會考慮本地的時區。下圖的例子顯示了在不同的時區下,proctime 屬性的 window 的聚合是按照本地時區進行的。

訂正 Streaming 和 Batch 模式下函式取值方式

時間函式其實在流和批上面的表現形式會有所區別,這次修正主要是讓其更加符合使用者實際的使用習慣。例如以下函式:

  • 在流模式中是 per-record 計算,即每條資料都計算一次;
  • 在 Batch 模式是 query-start 計算,即在作業開始前計算一次。例如我們常用的一些 Batch 計算引擎,如 Hive 也是在每一個批開始前計算一次。

2.3 時間型別使用

在 1.13 版本也支援了在 TIMESTAMP 列上定義 Event time,也就是說Event time 現在既支援定義在 TIMESTAMP 列上,也支援定義在 TIMESTAMP_ LTZ 列上。那麼作為使用者,具體什麼場景用什麼型別呢?

  • 當作業的上游源資料包含了字串的時間(如:2021-4-15 14:00:00)這樣的場景,直接宣告為 TIMESTAMP 然後把 Event time 定義在上面即可,視窗在計算的時候會基於時間字串進行切分,最終會計算出符合你實際想要的預想結果;
  • 當上遊資料來源的打點時間屬於 long 值,表示的是一個絕對時間的含義。在 1.13 版本你可以把 Event time 定義在 TIMESTAMP_LTZ 上面。此時定義在 TIMESTAMP_LTZ 型別上的各種 WINDOW 聚合,都能夠自動的解決 8 小時的時區偏移問題,無需按照之前的 SQL 寫法額外做時區的修改和訂正。

小提示:Flink SQL 中關於時間函式,時區支援的這些提升,是版本不相容的。使用者在進行版本更新的時候需要留意作業邏輯中是否包含此類函式,避免升級後業務受到影響。

2.4 夏令時支援

在 Flink 1.13 以前,對於國外夏令時時區的使用者,做視窗相關的計算操作是十分困難的一件事,因為存在夏令時和冬令時切換的跳變。

Flink 1.13 通過支援在 TIMESTAMP_LTZ 列上定義時間屬性,同時 Flink SQL 在 WINDOW 處理時巧妙地結合 TIMESTAMP 和 TIMESTAMP_LTZ 型別,優雅地支援了夏令時。這對國外夏令時時區使用者,以及有海外業務場景的公司比較有用。

三、重要改進解讀

1. FLIP-152:提升 Hive 語法相容性

FLIP-152 主要是做了 Hive 語法的相容性增強,支援了 Hive 的一些常用 DML 和 DQL 語法,包括:

通過 Hive dialect 支援 Hive 常用語法。Hive 有很多的內建函式,Hive dialect 需要配合 HiveCatalog 和 Hive Module 一起使用,Hive Module 提供了 Hive 所有內建函式,載入後可以直接訪問。

與此同時,我們還可以通過 Hive dialect 建立/刪除 Catalog 函式以及一些自定義的函式,這樣使得 Flink SQL 與 Hive 的相容性得到了極大的提升,讓熟悉 Hive 的使用者使用起來會更加方便。

2. FLIP-163:改進 SQL Client

在 1.13 版本之前,大家覺得 Flink SQL Client 就是周邊的一個小工具。但是,FLIP-163 在 1.13 版本進行了重要改進:

  1. 通過 -i 的引數,提前把 DDL 一次性載入初始化,方便初始化表的多個 DDL 語句,不需要多次執行命令建立表,替代了之前用 yaml 檔案方式建立表;
  2. 支援 -f 引數,其中 SQL 檔案支援 DML(insert into)語句;
  3. 支援更多實用的配置:

    • 通過SET SQL-client.verbose = true, 開啟 verbose,通過開啟 verbose 列印整個資訊,相對以前只輸出一句話更加容易追蹤錯誤資訊;
    • 通過SET execution.runtime-mode=streaming / batch支援設定批/流作業模式;
    • 通過SET pipline.name=my_Flink_job設定作業名稱;
    • 通過SET execution.savepoint.path=/tmp/Flink-savepoints/savepoint-bb0dab設定作業 savepoint 路徑;
    • 對於有依賴的多個作業,通過SET Table.dml-sync=true去選擇是否非同步執行,例如離線作業,作業 a 跑完才能跑作業 b ,通過設定為 true 實現執行有依賴關係的 pipeline 排程。
  4. 同時支援 STATEMENT SET語法:

    有可能我們的一個查詢不止寫到一個 sink 裡面,而是需要輸出到多個 sink,比如一個 sink 寫到 jdbc,一個 sink 寫到 HBase。

    • 在 1.13 版本之前需要啟動 2 個 query 去完成這個作業;
    • 在 1.13 版本,我們可以把這些放到一個 statement 裡面,以一個作業的方式去執行,能夠實現節點的複用,節約資源。

3. FLIP-136:增強 DataStream 和 Table 的轉換

雖然 Flink SQL 大大降低了我們使用實時計算的一些使用門檻,但 Table/SQL 這種高階封裝也遮蔽了一些底層實現,如 timer,state 等。不少高階使用者希望能夠直接操作 DataStream 獲得更多的靈活性,這就需要在 Table 和 DataStream 之間進行轉換。FLIP-136 增強了 Table 和 DataStream 間的轉換,使得使用者在兩者之間的轉換更加容易。

  • 支援 DataStream 和 Table 轉換時傳遞 EVENT TIME 和 WATERMARK;
Table Table = TableEnv.fromDataStream(
    dataStream,
  Schema.newBuilder()
  .columnByMetadata("rowtime","TIMESTMP(3)")
  .watermark("rowtime","SOURCE_WATERMARK()")
  .build());
)
  • 支援 Changelog 資料流在 Table 和 DataStream 間相互轉換。
//DATASTREAM 轉 Table
StreamTableEnvironment.fromChangelogStream(DataStream<ROW>): Table
StreamTableEnvironment.fromChangelogStream(DataStream<ROW>,Schema): Table
//Table 轉 DATASTREAM
StreamTableEnvironment.toChangelogStream(Table): DataStream<ROW>
StreamTableEnvironment.toChangelogStream(Table,Schema): DataStream<ROW>  

四、Flink SQL 1.14 未來規劃

1.14 版本主要有以下幾點規劃:

  • 刪除 Legacy Planner:從 Flink 1.9 開始,在阿里貢獻了 Blink-Planner 之後,很多一些新的 Feature 已經基於此 Blink Planner 進行開發,以前舊的 Legacy Planner 會徹底刪除;
  • 完善 Window TVF:支援 session window,支援 window TVF 的 allow -lateness 等;
  • 提升 Schema Handling:全鏈路的 Schema 處理能力以及關鍵校驗的提升;
  • 增強 Flink CDC 支援:增強對上游 CDC 系統的整合能力,Flink SQL 內更多的運算元支援 CDC 資料流。

五、總結

本文詳細解讀了 Flink SQL 1.13 的核心功能和重要改進。

  • 支援 Window TVF;
  • 系統地解決時區和時間函式問題;
  • 提升 Hive 和 Flink 的相容性;
  • 改進 SQL Client;
  • 增強 DataStream 和 Table 的轉換。

同時還分享了社群關於 Flink SQL 1.14 的未來規劃,相信看完文章的同學可以對 Flink SQL 在這個版本中的變化有更多的瞭解,在實踐過程中大家可以多多關注這些新的改動和變化,感受它們所帶來的業務層面上的便捷。

原文連結
本文為阿里雲原創內容,未經允許不得轉載。