Storm Window機制詳解
概念
window 型別
Tumbling Window
按照固定的時間間隔或者Tuple數量劃分視窗。
例子一,按照固定時間滾動,5秒滾一個視窗:
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0 5 10 15 -> time
| w1 | w2 | w3 |...
例子二,按照固定Tuple數量滾動,5個Tuple滾一個視窗
| e1 e2 e3 e4 e5 | e6 e7 e8 e9 e10 |... 0 5 10 -> count | w1 | w2 |...
Sliding Window
也可以根據時間間隔或者Tuple數量來劃分視窗,由於視窗長度也可以是時間或者Tuple數量,所以Sliding Window的形式比Tumbling Window多
例子一,視窗長度為10s,滑動間隔為5s
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0 5 10 15 -> time
w1----->|
|----------w2---------|
|-----------w3-----------|
例子二,視窗長度為10s,滑動間隔為5個Tuple
| e1 e2 e3 e4 e5 | e6 e7 e8 e9 e10 | e11 ...
0 5 10 -> count
|-------w1------|
|----------w2------------|
例子三,視窗長度為10個Tuple,滑動間隔為5個Tuple
| e1 e2 e3 e4 e5 | e6 e7 e8 e9 e10 | e11 e12 e13 e14 e15 | ... 0 5 10 15 -> count |-------w1-------| |-----------------w2---------------| |-------------------w2-----------------|
例子四,視窗長度為10個Tuplp,滑動間隔為5s
| e1 e2 e3 e4 e5 e6 e7 e8 e9 e10 e11 e12 e13 e14 e15 ...
0 5 10 15 -> time
|------------w1---------|
|--------------w2--------------|
當視窗長度和滑動距離相等時,便成了滾動視窗
TriggerPolicy
視窗的觸發策略,用於確定視窗的計算點,以時間或者Tuplt數量為標準
EvictionPolicy
視窗的事件回收策略,用標記的方式確定事件是否屬於本次視窗
Watermark和Lag
Watermark用於標記資料的處理進度,Lag主要是應對資料亂序的情況。
從當前資料中的最新一條資料的時間算起,往前減去Lag,得到一個時間,這個時間成為Watermark,認為Watermark之前的資料都已經到了。
06:00:00的資料有可能在06:00:06之後才到,若Lag=5s,不好意思,進不了視窗,會被 當成超時的資料。
程式碼分析
EvictionPolicy
事件回收策略介面,目前有4種實現,用於將event標記為以下4個狀態:
EXPIRE:失效的事件,會從queue中移除
PROCESS:將在最近的一個window中處理事件
KEEP:將在以後的window中處理些事件
STOP:停止處理些事件之後的event,認為此event之後的event將不再滿足這個策略
介面程式碼如下:
public interface EvictionPolicy<T> {
enum Action {
EXPIRE,PROCESS,KEEP,STOP
}
Action evict(Event<T> event); //對事件進行標記
void track(Event<T> event); //對事件進行跟蹤
void setContext(EvictionContext context); //設定context
}
分別介紹4種EvictionPolicy的實現類。
CountEvictionPolicy
以event數量做為視窗長度,只會標記兩種狀態:EXPIRE和PROCESS,有track()和evict()兩個主要方法。
track方法如下,用成員變數currentCount記錄已經跟蹤的event資料,但並不包括watermark event。
@Override
public void track(Event<T> event) {
if (!event.isWatermark()) {
currentCount.incrementAndGet();
}
}
evict方法如下,返回一個標記後的Action,成員變數threshold即為視窗長度,當currentCount的值大於threshold時,則表示此事件不在本次視窗處理之內,需要標記為EXPIRE,否則標記為PROCESS。
@Override
public Action evict(Event<T> event) {
while (true) {
long curVal = currentCount.get();
if (curVal > threshold) {
if (currentCount.compareAndSet(curVal, curVal - 1)) {
return Action.EXPIRE;
}
} else {
break;
}
}
return Action.PROCESS;
}
WatermarkCountEvictionPolicy
繼承自CountEvictionPolicy,增加referenceTime和processed兩個私有成員變數
referenceTime:即watermark(後面有解釋)
processed:本次視窗中已經被標記為PROCESS狀態的event數量
WatermarkCountEvictionPolicy比CountEvictionPolicy多了一個KEEP狀態,被標記為KEEP狀態的event將在下一個視窗中處理。
track()方法實現為空
evict的實現如下
@Override
public Action evict(Event<T> event) {
Action action;
if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) {
action = super.evict(event);
if (action == Action.PROCESS) {
++processed;
}
} else {
action = Action.KEEP;
}
return action;
}
TimeEvictionPolicy
以時間做為視窗的長度,只會標記兩種狀態:EXPIRE和PROCESS,用事件的時間和最新的時間做為標記的依據
track方法實現為空
evicty方法如下,成員變數referenceTime可能是在window中計算而來,或者是系統當前時間,
public Action evict(Event<T> event) {
long now = referenceTime == null ? System.currentTimeMillis() : referenceTime;
long diff = now - event.getTimestamp();
if (diff >= windowLength) {
return Action.EXPIRE;
}
return Action.PROCESS;
}
WatermarkTimeEvictionPolicy
WatermarkTimeEvictionPolicy繼承自TimeEvictionPolicy,增加了成員變數lag,標記的狀態也比TimeEvictionPolicy多了STOP和KEEP
重寫了evict方法
1 public Action evict(Event<T> event) {
2 long diff = referenceTime - event.getTimestamp();
3 if (diff < -lag) {
4 return Action.STOP;
5 } else if (diff < 0) {
6 return Action.KEEP;
7 } else {
8 return super.evict(event);
9 }
10 }
第3行的判斷可以理解為event的時間比referenceTime大了一個lag以上,在此標記為STOP,後面的scan方法將在此處停止,認為後面的event都不可能在本次視窗事件中處理
第5行的判斷為event的時間比referenceTime大了一個lag以內,標記為KEEP。
TriggerPolicy
window的觸發策略介面,滿足觸發條件時,WindowManager的onTrigger方法得以執行。介面程式碼如下:
public interface TriggerPolicy<T> {
void track(Event<T> event); //跟蹤每個event,看是否滿足觸發條件
void reset();
void start();
void shutdown();
}
也有4個實現類
CountTriggerPolicy
track方法如下,很簡單,當跟蹤的event資料大於count時,觸發onTrigger,count為構造方法傳的觸發上限。
public void track(Event<T> event) {
if (started && !event.isWatermark()) {
if (currentCount.incrementAndGet() >= count) {
evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
handler.onTrigger();
}
}
}
WatermarkCountTriggerPolicy
直接實現TriggerPolicy,而不是繼承CountTriggerPolicy。由於watermark event來觸發onTrigger
track方法如下:
public void track(Event<T> event) {
if (started && event.isWatermark()) {
handleWaterMarkEvent(event);
}
}
private void handleWaterMarkEvent(Event<T> waterMarkEvent) {
long watermarkTs = waterMarkEvent.getTimestamp();
List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
for (long ts : eventTs) {
evictionPolicy.setContext(new DefaultEvictionContext(ts, null, Long.valueOf(count)));
handler.onTrigger();
lastProcessedTs = ts;
}
}
TimeTriggerPolicy
定時來觸發視窗的計算。這個實現中用會啟動一個ScheduledExecutorService定時器,週期性執行內部執行緒newTriggerTask,由newTriggerTask來呼叫onTrigger方法。(程式碼很簡單,就不貼啦)
WatermarkTimeTriggerPolicy
這個類也是直接實現了TriggerPolicy介面,在track方法中判斷事件是否是watermark event,來決定是否觸發視窗計算。
track方法如下:
public void track(Event<T> event) {
if (started && event.isWatermark()) {
handleWaterMarkEvent(event);
}
}
handleWaterMarkEvent方法通過while偱環,windowEndTs是當然計算的視窗的終點,起點就是終點減去視窗長度,本次視窗計算結束,onTrigger如果返回為true,windowEndTs將加上一個slidingIntervalMs(滑動長度)做為下一個視窗的終點。onTrigger如果返回為false,則表示這次計算的視窗中沒有event,將通過getNextAlignedWindowT方法來找到下一個視窗的終點。
private void handleWaterMarkEvent(Event<T> event) {
long watermarkTs = event.getTimestamp();
long windowEndTs = nextWindowEndTs;
LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
while (windowEndTs <= watermarkTs) {
long currentCount = windowManager.getEventCount(windowEndTs);
evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
if (handler.onTrigger()) {
windowEndTs += slidingIntervalMs;
} else {
/*
* 如果上次onTrigger沒有event,將通過getNextAlignedWindowTs方法來找到下一個視窗
*/
long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
LOG.debug("Next aligned window end ts {}", ts);
if (ts == Long.MAX_VALUE) {
LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
break;
}
windowEndTs = ts;
}
}
nextWindowEndTs = windowEndTs;
}
getNextAlignedWindowT方法通過找到windowEndTs到watermark這段時間裡最早的一個event的時間戳,以nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs))的方式做時間對齊,找到小於最早時間戳裡,能被滑動間隔整除的最小的一個時間點,做為下次計算的視窗的終點。
private long getNextAlignedWindowTs(long startTs, long endTs) {
long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
return nextTs;
}
return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
}
通過EvictionPolicy和TriggerPolicy這兩個介面的組合,形成了前面所講的6個視窗型別。這兩個介面的實現類中,也可以分為帶watermark的類和不帶watermark的類,如果使用者設定了時間欄位,就會以帶watermark的類處理event.
WaterMark的計算
在api中使用以下方法來改變watermark的產生週期,預設值是1000ms
public BaseWindowedBolt withWatermarkInterval(Duration interval)
interval實際被設定到了WaterMarkEventGenerator中,WaterMarkEventGenerator是一個執行緒,每隔interval時間間隔被ScheduledExecutorService執行一次,看以下WaterMarkEventGenerator中的幾個關鍵方法。
track
/**
* Tracks the timestamp of the event in the stream, returns
* true if the event can be considered for processing or
* false if its a late event.
* track在WindowedBoltExecutor的execute方法中被呼叫,以(ts >= lastWaterMarkTs)來判斷事件是否應該被放到queue中
*/
public boolean track(GlobalStreamId stream, long ts) {
Long currentVal = streamToTs.get(stream);
if (currentVal == null || ts > currentVal) {
streamToTs.put(stream, ts); //更新streamid對應的時間戳
}
checkFailures();
return ts >= lastWaterMarkTs;
}
computeWaterMarkTs
/**
*計算新的watermark,watermark是所有輸入流中最新的tuple時間戳的最小值(減去延時)
*/
private long computeWaterMarkTs() {
long ts = 0;
// only if some data has arrived on each input stream
if (streamToTs.size() >= inputStreams.size()) {
ts = Long.MAX_VALUE;
for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
ts = Math.min(ts, entry.getValue());
}
}
return ts - eventTsLag;
}
Example
(來自於官網)
若基於以下引數和資料:
Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s
當前時間 = 09:00:00
Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)
在 9:00:00
and 9:00:01
之前到達
在 09:00:01
產生了新的 watermark, w1 = 6:00:31
,此時,早於 6:00:31
的event到達時,將被當成超時資料。
通過事件中最早的一個 timestamp (06:00:03)和sliding interval來計算後,將產生三個window,第一個window的終點是06:00:10,如下所示:
5:59:50 - 06:00:10
: e1, e2, e36:00:00 - 06:00:20
: e1, e2, e3, e46:00:10 - 06:00:30
: e4, e5
由於e6(6:00:36) 比watermark(6:00:31)要晚,所以不在本次觸發中處理。
Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)
在 9:00:01
and 9:00:02
之間到達
在 09:00:02
產生下一個 watermark, w2 = 08:00:34
,此時,早於 8:00:34
的event到達時,將被當成超時資料。
將產生以下window:
6:00:20 - 06:00:40
: e5, e66:00:30 - 06:00:50
: e68:00:10 - 08:00:30
: e7, e8, e9
e10 (8:00:39
)比 watermark 8:00:34
晚,所以不在本次處理
其它
第一個視窗怎麼產生
程序啟動初次觸發視窗計算時,WindowManager的onTrigger()方法會返回false,WatermarkTimeTriggerPolicy中的getNextAlignedWindowTs()方法會被呼叫,從而產生第一個真正的Window。
之後每次觸發視窗計算,會用上一次計算的最後一個視窗的結束時間加上sliding interval得到本次計算的下一個視窗的結束時間。
Guarantees
storm window提供了at-least once的保障,tuple在window中經過 windowLength + slidingInterval
後被expire後,然後自動被ack,因此topology.message.timeout.secs
必須遠大於 windowLength + slidingInterval
。如果是基於count觸發的window,需要去結合實際的視窗長度和滑動時間才調整超時時間的大小。