1. 程式人生 > >Storm之——批處理事務原理(基於Storm0.9.x)

Storm之——批處理事務原理(基於Storm0.9.x)

事務-批處理

對於容錯機制,Storm通過一個系統級別的元件acker,結合xor校驗機制判斷一個tuple是否傳送成功,進而spout可以重發該tuple ,保證一個tuple在出錯的情況下至少被重發一次。
但是在需要精確統計tuple的數量如銷售金額場景時,希望每個tuple”被且僅被處理一次” 。Storm 0.7.0引入了Transactional Topology, 它可以保證每個tuple”被且僅被處理一次”, 這樣我們就可以實現一種非常準確,且高度容錯方式來實現計數類應用。逐個處理單個tuple,增加很多開銷,如寫庫、輸出結果頻率過高事務處理單個tuple效率比較低,因此storm中引入batch處理,批處理是一次性處理一批(batch)tuple,事務可確保該批次要麼全部處理成功,如果有處理失敗的則全部不計,Storm會對失敗的批次重新發送,且確保每個batch被且僅被處理一次。

事務機制原理

對於只處理一次的需要,從原理上來講,需要在傳送tuple的時候帶上txid,在需要事務處理的時候,根據該txid是否以前已經處理成功來決定是否進行處理,當然需要把txid和處理結果一起做儲存。
在事務batch處理中,一批tuple賦予一個txid,為了提高batch之間處理的並行度,storm採用了pipeline(管道)處理模型,這樣多個事務可以並行執行,但是commit的是嚴格按照順序的。
Storm事務處理中,把一個batch的計算分成了兩個階段processing和commit階段:
Processing 階段:多個batch可以平行計算;
Commiting階段:batch之間強制按照順序進行提交

事務topo

Processing 階段:多個batch可以平行計算,上面例子中bolt2是普通的batchbolt(實現IBatchBolt),那麼多個batch在 bolt2的task之間可以並行執行.
Commiting階段:batch之間強制按照順序進行提交,上圖中Bolt3實現IBatchBolt並且標記需要事務處理的(實現了ICommitter介面或者通過TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt新增到topology裡面),那麼在Storm認為可以提交batch的時候呼叫 finishbatch,在finishBatch做txid的比較以及狀態儲存工作。例子中batch2必須等待batch1提交後,才可以進行提交。

事務Topologies

使用Transactional Topologies的時候, storm為你做下面這些事情:
管理狀態: Storm把所有實現Transactional Topologies所必須的狀態儲存在zookeeper裡面,包括當前transaction id及定義每個batch的一些元資料。
協調事務: Storm幫你管理所有事情,如幫你決定在任何一個時間點是該proccessing    還是該committing。
錯誤檢測: Storm利用acking框架來高效地檢測什麼時候一個batch被成功處理了,被成功提交了,或者失敗了。Storm然後會相應地replay對應的batch。你不需要自己手    動做任何acking或者anchoring (emit時發生的動作)。
內建的批處理API: Storm在普通bolt之上包裝了一層API來提供對tuple的批處理支援。Storm管理所有的協調工作,包括決定什麼時候一個bolt接收到一個特定transaction的所有tuple。Storm同時也會自動清理每個transaction所產生的中間資料。

事務Topology實現

事務性的spout需要實現ITransactionalSpout,這個介面包含兩個內部介面類Coordinator和Emitter。在topology執行的時候,事務性的spout內部包含一個子Topology,如下圖:

這裡面有兩種型別的tuple,一種是事務性的tuple,一種是batch中的tuple;
coordinator 開啟一個事務準備發射一個batch時候,進入一個事務的processing階段,會發射一個事務性 tuple(transactionAttempt & metadata)到”batch emit”流,Emitter以all grouping(廣播)的方式訂閱coordinator的”batch emit”流,負責為每個batch實際發射tuple。傳送的tuple都必須以TransactionAttempt作為第一個field,storm根據這個field來判斷tuple屬於哪一個batch。coordinator只有一個,emitter根據並行度可以有多個例項

TransactionAttempt和元資料

TransactionAttempt包含兩個值:一個transaction id,一個attempt id。transaction id的作用就是我們上面介紹的對於每個batch中的tuple是唯一的,而且不管這個batch replay多少次都是一樣的。attempt id是對於每個batch唯一的一個id, 但是對於同一個batch,它replay之後的attempt id跟replay之前就不一樣了,我們可以把attempt id理解成replay-times, storm利用這個id來區別一個batch發射的tuple的不同版本。
metadata(元資料)中包含當前事務可以從哪個point進行重放資料,存放在zookeeper中的,spout可以通過Kryo從zookeeper中序列化和反序列化該元資料。

事務性Bolt

BaseTransactionalBolt
處理batch在一起的tuples,對於每一個tuple呼叫呼叫execute方 法,而在整個batch處理(processing)完成的時候呼叫finishBatch方法。如果BatchBolt被標記成Committer,則 只能在commit階段呼叫finishBatch方法。一個batch的commit階段由storm保證只在前一個batch成功提交之後才會執行。並且它會重試直到topology裡面的所有bolt在commit完成提交。那麼如何知道batch的processing完成了,也就是bolt是否接收處理了batch裡面所有的tuple;在bolt內部,有一個 CoordinatedBolt的模型。

CoordinateBolt

每個CoordinateBolt記錄兩個值:有哪些task給我傳送了tuple(根據topology的grouping資訊);我要給哪些task傳送資訊(同樣根據groping資訊)。
等所有的tuple都發送完了之後,CoordinateBolt通過另外一個特殊的stream以emitDirect的方式告訴所有它傳送過 tuple的task,它傳送了多少tuple給這個task。下游task會將這個數字和自己已經接收到的tuple數量做對比,如果相等,則說明處理 完了所有的tuple。
下游CoordinateBolt會重複上面的步驟,通知其下游