深入理解Flink Streaming SQL
序言
時效性提升資料的價值,所以Flink這樣的流式(Streaming)計算系統應用得越來越廣泛。
廣大的普通使用者決定一個產品的介面和介面。
ETL開發者需要簡單而有效的開發工具,從而把更多時間花在理業務和對口徑上。
因此流式計算系統都趨同以SQL作為唯一開發語言,讓使用者以Table形式操作Stream。
程式開發三部曲:First make it work, then make it right, and, finally, make it fast
流計算開發者面對的現狀及趨勢:
第一步,讓程式執行起來。
開發者能用SQL方便地表達問題。
開發者能通過任務管理系統一體化地管理任務,如:開發,上線,調優,監控和排查任務。
第二步,讓程式執行正確。
簡單資料清洗之外的流計算開發需求通常會涉及到Streaming SQL的兩個核心擴充套件:Window 和 Emit。
否則無法在資料時效性和資料準確性上做適合各個業務場景的決策和折中。
第三步,讓程式執行越來越快。
蘋果每年都會發布新手機:
當前,流計算系統每年也會有很大的效能提升和功能擴充套件,但想要深入調優及排錯,
還是要學習分散式系統的各個元件及原理,各種運算元實現方法,效能優化技術等知識。
以後,隨著系統的進一步成熟和完善,開發者在效能優化上的負擔會越來越低,
無需瞭解底層技術實現細節和手動配置各種引數,就能享受效能和穩定性的逐步提升。
分散式系統的一致性和可用性是一對矛盾。
流計算系統的資料準確性和資料時效性也是一對矛盾。
應用開發者都需要認識到這些矛盾,並且知道自己在什麼場景下該作何種取捨。
本文希望通過剖析Flink Streaming SQL的三個具體例子:Union,Group By 和 Join ,
來依次闡述流式計算模型的核心概念: What, Where, When, How 。
以便開發者加深對Streaming SQL的Window 和 Emit語義的理解,
從而能在資料準確性和資料時效性上做適合業務場景的折中和取捨。
也順帶介紹Streaming SQL的底層實現,以便於SQL任務的開發和調優。
UNION
通過這個例子來闡述Streaming SQL的底層實現和優化手段:Logical Plan Optimization 和 Operator Chaining。
例子
改編自Flink StreamSQLExample 。只在最外層加了一個Filter,以便觸發Filter下推及合併。
Source
SQL
Sink
執行結果
轉換Table為Stream
Flink 會把基於Table的Streaming SQL轉為基於Stream的底層運算元,並同時完成Logical Plan及Operator Chaining等優化
轉為邏輯計劃(Logical Plan)
上述UNION ALL SQL依據Relational Algebra轉換為下面的邏輯計劃:
SQL欄位與邏輯計劃有如下的對應關係:
優化Logical Plan
理論基礎
冪等
數學: 19 * 10 * 1 * 1 = 19 * 10 = 190
SQL: SELECT * FROM (SELECT user, product FROM OrderA) = SELECT user, product FROM OrderA
交換律
數學:10 * 19 = 19 * 10 = 190
SQL: tableA UNION ALL tableB = tableB UNION ALL tableA
結合律
數學:
(1900 * 0.5)* 0.2 = 1900 * (0.5 * 0.2) = 190
1900 * (1.0 + 0.01) = 1900 * 1.0 + 1900 * 0.01 = 1919
SQL:
SELECT * FROM (SELECT user, amount FROM OrderA) WHERE amount > 2
SELECT * FROM (SELECT user, amount FROM OrderA WHERE amount > 2)
優化過程
Flink的邏輯計劃優化規則清單請見: FlinkRuleSets
此Union All例子根據冪等,交換律和結合律來完成以下三步優化:
消除冗餘的Project
利用冪等特性,消除冗餘的Project。
下推Filter
利用交換率和結合律特性,下推Filter。
合併Filter
利用結合律,合併Filter。
轉為物理計劃(Physical Plan)
轉換後的Flink的物理執行計劃如下:
有Physical Plan優化這一步驟,但對以上例子沒有效果,所以忽略。
這樣,加上Source和Sink,產生了如下的Stream Graph:
優化Stream Graph
通過Task Chaining來減少上下游運算元的資料傳輸消耗,從而提高效能。
Chaining判斷條件
Chaining結果
按深度優先的順序遍歷Stream Graph,最終產生5個Task任務。
GROUP BY
將以滾動視窗的GROUP BY來闡述Streaming SQL裡的Window和Emit語義,
及其背後的Streaming的Where(Window)和When(Watermark和Trigger)的概念及關係。
例子
Source
Water Mark
簡單地把最新的EventTime減去Offset。
SQL
按使用者加滾動視窗進行Group By。
Sink
轉換Table為Stream
因為Union All例子比較詳細地闡述了轉換規則,此處只討論特殊之處。
轉為邏輯計劃(Logical Plan)
優化Logical Plan
GROUP BY優化:把{“User + Window” -> SUM} 轉為 {User -> {Window -> SUM}}。
新的資料結構確保同一User下所有Window都會被分配到同一個Operator,以便實現SessionWindow的Merge功能。
轉為物理計劃(Physical Plan)
優化Stream Graph
經過Task Chaining優化後,最終生成3個Task。
Streaming各基本概念之間的聯絡
此處希望以圖表的形式闡述各個概念之間的關係。
Window和EventTime
Flink支援三種Window型別: Tumbling Windows , Sliding Windows 和 Session Windows
每個事件的EventTime決定事件會落到哪些TimeWindow。
但只有Window的第一個資料來到時,Window才會被真正建立。
Window和WaterMark
可以設定TimeWindow的AllowedLateness,從而使Window可以處理延時資料。
只有當WaterMark超過TimeWindow.end + AllowedLateness時,Window才會被銷燬。
TimeWindow,EventTime,ProcessTime 和 Watermark
我們以WaterMark的推進圖來闡述這四者之間的關係。
Window為TumbleWindow,視窗大小為1小時,允許的資料延遲為1小時。
WaterMark和EventTime:
新資料的最新Eventime推進WaterMark。
TimeWindow的生命週期:
以下三條資料的EventTime決定TimeWindow的狀態轉換。
資料1的Eventtime屬於Window[10:00, 11,00),因為Window不存在,所以建立此Window。
資料2的Eventime推進WaterMark超過11:00(Window.end),所以觸發Pass End。
資料3的Eventime推進WaterMark超過12:00(Window.end + allowedLateness), 所以關閉此Window。
TimeWindow的結果輸出:
使用者可以通過Trigger來控制視窗結果的輸出,按視窗的狀態型別有以下三種Trigger。
Flink的Streaming SQL目前只支援PassEnd Trigger,且預設AllowedLateness = 0。
如果觸發頻率是Repeated,比如:每分鐘, 往下游輸出一次。那麼這個時間只能是ProcessTime。
因為WarkMark在不同場景下會有不同推進速度,比如處理一小時的資料,
可能只需十分鐘(重跑),一個小時(正常執行)或 大於1小時(積壓)。
執行結果
允許資料亂序是分散式系統能夠併發處理訊息的前提。
當前這個例子,資料如果亂序可以產生不同的輸出結果。
資料有序
SUM運算元接收到的資料
資料的Eventtime按升序排列。
WarterMark推進圖
每條新資料都能推進Watermark。
結果輸出
所有資料都被處理,沒有資料被丟棄。
資料亂序
SUM運算元接收到的資料
第四條事件延時到來。
WarterMark推進圖
延遲的資料不會推進WaterMark,且被丟棄。
輸出結果
沒有統計因延遲被丟棄的第四條事件。
JOIN
將通過此例子來闡述Streaming的Retraction語義。
例子
Source
SQL
廣告的展現LEFT JOIN 廣告的點選來更新狀態:showed 或 clicked。
Sink
LEFT JOIN 可能會發送多條資料到下游。
因此必須轉為RetractionStream,讓下游運算元有機會能撤銷前次輸出,從而只產生一條最終結果。
轉換Table為Stream
RetractionStream沒有引入特殊變化。
轉為邏輯計劃(Logical Plan)
優化Logical Plan
轉為物理計劃(Physical Plan)
優化Stream Graph
執行結果
結果資料的首個欄位為標誌位,True為正常資料,False為Retract資料。
RetractJoin的執行邏輯請見:NonWindowOuterJoin
ImpressionId = 1這條資料的ReactJoin執行過程。
1: Left流的Show訊息先到: Show("1", "show", "2018-10-10 10:10:10")
因為之前沒有輸出,所以無需Retrcact。
只輸出: (true, 1,2018-10-10 10:10:10,showed)
2: Right流的Click訊息後到:Click("1", "click", "2018-10-10 10:13:11")
因為之前已輸出過結果,所以需要Retract,輸出:
(false, 1,2018-10-10 10:10:10,showed)
然後再輸出新結果,
(true, 1,2018-10-10 10:10:10,clicked)
如上可知,Retraction流相當於把一條UPDATE訊息分別拆成一條DELETE和一條INSERT訊息。
Retraction Stream
雖然Retraction機制最多增加一倍的資料傳輸量,但能降低下游運算元的儲存負擔和撤銷實現難度。
傳遞
我們在Left Join的輸出流後加一個GROUP BY,以觀察Retraction流的後續運算元的輸出。
可能得到以下的GROUP BY輸出:
由此可見,Retraction具有傳遞性,RetractStream的後續的Stream也會是RetractionStream。
終止
最終需要支援Retraction的Sink來終止RetractionStream,比如:
最終輸出retractedResults:
儲存
只有外部儲存支援UPDATE或DELETE操作時,才能實現RetractionSink。
常見的KV儲存和資料庫,如HBase,Mysql都可實現RetractionSink。
後續程式總能從這些儲存中讀取最新資料,上游是否是Retraction流對使用者是透明的。
常見的訊息佇列,如Kafka,只支援APPEND操作,則不能實現RetractionSink。
後續程式從這些訊息佇列可能會讀到重複資料,因此使用者需要在後續程式中處理重複資料。
總結
Flink Streaming SQL的實現從上到下共有三層:
1:Streaming SQL
2:Streaming 和 Window
3:Distributed Snapshots
其中“Streaming Data Model” 和 “Distributed Snapshot” 是Flink這個分散式流計算系統的核心架構設計。
“Streaming Data Model”的What, Where, When, How 明確了流計算系統的表達能力及預期應用場景。
“Distributed Snapshots”針對預期的應用場景在資料準確性,系統穩定性和執行效能上做了合適的折中。
本文通過例項闡述了流計算開發者需要了解的最上面兩層的概念和原理,
以便流計算開發者能在資料準確性和資料時效性上做適合業務場景的折中和取捨。