Storm解讀之路(二、基本 Java-API 篇)
寫這些東西其實本質上是記錄因工作接觸 Storm 之後的學習進度,既然是工作,當然要敲程式碼,所以這一篇就分享下基本 Java-API 吧。
首先看下面的圖(畫圖不行見諒),這是 Storm API 使用中最基本的介面和抽象類關係。
OK,這裡我們可以清楚的看到,IComponent 是 API 核心介面,那麼其是怎麼的構成呢?
public interface IComponent extends Serializable {
/**
* @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
*/
void declareOutputFields(OutputFieldsDeclarer declarer);
Map<String, Object> getComponentConfiguration();
}
這兩個方法很簡單,declareOutputFields 是申明 topology 中流的輸出模式(具體講 Stream 模式的時候再說),而 getComponentConfiguration 是獲取 Storm 配置資訊的。
其實在 Visio 圖中是有兩個基礎介面我沒畫出來的,分別是 ISpout 和 Ibolt,為什麼呢?因為我們可以理解為 IRichSpout 和 IRichBolt 就是兩者與 IComponent 的合體(繼承)。接著一個個來,先說 Spout:
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
//Spout 終止的時候呼叫(不保證一定被呼叫)
void close();
//Spout 啟用的時候呼叫
void activate();
//Spout 失活(可能重新啟用)時呼叫,呼叫此方法時不會呼叫 nextTuple
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
簡單容易理解的方法在這裡就不提了(做了註釋),說一下比較重要的方法。
先說 open,當 Spout 初始化的那時候呼叫,一共接收了三個物件,一個配置物件、一個 Topology 上下文物件,還有一個 輸出控制器物件。重點提一下 SpoutOutputCollector 這個類,這個是控制整個 Spout 關於元組傳輸的類,很重要,主要關注下面幾個方法:
List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
long getPendingCount();
前兩個方法,都是將 tuple 提交到 stream 中去的,區別在於後者是定向提交的。其可以傳遞引數
int taskId, String streamId, List<Object> tuple, Object messageId
前三個引數分別的意義就是字面上意思(論取名的規範性),而 messageId 是作錨定所用(之後談)的。
然後說下 nextTuple 方法,這是個 non-blocking 的方法,也就是說當沒有 tuple 來 emit 的時候,其是立即返回的(非阻塞的)。好像 Storm 是在 0.8.1 版本之後 emit 空的話 nextTuple 就預設 sleep 1秒鐘(可配置,SleepSpoutWaitStrategy 介面),主要為了 cpu 資源的合理分配。總之你的 topology 活著(除了某些特例情況),你的 nextTuple 方法就是不斷被呼叫的,一直請求 tuple,一般我們也在這裡呼叫 SpoutOutputCollector 物件的 emit 方法傳送資料。
最後說下 ack、fail 方法,連帶 messageId 一起提。講之前先說下,Storm Spout 的 nextTuple、ack、fail 好像是一個執行緒的,所以才設計為非阻塞模式,具體底層我也看不了,哎(據說 JStorm 是分了多執行緒的)。所以可以根據實際情況把 nextTuple 的業務執行緒單出來。OK,迴歸正題,ack 方法是 Storm 錨定機制,要說簡單點的話可以這要講:Spout emit 一個 tuple,如果攜帶了 messageId(別告訴我你忘記這東西了),這個 tuple 的傳遞過程就將被追蹤,一直到其傳送成功或者失敗呼叫 fail 方法。關於 fail 方法,預設是 tuple 失敗後重新進入 queue,重發。具體的重發配置我還沒研究,有研究的朋友可以交流下,另外 getPendingCount 方法我也沒搞懂什麼作用,懂的朋友一樣歡迎指教,開源萬歲!
Spout 講完接著咱說 Bolt,老樣子,先看看原始碼
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
void execute(Tuple input);
//Bolt 終止時呼叫(不保證一定被呼叫)
void cleanup();
同 Spout,cleanup 就不解釋了,這裡說說 prepare 和 execute。先說 prepare 方法:
這是 Bolt 的初始化方法,三個物件和 Spout 不一樣的只有 OutputCollector:
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
void resetTimeout(Tuple input);
其實也 OutputCollector 只是把 ack 和 fail 方法囊括進去了,多了個超時重置配置,用法和 SpoutOutputCollector 基本相同。
然後重點看的是 execute 方法,這是個用作邏輯處理方法,你可以在這裡取得從 Spout 傳遞過來的 tuple,然後在 execute 中對其作你需要的業務實現。當然,如果你還想要向下繼續傳輸你的 tuple,那就得呼叫你在 prepare 方法中初始化好的 OutputCollector 物件,emit 你的 tuple(至於是否錨定還是看業務是否注重資料可靠)。
剛發現漏了說個重要的東西,Tuple,嗨呀好氣啊,補上補上:
Tuple 這個類,包含了你要傳輸的元組元資訊、內容以及操作方法,繼承自 ITuple,以下放一些方法(實在太多)
public GlobalStreamId getSourceGlobalStreamId();
public String getSourceComponent();
public int getSourceTask();
public MessageId getMessageId();
/**
* 判斷 tuple 是否包含該命名的 field
*/
public boolean contains(String field);
/**
* 通過位置引數返回 tuples 的 field(動態型別)
*/
public Object getValue(int i);
/**
* 通過位置引數返回 tuples 的 field(String 型別)
*/
public String getString(int i);
/**
* 通過命名返回 tuples 的 field(String 型別)
*/
public String getStringByField(String field);
這裡只是 Tuple 的一部分方法,很多實現其實都大同小異,可以返回各種上下文資訊,可以通過 tuples 的位置和命名(具體講 Stream 模式的時候再說)返回動態或已知型別的 field,也就是你傳遞的實際資料,順便說下,所謂 Value 其實就是個封裝 ArrayList 的類
public class Values extends ArrayList<Object>{
public Values() {
}
public Values(Object... vals) {
super(vals.length);
for(Object o: vals) {
add(o);
}
}
}
那麼 Spout 和 Bolt 基本的 API 介面分析就到這裡,接著說一個 Bolt 的擴充套件介面 IBasicBolt
public interface IBasicBolt extends IComponent {
void prepare(Map stormConf, TopologyContext context);
void execute(Tuple input, BasicOutputCollector collector);
void cleanup();
}
其實之前看懂了朋友這裡應該是很容易看明白的,BasicOutputCollector,這就是關鍵
public interface IBasicOutputCollector extends IErrorReporter{
List<Integer> emit(String streamId, List<Object> tuple);
void emitDirect(int taskId, String streamId, List<Object> tuple);
void resetTimeout(Tuple tuple);
}
IBasicOutputCollector 自己幫助你實現了 ack 機制的 emit,不需要你自己去寫,對於一些要求可靠性而且不復雜的業務 IBasicBolt 非常實用。
OK,本篇就到這裡,抽象類這裡我就不說了(沒啥說的)。其實 Spout 和 Bolt 的 API 還有一些功能性的封裝,像 ITransactionSpout、KafkaSpout之類的(本次專案時所用),各位可以自己去檢視原始碼,其實同樣是我說道的這些方法加上其各自的功能點,最多實現邏輯複雜些,還是能看明白的。