1. 程式人生 > >Storm應用系列之——Spout、Bolt API

Storm應用系列之——Spout、Bolt API

前言:

        昨天有朋友聊天說,我寫的前面三篇太簡單了,沒有太多深入的東西。好吧,這說明我的目的達到了。我寫這個系列的原因就是為了面向應用,進一步細化為兩點:

        1. 以例子說話,由簡入深,一步步瞭解如何在Storm上開發應用,不會讀起來吃力;

        2. 對於一些原理性的東西,不去過於深究,只要記住Storm是這樣實現的,開發的時候加以利用或規避。

在明白了這些基礎的東西以後,如果對於原理性的東西Storm是如何實現的感興趣,可以再去看原始碼也不遲。畢竟這部分對開發應用的幫助並不直接。我認為,不必每個用Storm的人都必須瞭解Storm底層是如何實現的,當然,我會嘗試在適當的位置插入相關原理性解釋的連結,有興趣可以直接去看看。就此原因,我把標題改成“Storm應用系列”。

注:轉帖請註明,原帖地址:

Component

Storm中,Spout和Bolt都是其Component。所以,Storm定義了一個名叫IComponent的總介面
全家普如下:
綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關的,在以後的文章會具體講解。 BaseComponent 是Storm提供的“偷懶”的類。為什麼這麼說呢,它及其子類,都或多或少實現了其介面定義的部分方法。這樣我們在用的時候,可以直接繼承該類,而不是自己每次都寫所有的方法。但值得一提的是,BaseXXX這種定義的類,它所實現的方法,都是空的,直接返回null。 

Spout

在前面基本例子中,我們實現了一個RandomSpout,來看看其類圖


  • Spout的最頂層抽象是ISpout介面。


open方法是初始化動作。允許你在該spout初始化時做一些動作,傳入了上下文,方便取上下文的一些資料。

close方法在該spout關閉前執行,但是並不能得到保證其一定被執行。spout是作為task執行在worker內,在cluster模式下,supervisor會直接kill -9 woker的程序,這樣它就無法執行了。而在本地模式下,只要不是kill -9, 如果是傳送停止命令,是可以保證close的執行的。

activatedeactivate :一個spout可以被暫時啟用和關閉,這兩個方法分別在對應的時刻被呼叫。

nextTuple 用來發射資料。

ack(Object) 

傳入的Object其實是一個id,唯一表示一個tuple。該方法是這個id所對應的tuple被成功處理後執行。

fail(Object)

同ack,只不過是tuple處理失敗時執行。

我們的RandomSpout 由於繼承了BaseRichSpout,所以不用實現close、activate、deactivate、ack、fail和getComponentConfiguration方法,只關心最基本核心的部分。

結論:

通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現介面IRichSpout,如果不想寫多餘的程式碼,可以直接繼承BaseRichSpout。

Bolt

ExclaimBasicBolt的類圖:

這裡可以看到一個奇怪的問題: 為什麼IBasicBolt並沒有繼承IBolt? 我們帶著問題往下看。 IBolt定義了三個方法:
  • IBolt繼承了java.io.Serializable,我們在nimbus上提交了topology以後,創建出來的bolt會序列化後傳送到具體執行的worker上去。worker在執行該Bolt時,會先呼叫prepare方法傳入當前執行的上下文
  • execute接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果
  • cleanup 同ISpout的close方法,在關閉前呼叫。同樣不保證其一定執行。
紅色部分是Bolt實現時一定要注意的地方。而Storm提供了IBasicBolt介面,其目的就是實現該介面的Bolt不用在程式碼中提供反饋結果了,Storm內部會自動反饋成功。 如果你確實要反饋失敗,可以丟擲FailedException。 我們來再寫一個Bolt繼承BaseRichBolt替代ExclaimBasicBolt。程式碼如下:
public class ExclaimRichBolt extends BaseRichBolt {

	private OutputCollector collector;
	
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		this.collector.emit(tuple, new Values(tuple.getString(0)+"!"));
		this.collector.ack(tuple);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("after_excl"));
	}

}
修改topology
//builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");
builder.setBolt("exclaim", new ExclaimRichBolt(), 2).shuffleGrouping("spout");
執行下,結果一致。 結論: 通常情況下,實現一個Bolt,可以實現IRichBolt介面或繼承BaseRichBolt,如果不想自己處理結果反饋,可以實現IBasicBolt介面或繼承BaseBasicBolt,它實際上相當於自動做掉了prepare方法和collector.emit.ack(inputTuple);