1. 程式人生 > >Flink WaterMark機制

Flink WaterMark機制

如果您正在構建實時流媒體應用程式,則事件時間處理是您必須遲早使用的功能之一。由於在大多數現實世界的用例中,訊息到達無序,應該有一些方法,您建立的系統瞭解訊息可能遲到並且相應地處理的事實。在這篇博文中,我們將看到為什麼我們需要事件時間處理,以及我們如何在ApacheFlink中啟用它。

EventTime是事件在現實世界中發生的時間,ProcessingTime是Flink系統處理該事件的時間。要了解事件時間處理的重要性,我們首先要建立一個基於處理時間的系統,看看它的缺點。

我們將建立一個大小為10秒的SlidingWindow,每5秒滑動一次,在視窗結束時,系統將發出在此期間收到的訊息數。一旦瞭解EventTime處理如何與SlidingWindow相關的工作,那麼瞭解如何在TumblingWindow中工作也不難。所以讓我們開始吧。

1.基於處理時間的系統

對於這個例子,我們期望訊息具有格式值,timestamp,其中value是訊息,timestamp是在源生成此訊息的時間。由於我們正在構建基於處理時間的系統,因此以下程式碼忽略了時間戳部分。

瞭解訊息應包含生成時間的資訊是一個重要的方面。Flink或任何其他系統不是一個魔術盒,可以以某種方式自己形成這個。稍後我們將看到,事件時間處理提取此時間戳資訊以處理較晚的訊息。

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(
",")(0), 1) } .keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .sum(1) counts.print senv.execute("ProcessingTime processing example")

情況1:訊息到達不間斷

假設源分別在時間13秒,第13秒和第16秒產生型別a的三個訊息。(小時和分鐘不重要,因為視窗大小隻有10秒)。

在這裡插入圖片描述 這些訊息將落入Windows中,如下所示。在第13秒產生的前兩個訊息將落入視窗1 [5s-15s]和window2 [10s-20s],第16個時間生成的第三個訊息將落入window2 [ 10s-20s]和window3 [15s-25s] ]。每個視窗發出的最終計數分別為(a,2),(a,3)和(a,1)。 在這裡插入圖片描述

該輸出可以被認為是預期的行為。現在我們將看看當一個訊息到達系統的時候會發生什麼。

情況2:訊息到達延遲

現在假設其中一條訊息(在第13秒生成)到達延遲6秒(第19秒),可能是由於某些網路擁塞。你能猜測這個訊息會落入哪個視窗? 在這裡插入圖片描述 延遲的訊息落入視窗2和3,因為19在10-20和15-25之間。在window2中計算沒有任何問題(因為訊息應該落入該視窗),但是它影響了window1和window3的結果。我們現在將嘗試使用EventTime處理來解決這個問題。

2.基於EventTime的系統

要啟用EventTime處理,我們需要一個時間戳提取器,從訊息中提取事件時間資訊。請記住,訊息是格式值,時間戳。該extractTimestamp方法獲取時間戳部分並將其作為一個長期。現在忽略getCurrentWatermark方法,我們稍後再回來。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong 
  }
  override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis)
  }
}

我們現在需要設定這個時間戳提取器,並將TimeCharactersistic設定為EventTime。其餘的程式碼與ProcessingTime的情況保持一致。

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("EventTime processing example")

執行上述程式碼的結果如下圖所示。 在這裡插入圖片描述 結果看起來更好,視窗2和3現在發出正確的結果,但是window1仍然是錯誤的。Flink沒有將延遲的訊息分配給視窗3,因為它現在檢查了訊息的事件時間,並且理解它不在該視窗中。但是為什麼沒有將訊息分配給視窗1?原因是在延遲的資訊到達系統時(第19秒),視窗1的評估已經完成了(第15秒)。現在讓我們嘗試通過使用水印來解決這個問題。

ps:請注意,在視窗2中,延遲的訊息仍然位於第19秒,而不是第13秒(事件時間)。該圖中的描述是故意表示視窗中的訊息不會根據事件時間進行排序。(這可能會在將來改變)

3.水印

水印是一個非常重要和有趣的想法,我將盡力給您一個簡短的概述。如果您有興趣瞭解更多資訊,您可以從Google 觀看這個令人敬畏的演講,還可以從dataArtisans那裡閱讀此部落格。水印本質上是一個時間戳。當Flink中的運算子接收到水印時,它明白(假設)它不會看到比該時間戳更早的訊息。因此,在“EventTime”中,水印也可以被認為是一種告訴Flink它有多遠的一種方式。

為了這個例子的目的,把它看作是一種告訴Flink一個訊息延遲多少的方式。在最後一次嘗試中,我們將水印設定為當前系統時間。因此,不要指望任何延遲的訊息。我們現在將水印設定為當前時間-5秒,這告訴Flink希望訊息最多有5s的延遲,這是因為每個視窗僅在水印通過時被評估。由於我們的水印是當前時間-5秒,所以第一個視窗[5s-15s]將僅在第20秒被評估。類似地,視窗[10s-20s]將在第25秒進行評估,依此類推。

override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis - 5000)
  }

通常最好保持接收到的最大時間戳,並建立具有最大預期延遲的水印,而不是從當前系統時間減去。 進行上述更改後執行程式碼的結果是: 在這裡插入圖片描述 最後我們得到了正確的結果,所有這三個視窗現在都按照預期的方式發射計數,這是(a,2),(a,3)和(a,1)。

更新:我們也可以使用AllowedLateness功能設定訊息的最大允許時間來解決這個問題。

結論:

實時流處理系統的重要性日益增長,必須處理延遲的訊息是您構建的任何此類系統的一部分。在這篇博文中,我們看到到達的訊息遲到會影響系統的結果,以及如何使用ApacheFlink的事件時間處理功能來解決它​​們。謝謝你的閱讀!

參考: