Storm批處理事務詳解
1.為什麼需要批處理事務
在流式計算中,我們經常需要保證 exactly-once 語義。Storm的一個Spout在傳送資料後如果處理失敗,由於其ack/fail機制,我們可以得知是那一批資料處理失敗,從而重新發送資料進行處理,但是這時會有一個問題,有可能會重複處理了同一批資料,尤其在一些要求比較高的場景(比如支付場景),這樣會造成嚴重的後果,因此為了確保 exactly-once 語義,保證資料僅且被執行一次,在Storm 0.7.0之後引入了transactional topologies。
2.事務機制原理
對於只需要處理一次的場景,為了確保 exactly-once 語義,我們可以設計為每一個tuple設定一個tid進行標識,我們可以通過tid來判斷當前事務是否被執行過,所有比tid小的事務必須執行完畢。對於這種設計,如果我們需要連線資料判斷tid,那麼對於每一個tuple都必須和資料進行互動,消耗了大量的資源而且降低了處理速度;因此,我們可以以batch為處理單位,為每一批tuple設定一個tid。
雖然上面這種設定避免了資源的浪費,但是對於每一個batch都必須等待其他batch處理完畢,如下面topology:
為了提高並行度,storm採用了pipeline(管道)處理模型,將一個batch分為兩個階段,processing和commit階段。在processing階段,多個batch可以平行計算,在commit階段,必須按照強順序性執行,最後提交事務。
在使用Transactional Topologies的時候,Storm會做以下幾點:
- 管理狀態:Storm使用zookeeper來儲存事務狀態,包括一些tid以及元資料。
- 協調事務:Strom內部幫你管理一切事務的執行,如在任何一點是processing階段還是commit階段。
- 錯誤檢測:Storm利用acking框架高效的來處理失敗的事務,當事務失敗時會replay相應的batch,你不需要手動的進行acking或者anchoring。
- 內建的批處理API:Storm在普通的bolt之上封裝了一層API來提供對tuple事務的處理。Storm管理所有的協調工作,保證一個bolt什麼時候收到一個特定的transaction的所有tuple,同時也會清除每一個transaction產生的中間資料。
3.事務API的簡單介紹
與之前的Storm程式一樣,我們需要使用一個TopologyBuilder來設定Spout和Bolt。相應的事務topology使用TransactionalTopologyBuilder來設定。對於Spout我們可以實現ITransactionalSpout介面,在這個介面中包含兩個內部介面類,Coordinator和Emitter,ITransactionalSpout保證了相同的批處理事務必須傳送相同的tid。實際上,實現ITransactionalSpout介面的Spout是一個sub-topology,如下圖所示:
Coordinator開啟一個事務準備發射一個batch的時候,進入一個事務的processing階段,會發射一個事務tuple(包括transactionAttempt和metadata)到“batch emit”流中。(其中transactionAttempt包含"transaction id"和"attempt id","transaction id"對batch進行標識,"attempt id"唯一標識了對於同一batch發射不同的tuple;metadata中包含當前事務可以從當前哪個point進行重放資料,存放在zookeeper中,spout可以通過kryo從ZK中序列化和反序列化元資料)。
Emitter以all grouping(廣播)的方式訂閱coordinator的"batch emit",負責為每一個batch發射tuple。傳送的tuple都必須以transactionAttempt作為第一個field,storm會根據它來判斷髮送的tuple屬於哪一個batch。
其中Coordinator只有一個,Emitter可以通過並行度來設定。
對於普通的批處理Bolt可以實現IBatchBolt介面,對於事務Bolt可以實現BaseTransactionalBolt。在BaseTransactionalBolt介面中會繼承父類的幾個方法,如下:
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
void execute(Tuple tuple);
void finishBatch();
在處理batch的時候,處理每一個tuple的時候都可以呼叫execute方法,而在整個batch處理完畢(processing階段完畢)的時候才能呼叫finishBatch方法。如果一個Bolt被標記為Committer,那麼只有在commit階段才能呼叫finshBatch方法。storm保證該bolt之前的所有bolt執行完畢。
下面有兩個問題,Storm如何保證之前的所有Bolt都執行完畢?怎樣將一個Bolt設定為Committer?
首先第一個問題,在bolt內部,有一個CoordinatedBolt模型,在CoordinatedBolt中記錄著兩個值,有哪些task給我傳送了tuple,我需要給哪些task傳送tuple。等所有的tuple傳送完畢之後,CoordinateBolt通過另外一個特殊的stream以emitDirect的方式告訴所有傳送過它tuple的task,它傳送了多少tuple給這個task,下游task會將這個數字和自己接受到的tuple數進行對比,如果相等,則表示處理完了所有tuple。下游的CoordinateBolt重複上述步驟,繼續通知其下游。
對於第二個問題,如何設定一個Bolt為Committer,總共有兩種方式。可以實現ICommitter介面來標識為Committer或者使用TransactionalTopologyBuilder 的setCommitterBolt 方法來設定一個Committer。
最後要提的是儘管Strom的批處理事務被標記為deprecated ,使用Trident框架來替代,但是理解了批處理事務的原理也是學習Trident框架的基礎和關鍵,因此,值得了解該部分內容。
歡迎加入大資料開發交流群 731423890: