1. 程式人生 > >Flink事件時間處理和水印

Flink事件時間處理和水印

下面是原文:
如果您正在構建實時流媒體應用程式,則事件時間處理是您必須遲早使用的功能之一。由於在大多數現實世界的用例中,訊息到達無序,應該有一些方法,您建立的系統瞭解訊息可能遲到並且相應地處理的事實。在這篇博文中,我們將看到為什麼我們需要事件時間處理,以及我們如何在ApacheFlink中啟用它。

EventTime是事件在現實世界中發生的時間,ProcessingTime是Flink系統處理該事件的時間。要了解事件時間處理的重要性,我們首先要建立一個基於處理時間的系統,看看它的缺點。

我們將建立一個大小為10秒的SlidingWindow,每5秒滑動一次,在視窗結束時,系統將發出在此期間收到的訊息數。一旦瞭解EventTime處理如何與SlidingWindow相關的工作,那麼瞭解如何在TumblingWindow中工作也不難。所以讓我們開始吧。

基於處理時間的系統

對於這個例子,我們期望訊息具有格式值,timestamp,其中value是訊息,timestamp是在源生成此訊息的時間。由於我們正在構建基於處理時間的系統,因此以下程式碼忽略了時間戳部分。

瞭解訊息應包含生成時間的資訊是一個重要的方面。Flink或任何其他系統不是一個魔術盒,可以以某種方式自己形成這個。稍後我們將看到,事件時間處理提取此時間戳資訊以處理較晚的訊息。

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split
(",")(0), 1) } .keyBy(0) .timeWindow(Time.seconds(10), Time.seconds(5)) .sum(1) counts.print senv.execute("ProcessingTime processing example")

情況1:訊息到達不間斷

假設源分別在時間13秒,第13秒和第16秒產生型別a的三個訊息。(小時和分鐘不重要,因為視窗大小隻有10秒)。
這裡寫圖片描述
這些訊息將落入Windows中,如下所示。在第13秒產生的前兩個訊息將落入視窗1 [5s-15s]和window2 [10s-20s],第16個時間生成的第三個訊息將落入window2 [ 10s-20s]和window3 [15s-25s] ]。每個視窗發出的最終計數分別為(a,2),(a,3)和(a,1)。
這裡寫圖片描述


該輸出可以被認為是預期的行為。現在我們將看看當一個訊息到達系統的時候會發生什麼。

情況2:訊息到達延遲

現在假設其中一條訊息(在第13秒生成)到達延遲6秒(第19秒),可能是由於某些網路擁塞。你能猜測這個訊息會落入哪個視窗?這裡寫圖片描述
延遲的訊息落入視窗2和3,因為19在10-20和15-25之間。在window2中計算沒有任何問題(因為訊息應該落入該視窗),但是它影響了window1和window3的結果。我們現在將嘗試使用EventTime處理來解決這個問題。

基於EventTime的系統

要啟用EventTime處理,我們需要一個時間戳提取器,從訊息中提取事件時間資訊。請記住,訊息是格式值,時間戳。該extractTimestamp方法獲取時間戳部分並將其作為一個長期。現在忽略getCurrentWatermark方法,我們稍後再回來。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    e.split(",")(1).toLong 
  }
  override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis)
  }
}

我們現在需要設定這個時間戳提取器,並將TimeCharactersistic設定為EventTime。其餘的程式碼與ProcessingTime的情況保持一致。

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
                .assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
counts.print
senv.execute("EventTime processing example")

執行上述程式碼的結果如下圖所示。這裡寫圖片描述
結果看起來更好,視窗2和3現在發出正確的結果,但是window1仍然是錯誤的。Flink沒有將延遲的訊息分配給視窗3,因為它現在檢查了訊息的事件時間,並且理解它不在該視窗中。但是為什麼沒有將訊息分配給視窗1?原因是在延遲的資訊到達系統時(第19秒),視窗1的評估已經完成了(第15秒)。現在讓我們嘗試通過使用水印來解決這個問題。

ps:請注意,在視窗2中,延遲的訊息仍然位於第19秒,而不是第13秒(事件時間)。該圖中的描述是故意表示視窗中的訊息不會根據事件時間進行排序。(這可能會在將來改變)

水印

水印是一個非常重要和有趣的想法,我將盡力給您一個簡短的概述。如果您有興趣瞭解更多資訊,您可以從Google 觀看這個令人敬畏的演講,還可以從dataArtisans那裡閱讀此部落格。水印本質上是一個時間戳。當Flink中的運算子接收到水印時,它明白(假設)它不會看到比該時間戳更早的訊息。因此,在“EventTime”中,水印也可以被認為是一種告訴Flink它有多遠的一種方式。

為了這個例子的目的,把它看作是一種告訴Flink一個訊息延遲多少的方式。在最後一次嘗試中,我們將水印設定為當前系統時間。因此,不要指望任何延遲的訊息。我們現在將水印設定為當前時間-5秒,這告訴Flink希望訊息最多有5s的延遲,這是因為每個視窗僅在水印通過時被評估。由於我們的水印是當前時間-5秒,所以第一個視窗[5s-15s]將僅在第20秒被評估。類似地,視窗[10s-20s]將在第25秒進行評估,依此類推。

override def getCurrentWatermark(): Watermark = { 
      new Watermark(System.currentTimeMillis - 5000)
  }

通常最好保持接收到的最大時間戳,並建立具有最大預期延遲的水印,而不是從當前系統時間減去。
進行上述更改後執行程式碼的結果是:這裡寫圖片描述
最後我們得到了正確的結果,所有這三個視窗現在都按照預期的方式發射計數,這是(a,2),(a,3)和(a,1)。

更新:我們也可以使用AllowedLateness功能設定訊息的最大允許時間來解決這個問題。

結論:
實時流處理系統的重要性日益增長,必須處理延遲的訊息是您構建的任何此類系統的一部分。在這篇博文中,我們看到到達的訊息遲到會影響系統的結果,以及如何使用ApacheFlink的事件時間處理功能來解決它​​們。謝謝你的閱讀!

相關推薦

Flink事件時間處理水印

下面是原文: 如果您正在構建實時流媒體應用程式,則事件時間處理是您必須遲早使用的功能之一。由於在大多數現實世界的用例中,訊息到達無序,應該有一些方法,您建立的系統瞭解訊息可能遲到並且相應地處理的事實。在這篇博文中,我們將看到為什麼我們需要事件時間處理,以及

Flink 事件時間的陷進及解決思路

0x1 摘要 大家都知道Flink引入了事件時間(eventTime)這個重要概念,來提升資料統計的準確性,但引入事件時間後在具體業務實現時存在一些問題必需要合理去解決,否則會造成非常嚴重的問題。 0x2 Flink 時間概念介紹 Flink 支援不同的時間概念,包括: Event Time :

flink 事件時間

當流媒體程式在處理時間執行時,所有基於時間的操作(如時間視窗)將使用執行相應操作員的機器的系統時鐘。例如,每小時處理時間視窗將包括在系統時鐘顯示整整一小時的時間和到達特定操作員的所有記錄。處理時間是最簡單的時間概念,不需要流和機器之間的協調。它提供了最佳的效能和最低的延遲。然

【Android View】Android中View對觸控事件處理傳遞dispatchTouchEvent、onInterceptTouchEvent

View中存在dispatchTouchEvent、onTouchEvent兩個方法。 而ViewGroup中則存在dispatchTouchEvent、onInterceptTouchEvent、onTouchEvent三個方法。 呼叫順序依次為dispatchTouch

7、Flink 流計算處理處理平臺

一、Flink 基本概念 Flink 是一個批處理和流處理結合的統一計算框架,其核心是一個提供了資料分發以及並行化計算的流資料處理引擎。它的最大亮點是流處理,是業界最頂級的開源流處理引擎。Flink 與 Storm 類似,屬於事件驅動型實時流系統。 所謂說事件驅動型指

Flink基礎:時間水印

​ 往期推薦: Flink基礎:入門介紹 Flink基礎:DataStream API Flink基礎:實時處理管道與ETL Flink深入淺出:資源管理 Flink深入淺出:部署模式 Flink深入淺出:記憶體模型 Flink深入淺出:JDBC Source從理論到實戰 Flink深入淺出:Sql Ga

JS綁定事件移除事件處理方法

nbsp 觸發 ner 它的 msu eve code ont 一個 addEventListener()與removeEventListener()用於處理指定和刪除事件處理程序操作。全部的DOM節點中都包括這兩種方法,而且它們都接受3個參數:要處理的事件名、作為事件

錯誤處理時間函數

中華 bsp com 所有 default 註意 def 時區 但是 錯誤處理和時間函數 一、錯誤處理 a) 錯誤報告級別 語法錯誤: error 會給一個致命錯誤 終止程序繼續執行 運行時錯誤: notice warning 運行代碼的時候錯了

[Python3]日期時間處理

num import time模塊 now() sda 年份 -s 好的 等價 概述 在python中, date、time、datetime類提供了一系列處理日期、時間和時間間隔的函數。 在Python裏我們大致可以把其實現日期時間類分為5個: date僅用

python 3 之日期與時間處理模塊(datedatetime)

python 時間 處理模塊 前言相關術語的解釋時間的表現形式time模塊datetime模塊時間格式碼總結前言 在開發工作中,我們經常需要用到日期與時間,如: 作為日誌信息的內容輸出計算某個功能的執行時間用日期命名一個日誌文件的名稱記錄或展示某文章的發布或修改時間其他Python中提供了多個用於

Netty事件監聽處理(上)

事件處理 多路復用 linu 自定義事件 性能 cdn 處理請求 fancybox 客戶 陪產假結束了,今天又開始正常上班了,正好趕上米粉節活動,又要忙上一陣了,米粉節活動時間為4.03 - 4.10,有不少優惠,感興趣的可以關註mi.com或小米商城app。 今天

Netty事件監聽處理(下)

關註 proto cte meta pro sse cti 讀取 線程模型 上一篇 介紹了事件監聽、責任鏈模型、socket接口和IO模型、線程模型等基本概念,以及Netty的整體結構,這篇就來說下Netty三大核心模塊之一:事件監聽和處理。 前面提到,Netty是一個N

2,StructuredStreaming的事件時間窗口操作

struct tps cdr sta lin apache mode second fmt 推薦閱讀:1,StructuredStreaming簡介 使用Structured Streaming基於事件時間的滑動窗口的聚合操作是很簡單的,很像分組聚合。在一個分組聚合操作中,

55、控制元件、事件處理佈局介紹

學習目標: 1、瞭解Swing提供的控制元件 2、掌握控制元件的不同的事件器 學習過程: 這裡我們就一一介紹一下swing中常用的一些控制元件。 一、常用控制元件例項: 先看看那下圖,初步瞭解一下每個控制元件的基本形式。 1、jLable

時間格式的處理資料填充分頁---laravel

時間格式文件地址:http://carbon.nesbot.com/docs/   這是些時間格式,只需要我們這麼做就可以 我們在模板層,找到對應的模型物件那裡進行處理就可以啦 2018-11-08 16:10:32 轉換 Nov 8, 2018 這樣就ok了  

VUE:事件處理表單輸入繫結

事件處理 <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>07_事件處理</title> </

VUE:事件處理表單輸入綁定

點擊 event http def nbsp mode 愛好 gpa method 事件處理 <!DOCTYPE html> <html> <head> <meta charset="UTF-8"

java中的時間計算格式處理方法

//一天秒數 public final static long MILLISECONDS_OF_DAY = 1000 * 60 * 60 * 24; public final static long MINUTES_OF_DAY = 1000 * 60 * 60 * 24; p

Android輸入系統(二)IMS的啟動過程輸入事件處理

本文首發於 劉望舒的部落格 地址:liuwangshu.cn/framework/i… 關聯絡列 解析WMS系列 深入理解JNI系列 輸入系統系列 前言 在上一篇文章中,我們學習了IMS的誕生(建立),IMS建立後還會進行啟動,這篇文章我們來學習IMS的啟動過程和輸入事件的處理。 1.IMS

PHP字串處理時間格式化整理

一、PHP字串相關 1、字串擷取(開始位置、長度) echo substr("Hello world",0,10)."<br>";      //Hello worlecho substr("Hello world",0,-1)."<br>";      //Hello worle