1. 程式人生 > >流式統計的幾個難點

流式統計的幾個難點

流式統計聽著挺容易的一個事情,說到底不就是數數嘛,每個告警系統裡基本上都有一個簡單的流式統計模組。但是當時基於storm做的時候,這幾個問題還是困擾了我很長時間的。沒有用過spark streaming/flink,不知道下面這些問題在spark streaming/flink裡是不是都已經解決得很好了。

時間視窗切分問題

做流式統計首要的問題是把一個時間視窗內的資料統計到一起。問題是,什麼是時間視窗?有兩種選擇

  • 日誌時間(event timestamp)
  • 牆上時間(wall clock)

最簡單的時間視窗統計的是基於“牆上時間”的,每過1分鐘就切分出一個新窗口出來。比如statsd,它的視窗切分就是這樣的。這種基於“牆上時間”的統計有一個非常嚴重的問題是不能回放資料流。當資料流是實時產生的時候,“牆上時間”的一分鐘也就只會有一分鐘的event被產生出來。但是如果統計的資料流是基於歷史event的,那麼一分鐘可以產生消費的event數量只受限於資料處理速度。另外event在分散式採集的時候也遇到有快有慢的問題,一分鐘內產生的event未必可以在一分鐘內精確到達統計端,這樣就會因為採集的延遲波動影響統計資料的準確性。實際上基於“牆上時間”統計需要

collection latency = wall clock - event timestamp

基於“牆上時間”的統計需要採集延遲非常小,波動也很小才可以工作良好。大部分時候更現實的選擇是需要基於“日誌時間”來進行視窗統計的。

使用“日誌時間”就會引入資料亂序的問題,對於一個實時event stream流,其每個event的timestamp未必是嚴格遞增的。這種亂序有兩種因素引入:

  • event產生的機器的時鐘不完全同步(NTP有100ms左右的不同步)
  • event從採集到到達kafka的速度不均衡(不同的網路線路有快有慢)

我們希望的流式統計是這樣的:

但是實際上資料只是基本有序的,也就是在時間視窗的邊緣會有一些event需要跨到另外一個視窗去:

最簡單的分發event到時間視窗程式碼是這樣的

window index = event timestamp / window size

對1分鐘的時間視窗 window size 就是60,timestamp除以60為相同window index的event就是在同一個時間視窗的。問題的關鍵是,什麼時候我可以確信這個時間視窗內的event都已經到齊了。如果到齊了,就可以開始統計出這個時間視窗內的指標了。然後突然又有一個落後於大夥的event落到這個已經被計算過的時間視窗如何處理?

  • 對於大部分統計而言,一個時間視窗統計出多條結果存入db並不是什麼大的問題,從db裡查詢的時候把多條結果再合併就可以了。
  • 對於一些型別的統計(非monad),比如平均值,時間視窗內的event分為兩批統計出來的結果是沒有辦法被再次彙總的。
  • 實時類的計算對時間敏感,來晚了的資料就沒有意義了。比如告警,一個時間窗過去了就沒有必要再理會這個時間視窗了。

所以對於來晚了的資料就兩種策略:要麼再統計一條結果出來,要麼直接丟棄。要確定什麼時候一個時間視窗內的event已經到齊了,有幾種策略:

  • sleep 等待一段時間(牆上時間)
  • event timestamp超過了時間視窗一點點不關閉當前時間視窗,而是要等event timestamp大幅超出時間視窗的時候才關閉視窗。比如12:05:30秒的event到了才關閉12:04:00 ~ 12:05:00的時間視窗。
  • 一兩個event超出了時間視窗不關閉,只有當“大量”的event超出時間窗口才關閉。比如1個event超過12:05分不關閉,如果有100個event超過了12:05的時間視窗就關閉它。

三種策略其實都是“等”,只是等的依據不同。實踐中,第二種策略也就是根據“日誌時間”的等待是最容易實現的。如果對於過期的event不是丟棄,而是要再次統計一條結果出來,那麼過期的視窗要重新開啟,又要經過一輪“等待”去判斷這個過去的視窗什麼時候再被關閉。

多流合併的問題

一個kafka的partition就是一個流,一個kafka topic的多個partition就是多個獨立的流(offset彼此獨立增長)。多個kafka topic顯然是多個獨立的流。流式統計經常需要把多個流合併統計到一起。這種裡會遇到兩個難題

  • 多個流的速度不一樣,如何判斷一個時間視窗內的event都到齊了。如果按照前面的等待策略,可能處理一個流內部的基本有序區域性亂序是有效的,但是對於多個流速差異很大的流就無能為力了。一個很快的流很容易把時間視窗往後推得很遠,把其他流遠遠跑到後面。
  • 流速不均不能靠下游兜著,下游的記憶體是有限的。根本上是需要一種“背壓”的機制,讓下游通知流速過快的上游,你慢點產生新的event,等等其他人。

舉一個具體的例子:

spout 1 emit 12:05
spout 1 emit 12:06
spout 2 emit 12:04
spout 1 emit 12:07
spout 2 emit 12:05 // this is when 12:05 is ready

要想知道12:05這個時間窗的event都到齊了,首先要知道相關的流有幾個(在這例子裡是spout1和spout2兩個流),然後要知道什麼時候spout1產生了12:05的資料,什麼時候spout2產生了12:05的資料,最後才可以判斷出來12:05的資料是到齊了的。在某個地方要存一份這樣的流速的資料去跟蹤,在視窗內資料到齊之後發出訊號讓相關的下游往前推動時間視窗。考慮到一個分散式的系統,這個跟蹤要放在哪個地方做,怎麼去通知所有的相關方。

極端一些的例子

spout 1 emit 13:05
spout 2 emit 12:31
spout 1 emit 13:06
spout 2 emit 12:32

多個流的流速可能會相差到半個小時以上。考慮到如果用歷史的資料匯入到實時統計系統裡時,很容易因為計算速度不同導致不同節點之間的處理進度不一致。要計算出正確的結果,下游需要快取這些差異的半個小時內的所有資料,這樣很容易爆記憶體。但是上游如何感知到下游要處理不過來了呢?多個上游之間又如何感知彼此之間的速度差異呢?又有誰來仲裁誰應該流慢一些呢?

一個相對簡單的做法是在整個流式統計的分散式系統裡引入一個coordinator的角色。它負責跟蹤不同流的流速,在時間視窗的資料到齊之後通知下游flush,在一些上游流速過快的時候(比如最快的流相比最慢的流差距大於10分鐘)由coordinator傳送backoff指令給流速過快的上游,然後接到指令之後sleep一段時間。一段基本堪用的跟蹤不同流流速的程式碼:https://gist.github.com/taowen/2d0b3bcc0a4bfaecd404

資料一致性問題

低檔一些的說法是這樣的。假設統計出來的曲線是這樣的:

如果中間,比如08:35左右重啟了統計程式,那麼曲線能否還是連續的?

高檔一些的說法是,可以把流式統計理解為主資料庫與分析資料庫之間通過kafka訊息佇列進行非同步同步。主資料庫與分析資料庫之間應該保持eventual consistency。

要保證資料不重不丟,就要做到生產到kafka的時候,在主資料庫和kafka訊息佇列之間保持一個事務一致性。舉一個簡單的例子:

使用者下了一個訂單
主資料庫裡插入了一條訂單的資料記錄
kafka訊息佇列裡多了一條OrderPlaced的event

這個流程中一個問題就是,主資料插入成功了之後,可能往kafka訊息佇列裡enqueue event失敗。如果把這個操作反過來

使用者下了一個訂單
kafka訊息佇列裡多了一條OrderPlaced的event
主資料庫裡插入了一條訂單的資料記錄

又可能出現kafka訊息佇列裡enqueue了,但是主資料庫插入失敗的情況。就kafka佇列的目前的設計而言,對這個問題是無解的。一旦enqueue的event,除非過期是無法刪除的。

在消費端,當我們從kafka裡取出資料之後,去更新分析資料庫的過程也要保持一個分散式事務的一致性。

取出下一條OrderPlaced evnet(指向的offset+1)
當前時間窗的統計值+1
重複以上過程,直到視窗被關閉,資料寫入到分析資料庫

kafka的資料是可以重放的,只要指定offset就可以把這個offset以及之後的資料讀取出來。所謂消費的過程就是把客戶端儲存的offset值加1的過程。問題是,這個offset指標儲存在哪裡的問題。常規的做法是把消費的offset儲存到zookeeper裡。那麼這就有一個分散式的一致性問題了,zookeeper裡offset+1了,但是分析資料庫並沒有實際把值統計進去。考慮到統計一般不是每條輸入的event都會更新分析資料庫,而是把中間狀態快取在記憶體中的。那麼就有可能消費了成千上萬個event,狀態都在記憶體裡,然後“啪”的一下機器掉電了。如果每次讀取event都移動offset的話,這些event就丟掉了。如果不是每次都移動offset的話,又可能在重啟的時候導致重複統計。

搞統計的人在乎這麼一兩條資料嗎?其實大部分人是不在乎的。不少團隊壓根連offset都不儲存,每次開始統計直接seek到佇列的尾部開始。實時計算嘛,實時最重要了。準確計算?重放歷史?這個讓hadoop搞定就好了。但是如果就是要較這個真呢?或者我們不追求嚴格的強一致,只要求重啟之後曲線不斷開那麼難看就好了。

別的流式計算框架不清楚,storm的ack機制是毫無幫助的。

storm的ack機制是基於每個message來做的。這就要求如果做一個每分鐘100萬個event的統計,一分鐘就要跟蹤100萬個message id。就算是100萬個int,也是一筆相當可觀的記憶體開銷。要知道,從kafka裡讀出來的event都是順序offset的,處理也是順序,只要記錄一個offset就可以跟蹤整個流的消費進度了。1個int,相比100萬個int,storm的per message ack的機制對於流式處理的進度跟蹤來說,沒有利用訊息處理的有序性(storm根本上假設message之間是彼此獨立處理的),而變得效率低下。

要做到強一致是很困難的,它需要把

  • 更新儲存的offset
  • 更新插入分析資料庫

變成一個原子事務來完成。大部分分析資料庫都沒有原子性事務的能力,連插入三條資料都不能保持同時變為可見,且不說還要用它來記錄offset了。考慮到kafka在生產端都無法提供分散式事務,event從生產出來就不是完全一致的(多產生了或者少產生了),真正高一致的計費場景還是用其他的技術棧。所以值得解決的問題是,如何在重啟之後,把之前重啟的時候丟棄掉的記憶體狀態重新恢復出來,使得統計出來的曲線仍然是連續的。

解決思路有三點:

  • 上游備份策略:重啟的時候重放kafka的歷史資料,恢復記憶體狀態
  • 中間狀態持久化:把統計的狀態放到外部的持久的資料庫裡,不放記憶體裡
  • 同時跑兩份:同時有兩個完全一樣的統計任務,重啟一個,另外一個還能正常執行。

記憶體狀態管理的問題

做流式統計的有兩種做法:

  • 依賴於外部儲存管理狀態:比如沒收到一個event,就往redis裡發incr增1
  • 純記憶體統計:在記憶體裡設定一個counter,每收到一個event就+1

基於外部儲存會把整個壓力全部壓到資料庫上。一般來說流式統計的流速是很快的,遠大於普通的關係型資料庫,甚至可能會超過單臺redis的承載。這就使得基於純記憶體的統計非常有吸引力。大部分的時候都是在更新時間視窗內的記憶體狀態,只有當時間視窗關閉的時候才把資料刷到分析資料庫裡去。刷資料出去的同時記錄一下當前流消費到的位置(offset)。

這種純記憶體的狀態相對來說容易管理一些。計算直接是基於這個記憶體狀態做的。如果重啟丟失了,重放一段歷史資料就可以重建出來。

但是記憶體的問題是它總是不夠用的。當統計的維度組合特別多的時候,比如其中某個欄位是使用者的id,那麼很快這個記憶體狀態就會超過單機的記憶體上限。這種情況有兩種辦法:

  • 利用partition把輸入的input分割,一個流分成多個流,每個統計程式需要跟蹤的維度組合就變少了
  • 把儲存移到外邊去

簡單地在流式統計程式裡開關資料庫連線是可以解決這個容量問題的:

但是這種對外部資料庫使用不小心就會導致兩個問題:

  • 處理速度慢。不用一些批量的操作,資料庫操作很快就會變成瓶頸
  • 資料庫的狀態不一直。記憶體的狀態重啟了就丟失了,外部的狀態重啟之後不丟失。重放資料流就可能導致資料的重複統計

但是這種把視窗統計的中間狀態落地的好處也是顯而易見的。重啟之後不用通過重算來恢復記憶體狀態。如果一個時間視窗有24小時,重算24小時的歷史資料可能是很昂貴的操作。

版本跟蹤,批量等都不應該是具體的統計邏輯的實現者的責任。理論上框架應該負責把冷熱資料分離,自動把冷資料下沉到外部的儲存,以把本地記憶體空閒出來。同時每次小批量處理event的時候都要記錄處理的offset,而不是要等到視窗關閉等待時候。

資料庫狀態和記憶體狀態要變成一個緊密結合的整體。可以把兩者的關係想象成作業系統的filesystem page cache。用mmap把狀態對映到記憶體裡,由框架負責什麼時候把記憶體裡的變更持久化到外部儲存裡。

總結

基於storm做流式統計缺乏對以下四個基本問題的成熟解決方案。其trident框架可能可以提供一些答案,但是實踐中好像使用的人並不多,資料也太少了。可以比較自信的說,不僅僅是storm,對於大多數流式計算平臺都是如此。

  • 時間視窗切分的問題
  • 多流合併的問題
  • 資料一致性問題(重啟之後曲線斷開的問題)
  • 記憶體狀態管理問題

這些問題要好好解決,還是需要一番功夫的。新一代的流式計算框架比如spark streaming/flink應該有很多改進。即便底層框架提供了支援,從這四個角度去考察一下它們是如何支援的也是非常有裨益的事情。