1. 程式人生 > 其它 >你應該瞭解的Watermark

你應該瞭解的Watermark

一、時間語義

Flink在流處理中提供了不同的時間語義支援,其中有兩種核心的時間語義:ProcessingTime與EventTime。

ProcessingTime表示的是處理時間,在處理時間流處理中,所有涉及的時間計算都是以本地機器的時間為準,例如每5分鐘的一個時間視窗操作,0-5分鐘的視窗觸發需要滿足本地機器到達5分鐘後然後觸發視窗函式操作這5分鐘內的資料。由於操作的資料是這段時間到達的資料,所以其處理的資料量容易受上游處理的速度影響,處理的快匯聚的資料就多,所以其結果不具有確定性,特別是在流重放的情況下;

EventTime表示的是事件時間,也就是資料本身含有的時間屬性,例如點選頁面事件的點選時間,那麼在事件時間的流處理中,事件時間就是表示當前的時間進度,而不是本地機器時間。在分配時間視窗的資料時會按照資料的時間屬性來分配,由於資料時間屬性不會改變,因此在資料重放過程中,分配在視窗的資料也不會改變,因此得到一個比較確定的結果,其目標也就是儘可能的還原資料場景。

二、EventTime與Watermark

在ProcessingTime流處理中時間視窗的觸發是當本地機器時間達到視窗結束時自動觸發,而在EventTime流處理中時間視窗的觸發是當前Watermark大於等於視窗的結束時間時觸發。

何為Watermark?Watermark也稱之為水位,是用來衡量在EventTime語義的流處理中時間進度,也就是當前流處理達到的時間點,其本質上就是一個時間戳,系統會認為在水位以下(事件時間小於watermark值)的資料已經全部到達。

事件中已經包含時間屬性,為什麼還需要Watermark?在流處理中不同節點處理資料的速度不一致,那麼就會導致下游節點獲取到的資料在時間上是一個亂序的資料序列,我們希望程式能夠處理所屬視窗時間範圍內的資料,然而資料亂序會導致資料延時到達,那麼在程式處理中需要等待延時資料的到達,但是程式不可能無限制等待,所以引入Watermark機制,使用Watermark來判斷是否應該觸發視窗函式。

Watermark如何產生?第一種定時產生,需要依託事件時間屬性,也就是從事件中提取得到的,但是由於資料亂序,需要設定允許的延時時間,例如事件時間是10,允許的延時時間是2,那麼此時得到的watermark值就是8;第二種遇到特定事件時產生watermark, 特定事件由使用者指定,當在流處理中遇到一條特殊標記則產生watermark。在實際使用中第一種更為常見。

三、Watermark在流中的流轉

在Flink中認為Watermark是一種特殊的資料,會隨著正常資料在任務中一起向下流動,它永遠不會超越業務資料位置。Watermark每到達一個處理節點都會生成一個新的Watermark向下流動,由於在任務中可能會需要對資料進行重分佈,例如keyBy操作,會導致某個Task輸入是由上游多個Task的輸出,因此Flink對watermark的流轉也制定了特定的規則:

  • 單輸入取最大值

  • 多輸入取最小值

單輸入指的是Task的資料流入是由上游一個Task的輸出,例如在dataStream.map.filter 操作流中並且其並行度一致,那麼filter的每一個task都是與map中每一個Task一一對應,因此Watermark採用forward形式向下流動,並且是單調遞增的,即下一次傳送的Watermark只能比上一次的值大;

多輸入指的是Task的資料流入是上游多個Task的輸出,例如dataStream.map.keyBy.window 操作流中,keyBy操作會導致資料根據key流入下游特定的task中,那麼對於一個window task的流入就是由上游多個Task輸出,對於這種情況Watermark採用broadcast形式向下流動即上游每一個map task會給下游所有的window task傳送Watermark, 由於每一個map task的處理速度不一樣,那麼就會導致window task收到watermark也是在不同的時刻,對於這種情況Flink是如何做到取最小值呢?

在Window Task會做一個InputChannelStatus陣列的初始化,該陣列對應上游operator tasks,圖例中陣列大小為2,數組裡面每一個InputChannelStatus都會有一個初始化為Long.MIN_VALUE的watermark值,也就是有可能在監控頁面看到的-9223372036854775808值,另外還包含StreamStatus(表示流狀態的)與Boolean型別的isWatermarkAligned,isWatermarkAligned表示的是否對齊與StreamStatus有關,如果流的狀態是非啟用狀態的在後續watermark取值上忽略對應InputChannelStatus,在這裡我們只關心watermark大小取值,忽略其他因素影響。除了InputChannelStatus陣列還有一個lastOutputWatermark,表示最近傳送的watermark值,也可以理解為window task生成的最新的watermark值,初始值為Long.MIN_VALUE

看下其處理流程:
window1收到的watermark值的順序是:w10(map0)->w9(map1)->w12(map0)->w10(map1)
a. w10(map0), w10大於InputChannelStatus[0]的watermakr值,將w10覆蓋其值,然後從InputChannelStatus數組裡面找到最小的watermark,此時最小的watermark是InputChannelStatus[1]的Long.MIN_VALUE值,判斷得到的最小值不大於lastOutputWatermark,不會產生新的watermark;

b. w9(map1),w9大於InputChannelStatus[1]的watermakr值,將w9覆蓋其值,然後從InputChannelStatus數組裡面找到最小的watermark,此時最小的watermark是InputChannelStatus[1]的9,大於值為Long.MIN_VALUE的lastOutputWatermark,那麼產生新的watermark 9;

c. w12(map0),w12大於InputChannelStatus[0]的watermakr值,將w12覆蓋其值,然後從InputChannelStatus數組裡面找到最小的watermark,此時最小的watermark是InputChannelStatus[1]的9,不大於值為9的lastOutputWatermark,不會產生新的watermark;

d. w10(map1),w10大於InputChannelStatus[1]的watermakr值,將w10覆蓋其值,然後從InputChannelStatus數組裡面找到最小的watermark,此時最小的watermark是InputChannelStatus[1]的10,大於值為9的lastOutputWatermark,會產生新的watermark 10;

四、Watermark使用

Watermark分配方式:

  1. 在source端分配,通過在SourceFunction.run方法中呼叫SourceContext的collectWithTimestamp 傳送一條帶有時間屬性的資料,呼叫SourceContext的emitWatermark傳送一條Watermark資料,至於什麼時候呼叫由使用者自行決定,也就是說需要使用者自定義實現SourceFunction介面;

  2. 通過assignTimestampsAndWatermarks方式分配,可以在流處理任意位置指定,同樣有兩種方式AssignerWithPeriodicWatermarks與AssignerWithPunctuatedWatermarks,AssignerWithPunctuatedWatermarks表示由使用者指定根據事件生成;AssignerWithPeriodicWatermarks表示週期性的生成方式,在這種方式也是常用的使用方式,通過使用 BoundedOutOfOrdernessTimestampExtractor指定最大延時與事件時間提取;

這兩種方式有各自優缺點,在source端指定可以更早的去處理亂序資料,通過assignTimestampsAndWatermarks可以在過濾無效資料之後來指定,以免無效資料對watermark造成影響。

Watermark 觸發動作:

    1. 會迴圈遍歷事件時間的優先順序佇列,如果取出來的時間小於Watermark則觸發相應的動作,例如視窗函式操作或者使用者註冊的事件時間定時器

    2. 在ProcessFunction可獲取到到當前的Watermark值,可根據Watermakr來做一些處理。