Flink如何巧用WaterMark機制解決亂序問題
問:資料工程師最期望資料怎麼來?
答:按順序來。
MapReduce當初能用起來,就是因為Map階段對所有資料都進行排序了,後面的Reduce階段就可以直接用排序好的資料了。
批處理的時候因為資料已經落地了,咱可以慢慢排序。但是流式資料都是一條一條過來的,這個時候資料到達的時間和出發時的順序不一致會導致非常多的問題,這該咋整呢?
Sparkstreaming對亂序支援很差,因為它其實是“微批”,不是真正的流。加州伯克利大學AMP實驗室設計Spark的時候,想的就是弄一個更快的計算引擎,壓根就沒打算做成來一條處理一條的流式資料處理。所以對於一些亂序資料根本就不太關心,所以導致Sparkstreaming不能或者不太能支援亂序資料的處理。
但是Flink不行啊,資料一條一條的過來,然後進行視窗處理,亂序會導致各種統計問題,這就得必須解決了。
什麼是亂序
一條資料在Flink裡,有三個時間:
-
Event Time:事件產生的時間;
-
Ingestion Time:事件進入Flink的時間;
-
Window Processing Time:事件被處理的時間。
當資料一條一條規規矩矩的按流程傳送,MQ傳輸,Flink接受然後處理,這個時候,就是有序的資料。
當出現各種異常,有些資料延遲了,排在後面的資料跑前面去了,這就出現了亂序。
請思考一下,我們應該以哪個時間戳判定亂序呢?
Flink的WaterMark機制
亂序會導致各種統計上的問題。比如一個Time Window本應該計算1、2、3,結果3遲到了,那這個視窗統計就丟資料了。這可太坑了。
為了解決這個問題,Flink設定了一個三個機制來解決這個問題:
-
WaterMark--水位線,;
-
allowLateNess--資料遲到時間;
-
sideOutPut--超長遲到資料收集;
水位線的設定很簡單(系統時間為準):
override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis - 5000)
設定Watermark為-5秒。但是怎麼理解這個-5秒的水位線呢?
經常戶外徒步的同學應該知道一個徒步小隊通常會有一正兩副領隊,隊首隊尾各一個副隊,正隊長在隊伍中穿插協調。
隊尾的領隊叫後隊領隊,後隊領隊要保證所有隊員都在前面,也就是說後隊領隊是整個隊伍的隊尾,當收隊的時候,看見後隊領隊,那就說明整個隊伍都已經完全到達了。
這個Watermark就相當於給整個資料流設定一個後隊領隊。但是視窗是不知道具體要來幾個數的,所以只能設定一個時間上的限制,以此來推測當前視窗最後一條資料是否已經到達。假設視窗大小為10秒,Watermark為-5秒,那麼他會做以下事情:
-
每來一條資料,取當前視窗內所有資料的最大時間戳;
-
用最大時間戳扣減Watermark後看看是不是符合視窗關閉條件;
-
如果不符合,則繼續進資料;
-
如果符合,則關閉視窗開始計算。
你看,多像戶外徒步?
-
每來一個人,就問問出發時是幾號,然後確認所有已到隊員最大的號碼;
-
用最大的號碼對比一下後隊領隊的號碼;
-
如果比後隊領隊的號碼小,就不收隊;
-
如果號碼大於等於後隊領隊號碼,就收隊。
遲到的資料
當然啊,即便是用了Watermark機制,依然還會存在遲到的資料。就像戶外徒步一樣,有人走錯路然後又趕上來。後隊領隊分明沒超過任何一個隊員,但是還是有隊員落在後面了。
所以Flink還增設了三種應對方式:
-
allowLateNess--對於遲到一小會的資料,設定一個允許遲到時間;
-
sideOutPut--對於超過允許遲到時間的資料,全部收集起來,後續再處理;
-
如果都不處理,Flink就預設自動丟棄。
也就是說,在watermark機制下,視窗雖然到了關閉時間,但是如果你設定了allowLateNess=10秒,那這個視窗還會再等10秒,看看是否還有他那個小隊的資料,10秒後窗口關閉,開始計算。
如果等了10秒還沒等到,11秒的時候,原本屬於該視窗的資料才姍姍來遲,那麼sideOutPut會把資料收集起來,放到側輸出流,等待後續處理。這個資料肯定就不會在當前視窗計算進去了。