Storm應用系列之——可靠性與acker機制
本文屬原創系列,轉載請註明。
對於Storm,它有一個很重要的特性:“Guarantee no data loss” ——可靠性
很顯然,要做到這個特性,必須要track每個data的去向和結果。Storm是如何做到的呢——acker機制。
先概括下acker所參與的工作流程:
1. Spout建立一個新的Tuple時,會發一個訊息通知acker去跟蹤;
2. Bolt在處理Tuple成功或失敗後,也會發一個訊息通知acker;
3. acker會找到發射該Tuple的Spout,回撥其ack或fail方法。
我們說RichBolt和BasicBolt的區別是後者會自動ack。那麼是不是我們只要實現了Spout的ack或fail方法就能看到反饋了呢?
試試在RandomSpout中加入如下程式碼:
@Override
public void ack(Object msgId) {
System.err.println("ack " + msgId);
}
@Override
public void fail(Object msgId) {
System.err.println("fail " + msgId);
}
重新執行ExclaimBasicTopo,看下結果。並沒有任何的ack 和 fail 出現?
原因是,Storm要求如果要track一個Tuple,必須要指定其messageId,也就是回調回ack和fail方法的引數。如果我們不指定,Storm是不會去track該tuple的,即不保證訊息丟失!
我們改下Spout程式碼,為每個訊息加入一個唯一Id。同時,為了方便看結果,加入更多的列印,並且靠sleep減慢傳送速度。(只是為了演示!)
public class RandomSpout extends BaseRichSpout { private SpoutOutputCollector collector; private Random rand; private AtomicInteger counter; private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"}; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.rand = new Random(); counter = new AtomicInteger(); } @Override public void nextTuple() { Utils.sleep(5000); String toSay = sentences[rand.nextInt(sentences.length)]; int msgId = this.counter.getAndIncrement(); toSay = "["+ msgId + "]"+ toSay; PrintHelper.print("Send " + toSay ); this.collector.emit(new Values(toSay), msgId); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } @Override public void ack(Object msgId) { PrintHelper.print("ack " + msgId); } @Override public void fail(Object msgId) { PrintHelper.print("fail " + msgId); } }
PrintHelper類:
public class PrintHelper {
private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");
public static void print(String out){
System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);
}
}
同時把PrintBolt裡面列印也換成PrintHelper.print列印
看下列印結果:
53:33:891 [Thread-26-spout] Send [0]ted:I'm excited
53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I'm excited!
53:38:895 [Thread-26-spout] Send [1]edi:I'm happy
53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I'm happy!
53:38:895 [Thread-26-spout] ack 0
53:43:896 [Thread-26-spout] Send [2]edi:I'm happy
53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I'm happy!
53:43:896 [Thread-26-spout] ack 1
53:48:896 [Thread-26-spout] Send [3]edi:I'm happy
53:48:896 [Thread-26-spout] ack 2
53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I'm happy!
53:53:896 [Thread-26-spout] Send [4]ted:I'm excited
53:53:896 [Thread-26-spout] ack 3
53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I'm excited!
53:58:897 [Thread-26-spout] Send [5]laden:I'm dangerous
53:58:897 [Thread-26-spout] ack 4
53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I'm dangerous!
很明顯看到:
a. 併發度為1的Spout確實是一個執行緒,併發度為3的Bolt確實是三個執行緒;
b. 訊息完全處理完成後,確實回調了ack(Object msgId)方法,而且msgId的值,即為我們emit的msgId;
c. 雖然我們在topology中定義了兩個bolt,但實際上ack對於每個tuple只調用了一次;
d. spout發出tuple後,Bolt很快就完成了,但是ack直到5秒後spout醒來才打印。
Tuple樹
對於Spout建立的Tuple,在topology定義的流水線中經過Bolt處理時,可能會產生一個或多個新的Tuple。源Tuple+新產生的Tuple構成了一個Tuple樹。當整棵樹被處理完成,才算一個Tuple被完全處理,其中任何一個節點的Tuple處理失敗或超時,則整棵樹失敗。
超時的值,可以通過定義topology時,conf.setMessageTimeoutSecs方法指定。
Anchor
在我們例子中ExclaimRichBolt用
collector.emit(inputTule, new Values(newTupleValue));
發射一個新的tuple。
第一個引數是傳入Bolt的tuple,第二個引數是新產生的tuple的value,這種emit的方式,在Storm中稱為: "anchor"。
Tuple的ack
前面我們一直提到acker,看到這裡,你應該能猜出acker其實就是Storm裡面track一個Tuple保證其一定被處理的功能。acker也是一個component。
我們來看看acker的工作流程:
1. Spout在初始化時會產生一個tasksId;
2. Spout中建立新的Tuple,其id是一個64位的隨機數;
3. Spout將新建的Tuple傳送出去(給出了messageId來開啟Tuple的追蹤), 同時會發送一個訊息到某個acker,要求acker進行追蹤。該訊息包含兩部分:
- Spout的taskId:使用者acker在整個tuple樹被完全處理後找到原始的Spout進行回撥ack或fail
- 一個64位的ack val值: 標誌該tuple是否被完全處理。初始值為0。
3. 一個Bolt在處理完Tuple後,如果發射了一個新的anchor tuple,Storm會維護anchor tuple的列表;
4. 該Bolt呼叫OutputCollector.ack()時,Storm會做如下操作:
- 將anchor tuple列表中每個已經ack過的和新建立的Tuple的id做異或(XOR)。假定Spout發出的TupleID是tuple-id-0,該Bolt新生成的TupleID為tuple-id-1,那麼,tuple-id-0XORtuple-id-0XOR tuple-id-1
- Storm根據該原始TupleID進行一致性hash演算法,找到最開始Spout傳送的那個acker,然後把上面異或後得出的ack val值傳送給acker
5. acker收到新的ack val值後,與儲存的原始的Tuple的id進行異或,如果為0,表示該Tuple已被完全處理,則根據其taskId找到原始的Spout,回撥其ack()方法。
fail的機制類似,在發現fail後直接回調Spout的fail方法。
Storm就是通過這個acker的機制來保證資料不丟失。
回頭再看看上面的列印結果,b、c兩條得到很好的解釋了。那d是為什麼呢?
在最開始時,我曾經提到過,Storm的設計模型中,Spout是源源不斷的產生資料的,所以其nextTuple()方法在任何時候不應該被打斷。ack,fail 和 nextTuple是在同一個執行緒中完成的。
所以,雖然acker發現一個Tuple已經完全處理完成,但是由於Spout執行緒在Sleep,無法回撥。
在設計中,我們應儘量避免在Spout、Bolt中去Sleep。如果確實需要控制,最好用非同步執行緒來做,例如用非同步執行緒讀取資料到佇列,再由Spout去取佇列中資料。非同步執行緒可以隨意控制速度等。
另外,
Storm是否會自動重發失敗的Tuple?
這裡答案已經很明顯了。fail方法如何實現取決於你自己。只有在fail中做了重發機制,才有重發。
注:Trident除外。這是Storm提供的特殊的事務性API,它確實會幫你自動重發的。
Unanchor
如果我們在Bolt中用OutputCollector.emit()發射一個新的Tuple時,並沒有指定輸入的Tuple(IBasicBolt的實現類用的是BasicOutPutCollector,其emit方法實際上還是呼叫OutputCollector.emit(),只不過內部會幫你填上輸入的Tuple),那麼行為稱之為“Unanchor”。
是否用Unanchor方式取決於你的實現。