1. 程式人生 > >Storm應用系列之——可靠性與acker機制

Storm應用系列之——可靠性與acker機制

本文屬原創系列,轉載請註明。

對於Storm,它有一個很重要的特性:“Guarantee no data loss” ——可靠性

很顯然,要做到這個特性,必須要track每個data的去向和結果。Storm是如何做到的呢——acker機制。

先概括下acker所參與的工作流程:

1. Spout建立一個新的Tuple時,會發一個訊息通知acker去跟蹤;

2. Bolt在處理Tuple成功或失敗後,也會發一個訊息通知acker;

3. acker會找到發射該Tuple的Spout,回撥其ack或fail方法。

我們說RichBolt和BasicBolt的區別是後者會自動ack。那麼是不是我們只要實現了Spout的ack或fail方法就能看到反饋了呢?

試試在RandomSpout中加入如下程式碼:

@Override
    public void ack(Object msgId) {
		System.err.println("ack " + msgId);
    }

    @Override
    public void fail(Object msgId) {
    	System.err.println("fail " + msgId);
    }

重新執行ExclaimBasicTopo,看下結果。並沒有任何的ack 和 fail 出現?

原因是,Storm要求如果要track一個Tuple,必須要指定其messageId,也就是回調回ack和fail方法的引數。如果我們不指定,Storm是不會去track該tuple的,即不保證訊息丟失!

我們改下Spout程式碼,為每個訊息加入一個唯一Id。同時,為了方便看結果,加入更多的列印,並且靠sleep減慢傳送速度。(只是為了演示!)

public class RandomSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;

	private Random rand;
	
	private AtomicInteger counter;
	
	private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};
	
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		this.rand = new Random();
		counter = new AtomicInteger();
	}

	@Override
	public void nextTuple() {
		Utils.sleep(5000);
		String toSay = sentences[rand.nextInt(sentences.length)];
		int msgId = this.counter.getAndIncrement();
		toSay = "["+ msgId + "]"+ toSay;
		PrintHelper.print("Send " + toSay );
		
		this.collector.emit(new Values(toSay), msgId);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}
	
	@Override
    public void ack(Object msgId) {
		PrintHelper.print("ack " + msgId);
    }

    @Override
    public void fail(Object msgId) {
    	PrintHelper.print("fail " + msgId);
    }

}

PrintHelper類:

public class PrintHelper {

	private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");
	
	public static void print(String out){
		System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);
	}
	
}

同時把PrintBolt裡面列印也換成PrintHelper.print列印


看下列印結果:
53:33:891 [Thread-26-spout] Send [0]ted:I'm excited
53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I'm excited!
53:38:895 [Thread-26-spout] Send [1]edi:I'm happy
53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I'm happy!
53:38:895 [Thread-26-spout] ack 0
53:43:896 [Thread-26-spout] Send [2]edi:I'm happy
53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I'm happy!
53:43:896 [Thread-26-spout] ack 1
53:48:896 [Thread-26-spout] Send [3]edi:I'm happy
53:48:896 [Thread-26-spout] ack 2
53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I'm happy!
53:53:896 [Thread-26-spout] Send [4]ted:I'm excited
53:53:896 [Thread-26-spout] ack 3
53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I'm excited!
53:58:897 [Thread-26-spout] Send [5]laden:I'm dangerous
53:58:897 [Thread-26-spout] ack 4
53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I'm dangerous!

很明顯看到:

a. 併發度為1的Spout確實是一個執行緒,併發度為3的Bolt確實是三個執行緒;

b. 訊息完全處理完成後,確實回調了ack(Object msgId)方法,而且msgId的值,即為我們emit的msgId;

c. 雖然我們在topology中定義了兩個bolt,但實際上ack對於每個tuple只調用了一次;

d. spout發出tuple後,Bolt很快就完成了,但是ack直到5秒後spout醒來才打印。

Tuple樹

對於Spout建立的Tuple,在topology定義的流水線中經過Bolt處理時,可能會產生一個或多個新的Tuple。源Tuple+新產生的Tuple構成了一個Tuple樹。當整棵樹被處理完成,才算一個Tuple被完全處理,其中任何一個節點的Tuple處理失敗或超時,則整棵樹失敗。

超時的值,可以通過定義topology時,conf.setMessageTimeoutSecs方法指定。

Anchor

在我們例子中ExclaimRichBolt

collector.emit(inputTule, new Values(newTupleValue));

發射一個新的tuple。

第一個引數是傳入Bolttuple,第二個引數是新產生的tuplevalue,這種emit的方式,在Storm中稱為: "anchor"。

Tuple的ack

前面我們一直提到acker,看到這裡,你應該能猜出acker其實就是Storm裡面track一個Tuple保證其一定被處理的功能。acker也是一個component

我們來看看acker的工作流程:

1. Spout在初始化時會產生一個tasksId;

2. Spout中建立新的Tuple,其id是一個64位的隨機數;

3. Spout將新建的Tuple傳送出去(給出了messageId來開啟Tuple的追蹤), 同時會發送一個訊息到某個acker,要求acker進行追蹤。該訊息包含兩部分:

  • Spout的taskId:使用者acker在整個tuple樹被完全處理後找到原始的Spout進行回撥ack或fail
  • 一個64位的ack val值: 標誌該tuple是否被完全處理。初始值為0。

3. 一個Bolt在處理完Tuple後,如果發射了一個新的anchor tuple,Storm會維護anchor tuple的列表;

4. 該Bolt呼叫OutputCollector.ack()時,Storm會做如下操作:

  • anchor tuple列表中每個已經ack過的和新建立的Tuple的id做異或(XOR)。假定Spout發出的TupleID是tuple-id-0,該Bolt新生成的TupleID為tuple-id-1,那麼,tuple-id-0XORtuple-id-0XOR tuple-id-1
  • Storm根據該原始TupleID進行一致性hash演算法,找到最開始Spout傳送的那個acker,然後把上面異或後得出的ack val值傳送給acker

5. acker收到新的ack val值後,與儲存的原始的Tuple的id進行異或,如果為0,表示該Tuple已被完全處理,則根據其taskId找到原始的Spout,回撥其ack()方法。

fail的機制類似,在發現fail後直接回調Spout的fail方法。

Storm就是通過這個acker的機制來保證資料不丟失。

回頭再看看上面的列印結果,b、c兩條得到很好的解釋了。那d是為什麼呢?

在最開始時,我曾經提到過,Storm的設計模型中,Spout是源源不斷的產生資料的,所以其nextTuple()方法在任何時候不應該被打斷。ack,fail 和 nextTuple是在同一個執行緒中完成的。

所以,雖然acker發現一個Tuple已經完全處理完成,但是由於Spout執行緒在Sleep,無法回撥。

在設計中,我們應儘量避免在Spout、Bolt中去Sleep。如果確實需要控制,最好用非同步執行緒來做,例如用非同步執行緒讀取資料到佇列,再由Spout去取佇列中資料。非同步執行緒可以隨意控制速度等。

另外,

Storm是否會自動重發失敗的Tuple?

這裡答案已經很明顯了。fail方法如何實現取決於你自己。只有在fail中做了重發機制,才有重發。

注:Trident除外。這是Storm提供的特殊的事務性API,它確實會幫你自動重發的。

Unanchor

如果我們在Bolt中用OutputCollector.emit()發射一個新的Tuple時,並沒有指定輸入的Tuple(IBasicBolt的實現類用的是BasicOutPutCollector,其emit方法實際上還是呼叫OutputCollector.emit(),只不過內部會幫你填上輸入的Tuple),那麼行為稱之為“Unanchor”。

是否用Unanchor方式取決於你的實現。

調整可靠性

在某些特定的情況下,你或許想調整Storm的可靠性。例如,你並不關心資料是否丟失,或者你想看看後面是否有某個Bolt拖慢了Spout的速度? 那麼,有三種方法可以實現: 1. 在build topology時,設定acker數目為0,即conf.setNumAckers(0); 2. 在Spout中,不指定messageId,使得Storm無法追蹤; 3. 在Bolt中,使用Unanchor方式發射新的Tuple。