1. 程式人生 > 其它 >Flink 時間語義

Flink 時間語義

在流資料處理應用中,一個很重要很常見的的操作就是視窗計算。這裡的視窗指的是劃定的一段時間範圍,即時間窗,在這個範圍內的資料進行資料處理就是我們所說的視窗計算,因此視窗和時間是分不開的。

一、FLink 時間語義

時間從理論和哲學的角度解釋有一點玄妙,但對於普通人來說,時間其實是生活中在熟悉不過的一個概念。一年有多少天,一天有多少小時,大家都很清楚,時間就像緩緩流淌的河水。不疾不徐,不間斷的前進著,她是我們衡量事件發生和進展的標準尺度。時間本身就有著流的特性,她可以用拍判斷事件發生先後以及間隔;所以如果我們想要劃定視窗來收集資料,一般就要基於時間。對於批處理來說,這似乎沒什麼討論的必要,原因是資料已經收集完畢,想怎麼劃分視窗都可以,而對於流處理來說,想要處理更加實時,就必須對時間有更加精細的劃分。

1、Flink 中的時間語義簡介

對於一臺機器而言,“時間”自然就是指系統時間。但我們知道,Flink是一個分散式處理系統。分散式架構最大的特點,就是節點彼此獨立、互不影響,這帶來了更高的吞吐量和容錯性;但有利必有弊,最大的問題也來源於此。在分散式系統中,節點“各自為政”,是沒有統一時鐘的,資料和控制資訊都通過網路進行傳輸。比如現在有一個任務是視窗聚合,我們希望將每個小時的資料收集起來進行統計處理。而對於並行的視窗子任務,它們所在節點不同,系統時間也會有差異;當我們希望統計8點~9點的資料時,對並行任務來說其實並不是“同時”的,收集到的資料也會有誤差。那既然一個叢集中有JobManager作為管理者,是不是讓它統一向所有TaskManager傳送同步時鐘訊號就行了呢?這也是不行的。因為網路傳輸會有延遲,而且這延遲是不確定的,所以JobManager發出的同步訊號無法同時到達所有節點;想要擁有一個全域性統一的時鐘,在分散式系統裡是做不到的。另一個麻煩的問題是,在流式處理的過程中,資料是在不同的節點間不停流動的,這同樣也會有網路傳輸的延遲。這樣一來,當上下游任務需要跨節點傳輸資料時,它們對於“時間”的理解也會有所不同。例如,上游任務在8點59分59秒發出一條資料,到下游要做視窗計算時已經是9點零1秒了,那這條資料到底該不該被收到8點~9點的視窗呢?所以,當我們希望對資料按照時間視窗來進行收集計算時,“時間”到底以誰為標準就非常重要了。

 

 我們重新梳理一下流式資料處理的過程。如圖6-1所示,在事件發生之後,生成的資料被收集起來,首先進入分散式訊息佇列,然後被Flink系統中的Source運算元讀取消費,進而向下遊的轉換運算元(視窗運算元)傳遞,最終由視窗運算元進行計算處理。很明顯,這裡有兩個非常重要的時間點:一個是資料產生的時間,我們把它叫作“事件時間”(Event Time);另一個是資料真正被處理的時刻,叫作“處理時間”(Processing Time)。我們所定義的視窗操作,到底是以那種時間作為衡量標準,就是所謂的“時間語義”(Notions of Time)。由於分散式系統中網路傳輸的延遲和時鐘漂移,處理時間相對事件發生的時間會有所滯後

處理時間(Processing Time)

處理時間的概念非常簡單,就是指執行處理操作的機器的系統時間。如果我們以它作為衡量標準,那麼資料屬於哪個視窗就很明顯了:只看視窗任務處理這條資料時,當前的系統時間。比如之前舉的例子,資料8點59分59秒產生,而視窗計算時的時間是9點零1秒,那麼這條資料就屬於9點—10點的視窗;如果資料傳輸非常快,9點之前就到了視窗任務,那麼它就屬於8點—9點的視窗了。每個並行的視窗子任務,就只按照自己的系統時鐘劃分視窗。假如我們在早上8點10分啟動執行程式,那麼接下來一直到9點以前處理的所有資料,都屬於第一個視窗;9點之後、10點之前的所有資料就將屬於第二個視窗。這種方法非常簡單粗暴,不需要各個節點之間進行協調同步,也不需要考慮資料在流中的位置,簡單來說就是“我的地盤聽我的”。所以處理時間是最簡單的時間語義。

事件時間(Event Time)

事件時間,是指每個事件在對應的裝置上發生的時間,也就是資料生成的時間。資料一旦產生,這個時間自然就確定了,所以它可以作為一個屬性嵌入到資料中。這其實就是這條資料記錄的“時間戳”(Timestamp)。在事件時間語義下,我們對於時間的衡量,就不看任何機器的系統時間了,而是依賴於資料本身。打個比方,這相當於任務處理的時候自己本身是沒有時鐘的,所以只好來一個數據就問一下“現在幾點了”;而資料本身也沒有表,只有一個自帶的“出廠時間”,於是任務就基於這個時間來確定自己的時鐘。由於流處理中資料是源源不斷產生的,一般來說,先產生的資料也會先被處理,所以當任務不停地接到資料時,它們的時間戳也基本上是不斷增長的,就可以代表時間的推進。當然我們會發現,這裡有個前提,就是“先產生的資料先被處理”,這要求我們可以保證資料到達的順序。但是由於分散式系統中網路傳輸延遲的不確定性,實際應用中我們要面對的資料流往往是亂序的。在這種情況下,就不能簡單地把資料自帶的時間戳當作時鐘了,而需要用另外的標誌來表示事件時間進展,在Flink中把它叫作事件時間的“水位線”(Watermarks)。關於水位線的概念和用法,我們會稍後介紹。

2、兩種時間語義怎麼選擇

從《星球大戰》說起

為了更加清晰地說明兩種語義的區別,我們來舉一個非常經典的例子:電影《星球大戰》。

 

 《星球大戰》是一部經典的科幻電影,在1977拍攝上映之後就引起了巨大的反響,票房爆棚好評如潮。我們知道,但凡一部商業電影叫好又叫座,那十有八九都是要拍續集的——於是6年內又上映了兩部續集,這就是當時轟動一時的星戰三部曲。好IP總是要反覆拿來用,所以十幾年後又有了星戰前傳三部曲,到了2015年之後又以每年一部的頻率繼續拍攝後傳和外傳。而星戰系列的命名也很有趣,是按照故事時間線的發展來的:經典三部曲是系列的四、五、六部,之後是前傳一、二、三,2015年開始的後傳就從第七部算起了。如圖6-2所示,我們會發現,看電影其實就是處理影片中資料的過程,所以影片的上映時間就相當於“處理時間”;而影片的資料就是所描述的故事,它所發生的背景時間就相當於“事件時間”。現在我們考慮一下,作為沒有看過星戰的新影迷,如果想要入坑一覽,該選擇什麼樣的觀影順序呢?這就要看我們具體的需求了:如果你是劇情黨,重點想看一個完整的故事,那最好的選擇無疑就是按照系列的編號,沿著故事發展的時間線來看;而如果你是特效黨,更想體驗炫目的視覺效果和時代技術的發展,那就按照電影的拍攝順序來觀看,不過劇情可能就需要多腦補一下了。所以,兩種時間語義都有各自的用途,適用於不同的場景。

資料處理系統中的時間語義

在計算機系統中,考慮資料處理的“時代變化”是沒什麼意義的,我們更關心的,顯然是資料本身產生的時間。比如我們計算網站的PV、UV等指標,要統計每天的訪問量。如果某個使用者在23點59分59秒有一次訪問,但我們的任務處理這條資料的時間已經是第二天0點0分01秒了;那麼這條資料,是應該算作當天的訪問,還是第二天的訪問呢?很明顯,統計使用者行為,需要考慮行為本身發生的時間,所以我們應該把這條資料統計入當天的訪問量。這時我們用到的視窗,就是以事件時間作為劃分標準的,跟處理時間無關。所以在實際應用中,事件時間語義會更為常見。一般情況下,業務日誌資料中都會記錄資料生成的時間戳(timestamp),它就可以作為事件時間的判斷基礎。

兩種時間語義的對比

實際應用中,資料產生的時間和處理的時間可能是完全不同的。很長時間收集起來的資料,處理或許只要一瞬間;也有可能資料量過大、處理能力不足,短時間堆了大量資料處理不完,產生“背壓”(back pressure)。通常來說,處理時間是我們計算效率的衡量標準,而事件時間會更符合我們的業務計算邏輯。所以更多時候我們使用事件時間;不過處理時間也不是一無是處。對於處理時間而言,由於沒有任何附加考慮,資料一來就直接處理,因此這種方式可以讓我們的流處理延遲降到最低,效率達到最高。但是我們前面提到過,在分散式環境中,處理時間其實是不確定的,各個並行任務時鐘不統一;而且由於網路延遲,導致資料到達各個運算元任務的時間有快有慢,對於視窗操作就可能收集不到正確的資料了,資料處理的順序也會被打亂。這就會影響到計算結果的正確性。所以處理時間語義,一般用在對實時性要求極高、而對計算準確性要求不太高的場景。而在事件時間語義下,水位線成為了時鐘,可以統一控制時間的進度。這就保證了我們總可以將資料劃分到正確的視窗中,比如8點59分59秒產生的資料,無論網路傳輸的延遲是多少,它永遠屬於8點~9點的視窗,不會錯分。但我們知道資料還可能是亂序的,要想讓視窗正確地收集到所有資料,就必須等這些錯亂的資料都到齊,這就需要一定的等待時間。所以整體上看,事件時間語義是以一定延遲為代價,換來了處理結果的正確性。由於網路延遲一般只有毫秒級,所以即使是事件時間語義,同樣可以完成低延遲實時流處理的任務。另外,除了事件時間和處理時間,Flink還有一個“攝入時間”(Ingestion Time)的概念,它是指資料進入Flink資料流的時間,也就是Source運算元讀入資料的時間。攝入時間相當於是事件時間和處理時間的一箇中和,它是把Source任務的處理時間,當作了資料的產生時間新增到資料裡。這樣一來,水位線(watermark)也就基於這個時間直接生成,不需要單獨指定了。這種時間語義可以保證比較好的正確性,同時又不會引入太大的延遲。它的具體行為跟事件時間非常像,可以當作特殊的事件時間來處理。在Flink中,由於處理時間比較簡單,早期版本預設的時間語義是處理時間;而考慮到事件時間在實際應用中更為廣泛,從1.12版本開始,Flink已經將事件時間作為了預設的時間語義.