1. 程式人生 > 其它 >Fl9nk 水位線簡介

Fl9nk 水位線簡介

什麼是水位線

在事件時間語義下,我們不依賴系統時間,而是基於資料自帶的時間戳去定義了一個時鐘,用來表示當前時間的進展。於是每個並行子任務都會有一個自己的邏輯時鐘,它的前進是靠資料的時間戳來驅動的。但在分散式系統中,這種驅動方式又會有一些問題。因為資料本身在處理轉換的過程中會變化,如果遇到視窗聚合這樣的操作,其實是要攢一批資料才會輸出一個結果,那麼下游的資料就會變少,時間進度的控制就不夠精細了。另外,資料向下遊任務傳遞時,一般只能傳輸給一個子任務(除廣播外),這樣其他的並行子任務的時鐘就無法推進了。例如一個時間戳為9整的資料到來,當前任務的時鐘就已經是9點了;處理完當前資料要傳送到下游,如果下游任務是一個視窗計算,並行度為3,那麼接收到這個資料的子任務,時鐘也會進展到9點,9點結束的視窗就可以關閉進行計算了;而另外兩個並行子任務則時間沒有變化,不能進行視窗計算。所以我們應該把時鐘也以資料的形式傳遞出去,告訴下游任務當前時間的進展;而且這個時鐘的傳遞不會因為視窗聚合之類的運算而停滯。一種簡單的想法是,在資料流中加入一個時鐘標記,記錄當前的事件時間;這個標記可以直接廣播到下游,當下遊任務收到這個標記,就可以更新自己的時鐘了。由於類似於水流中用來做標誌的記號,在Flink中,這種用來衡量事件時間(Event Time)進展的標記,就被稱作“水位線”(Watermark)。具體實現上,水位線可以看作一條特殊的資料記錄,它是插入到資料流中的一個標記點,主要內容就是一個時間戳,用來指示當前的事件時間。而它插入流中的位置,就應該是在某個資料到來之後;這樣就可以從這個資料中提取時間戳,作為當前水位線的時間戳了。

 

如上圖所示;每個事件產生的資料,都包含了一個時間戳,我們直接用一個整數表示。這裡沒有指定單位,可以理解為秒或者毫秒(方便起見,下面講述統一認為是秒)。當產生於2秒的資料到來之後,當前的事件時間就是2秒;在後面插入一個時間戳也為2秒的水位線,隨著資料一起向下遊流動。而當5秒產生的資料到來之後,同樣在後面插入一個水位線,時間戳也為5,當前的時鐘就推進到了5秒。這樣,如果出現下游有多個並行子任務的情形,我們只要將水位線廣播出去,就可以通知到所有下游任務當前的時間進度了。水位線就像它的名字所表達的,是資料流中的一部分,隨著資料一起流動,在不同任務之間傳輸。這看起來非常簡單;接下來我們就進一步探討一些複雜的狀況

 1. 有序流中的水位線

在理想狀態下,資料應該按照它們生成的先後順序、排好隊進入流中;也就是說,它們處理的過程會保持原先的順序不變,遵守先來後到的原則。這樣的話我們從每個資料中提取時間戳,就可以保證總是從小到大增長的,從而插入的水位線也會不斷增長、事件時鐘不斷向前推進。實際應用中,如果當前資料量非常大,可能會有很多資料的時間戳是相同的,這時每來一條資料就提取時間戳、插入水位線就做了大量的無用功。而且即使時間戳不同,同時湧來的資料時間差會非常小(比如幾毫秒),往往對處理計算也沒什麼影響。所以為了提高效率,一般會每隔一段時間生成一個水位線,這個水位線的時間戳,就是當前最新資料的時間戳。所以這時的水位線,其實就是有序流中的一個週期性出現的時間標記。   

 

這裡需要注意的是,水位線插入的“週期”,本身也是一個時間概念。在當前事件時間語義下,假如我們設定了每隔100ms生成一次水位線,那就是要等事件時鐘推進100ms才能插入;但是事件時鐘本身的進展,本身就是靠水位線來表示的——現在要插入一個水位線,可前提又是水位線要向前推進100ms,這就陷入了死迴圈。所以對於水位線的週期性生成,週期時間是指處理時間(系統時間),而不是事件時間。

2. 亂序流中的水位線

有序流的處理非常簡單,看起來水位線也並沒有起到太大的作用。但這種情況只存在於理想狀態下。我們知道在分散式系統中,資料在節點間傳輸,會因為網路傳輸延遲的不確定性,導致順序發生改變,這就是所謂的“亂序資料”。這裡所說的“亂序”(out-of-order),是指資料的先後順序不一致,主要就是基於資料的產生時間而言的。如圖6-7所示,一個7秒時產生的資料,生成時間自然要比9秒的資料早;但是經過資料快取和傳輸之後,處理任務可能先收到了9秒的資料,之後7秒的資料才姍姍來遲。這時如果我們希望插入水位線,來指示當前的事件時間進展,又該怎麼做呢? 最直觀的想法自然是跟之前一樣,我們還是靠資料來驅動,每來一個數據就提取它的時間戳、插入一個水位線。不過現在的情況是資料亂序,所以有可能新的時間戳比之前的還小,如果直接將這個時間的水位線再插入,我們的“時鐘”就回退了——水位線就代表了時鐘,時光不能倒流,所以水位線的時間戳也不能減小。解決思路也很簡單:我們插入新的水位線時,要先判斷一下時間戳是否比之前的大,否則就不再生成新的水位線,也就是說,只有資料的時間戳比當前時鐘大,才能推動時鐘前進,這時才插入水位線。  

 如果考慮到大量資料同時到來的處理效率,我們同樣可以週期性地生成水位線。這時只需要儲存一下之前所有資料中的最大時間戳,需要插入水位線時,就直接以它作為時間戳生成新的水位線

 

 

 這樣做盡管可以定義出一個事件時鐘,卻也會帶來一個非常大的問題:我們無法正確處理“遲到”的資料。在上面的例子中,當9秒產生的資料到來之後,我們就直接將時鐘推進到了9秒;如果有一個視窗結束時間就是9秒(比如,要統計0~9秒的所有資料),那麼這時視窗就應該關閉、將收集到的所有資料計算輸出結果了。但事實上,由於資料是亂序的,還可能有時間戳為7秒、8秒的資料在9秒的資料之後才到來,這就是“遲到資料”(late data)。它們本來也應該屬於0~9秒這個視窗,但此時視窗已經關閉,於是這些資料就被遺漏了,這會導致統計結果不正確。如果用之前我們類比班車的例子,現在的狀況就是商品不是按照生產時間順序到來的,所以有可能出現這種情況:9點生產的商品已經到了,我們認為已經到了9點,所以直接發車;但是可能還會有8點59分59秒生產的商品遲到了,沒有趕上這班車。那怎麼解決這個問題呢?其實我們有很多生活中的經驗。假如是一個團隊出去團建,那肯定希望每個人都不能落下;如果有人因為堵車沒能準時到車上,我們可以稍微等一會兒。9點發車,我們可以等到9點10分,等人都到齊了再出發。當然,實際應用的網路環境不可能跟北京的交通一樣堵,所以不需要等那麼久,或許只要等一兩秒鐘就可以了。具體在商品班車的例子裡,我們可以多等2秒鐘,也就是當生產時間為9點零2秒的商品到達,時鐘推進到9點零2秒,這時就認為所有8點到9點生產的商品都到齊了,可以正式發車。不過這樣相當於更改了發車時間,屬於“違規操作”。為了做到形式上仍然是9點發車,我們可以更改一下時鐘推進的邏輯:當一個商品到達時,不要直接用它的生產時間作為當前時間,而是減上兩秒,這就相當於把車上的邏輯時鐘調慢了。這樣一來,當9點生產的商品到達時,我們當前車上的時間是8點59分58秒,還沒到發車時間;當9點零2秒生產的商品到達時,車上時間剛好是9點,這時該到的商品都到齊了,準時發車就沒問題了。回到上面的例子,為了讓視窗能夠正確收集到遲到的資料,我們也可以等上2秒;也就是用當前已有資料的最大時間戳減去2秒,就是要插入的水位線的時間戳,如圖6-10所示。這樣的話,9秒的資料到來之後,事件時鐘不會直接推進到9秒,而是進展到了7秒;必須等到11秒的資料到來之後,事件時鐘才會進展到9秒,這時遲到資料也都已收集齊,0~9秒的視窗就可以正確計算結果了。

 

如果仔細觀察就會看到,這種“等2秒”的策略其實並不能處理所有的亂序資料。比如22秒的資料到來之後,插入的水位線時間戳為20,也就是當前時鐘已經推進到了20秒;對於10~20秒的視窗,這時就該關閉了。但是之後又會有17秒的遲到資料到來,它本來應該屬於10~20秒視窗,現在卻被遺漏丟棄了。那又該怎麼辦呢?既然現在等2秒還是等不到17秒產生的遲到資料,那自然我們可以試著多等幾秒,也就是把時鐘調得更慢一些。最終的目的,就是要讓視窗能夠把所有遲到資料都收進來,得到正確的計算結果。對應到水位線上,其實就是要保證,當前時間已經進展到了這個時間戳,在這之後不可能再有遲到資料來了。下面是一個示例,我們可以使用週期性的方式生成正確的水位線。

 

 第一個水位線時間戳為7,它表示當前事件時間是7秒,7秒之前的資料都已經到齊,之後再也不會有了;同樣,第二個、第三個水位線時間戳分別為12和20,表示11秒、20秒之前的資料都已經到齊,如果有對應的視窗就可以直接關閉了,統計的結果一定是正確的。這裡由於水位線是週期性生成的,所以插入的位置不一定是在時間戳最大的資料後面。另外需要注意的是,這裡一個視窗所收集的資料,並不是之前所有已經到達的資料。因為資料屬於哪個視窗,是由資料本身的時間戳決定的,一個視窗只會收集真正屬於它的那些資料。也就是說,上圖中儘管水位線W(20)之前有時間戳為22的資料到來,10~20秒的視窗中也不會收集這個資料,進行計算依然可以得到正確的結果。關於視窗的原理,我們會在後面繼續展開講解。

水位線特點

  • 水位線是插入到資料流中的一個標記,可以認為是一個特殊的資料
  • 水位線主要的內容是一個時間戳,用來表示當前事件時間的進展
  • 水位線是基於資料的時間戳生成的
  • 水位線的時間戳必須單調遞增,以確保任務的事件時間時鐘一直向前推進
  • 水位線可以通過設定延遲,來保證正確處理亂序資料
  • 一個水位線Watermark(t),表示在當前流中事件時間已經達到了時間戳t, 這代表t之前的所有資料都到齊了,之後流中不會出現時間戳t’≤t的資料