【Storm總結-4】Storm 中acker的工作流程
概述
我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理, 完整處理的意思是指:
一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所導致的所有的tuple都被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在timeout所指定的時間內沒有成功處理。
也就是說對於任何一個spout-tuple以及它的所有子孫到底處理成功失敗與否我們都會得到通知。關於如果做到這一點的原理,可以看看Twitter Storm如何保證訊息不丟失這篇文章。從那篇文章裡面我們可以知道,storm裡面有個專門的acker來跟蹤所有tuple的完成情況。這篇文章就來討論acker的詳細工作流程。
原始碼列表
這篇文章涉及到的原始碼主要包括:
演算法簡介
acker對於tuple的跟蹤演算法是storm的主要突破之一, 這個演算法使得對於任意大的一個tuple樹, 它只需要恆定的20位元組就可以進行跟蹤了。原理很簡單:acker對於每個spout-tuple儲存一個ack-val的校驗值,它的初始值是0, 然後每發射一個tuple/ack一個tuple,那麼tuple的id都要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了, 那麼最後ack-val一定是0(因為一個數字跟自己異或得到的值是0)。
進入正題
那麼下面我們從原始碼層面來看看哪些元件在哪些時候會給acker傳送什麼樣的訊息來共同完成這個演算法的
- (let [id (.getValue tuple 0)
- ^TimeCacheMap pending @pending
- curr (.get pending id)
- curr (condp = (.getSourceStreamId tuple)
- ACKER-INIT-STREAM-ID (-> curr
- (update-ack id)
-
(assoc :spout-task (.getValue tuple 1
- ACKER-ACK-STREAM-ID (update-ack
- curr (.getValue tuple 1))
- ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
- ...)
Spout建立一個新的tuple的時候給acker傳送訊息
訊息格式(看上面程式碼的第1行和第7行對於tuple.getValue()
的呼叫)助
- (spout-tuple-id, task-id)
訊息的streamId是__ack_init(ACKER-INIT-STREAM-ID)
這是告訴acker, 一個新的spout-tuple出來了, 你跟蹤一下,它是由id為task-id的task建立的(這個task-id在後面會用來通知這個task:你的tuple處理成功了/失敗了)。處理完這個訊息之後, acker會在它的pending這個map(型別為TimeCacheMap)裡面新增這樣一條記錄:
- {spout-tuple-id {:spout-tasktask-id :valack-val)}
這就是acker對spout-tuple進行跟蹤的核心資料結構, 對於每個spout-tuple所產生的tuple樹的跟蹤都只需要儲存上面這條記錄。acker後面會檢查:val什麼時候變成0,變成0, 說明這個spout-tuple產生的tuple都處理完成了。
Bolt發射一個新tuple的時候會給acker傳送訊息麼?
任何一個bolt在發射一個新的tuple的時候,是不會直接通知acker的,如果這樣做的話那麼每發射一個訊息會有三條訊息了:
- Bolt建立這個tuple的時候,把它發給下一個bolt的訊息
-
Bolt建立這個tuple的時候,傳送給acker的訊息 - ack tuple的時候傳送的ack訊息
事實上storm裡面只有第一條和第三條訊息,它把第二條訊息省掉了, 怎麼做到的呢?storm這點做得挺巧妙的,bolt在發射一個新的bolt的時候會把這個新tuple跟它的父tuple的關係儲存起來。然後在ack每個tuple的時候,storm會把要ack的tuple的id, 以及這個tuple新建立的所有的tuple的id的異或值傳送給acker。這樣就給每個tuple省掉了一個訊息(具體看下一節)。
Tuple被ack的時候給acker傳送訊息
每個tuple在被ack的時候,會給acker傳送一個訊息,訊息格式是:助
- (spout-tuple-id, tmp-ack-val)
訊息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)
注意,這裡的tmp-ack-val是要ack的tuple的id與由它新建立的所有的tuple的id異或的結果:
- tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )
我們可以從task.clj裡面的send-ack方法看出這一點:
幫助- (defn- send-ack [^TopologyContext topology-context
- ^Tuple input-tuple
- ^List generated-ids send-fn]
- (let [ack-val (bit-xor-vals generated-ids)]
- (doseq [
- [anchor id] (.. input-tuple
- getMessageId
- getAnchorsToIds)]
- (send-fn (Tuple. topology-context
- [anchor (bit-xor ack-val id)]
- (.getThisTaskId topology-context)
- ACKER-ACK-STREAM-ID))
- )))
這裡面的generated-ids
引數就是這個input-tuple的所有子tuple的id, 從程式碼可以看出storm會給這個tuple的每一個spout-tuple傳送一個ack訊息。
為什麼說這裡的generated-ids
是input-tuple的子tuple呢? 這個send-ack是被OutputCollectorImpl裡面的ack方法呼叫的:
- publicvoid ack(Tuple input) {
- List generated = getExistingOutput(input);
- // don't just do this directly in case
- // there was no output
- _pendingAcks.remove(input);
- _collector.ack(input, generated);
- }
generated是由
getExistingOutput(input)
方法計算出來的,
我們再來看看這個方法的定義:
幫助
- private List getExistingOutput(Tuple anchor) {
- if(_pendingAcks.containsKey(anchor)) {
- return _pendingAcks.get(anchor);
- } else {
- List ret = new ArrayList();
- _pendingAcks.put(anchor, ret);
- return ret;
- }
- }
_pendingAcks
裡面存的是什麼東西呢?
- private Tuple anchorTuple(Collection< Tuple > anchors,
- String streamId,
- List< Object > tuple) {
- // The simple algorithm in this function is the key
- // to Storm. It is what enables Storm to guarantee
- // message processing.
- // 這個map存的東西是 spout-tuple-id到ack-val的對映
- Map< Long, Long > anchorsToIds
- = new HashMap<Long, Long>();
- // anchors 其實就是它的所有父親:spout-tuple
- if(anchors!=null) {
- for(Tuple anchor: anchors) {
- long newId = MessageId.generateId();
- // 告訴每一個父親,你們又多了一個兒子了。
- getExistingOutput(anchor).add(newId);
- for(long root: anchor.getMessageId()
- .getAnchorsToIds().keySet()) {
- Long curr = anchorsToIds.get(root);
- if(curr == null) curr = 0L;
- // 更新spout-tuple-id的ack-val
- anchorsToIds.put(root, curr ^ newId);
- }
- }
- }
- returnnew Tuple(_context, tuple,
- _context.getThisTaskId(),
- streamId,
- MessageId.makeId(anchorsToIds));
- }
從上面程式碼裡面的紅色部分我們可以看出, _pendingAcks
裡面維護的其實就是tuple到自己兒子的對應關係。
Tuple處理失敗的時候會給acker傳送失敗訊息
acker會忽略這種訊息的訊息內容(訊息的streamId為ACKER-FAIL-STREAM-ID
), 直接將對應的spout-tuple標記為失敗(最上面程式碼第9行)
最後Acker發訊息通知spout-tuple對應的Worker
最後, acker會根據上面這些訊息的處理結果來通知這個spout-tuple對應的task:
幫助- (when (and curr
- (:spout-task curr))
- (cond (= 0 (:val curr))
- ;; ack-val == 0 說明這個tuple的所有子孫都
- ;; 處理成功了(都發送ack訊息了)
- ;; 那麼傳送成功訊息通知建立這個spout-tuple的task.
- (do
- (.remove pending id)
- (acker-emit-direct @output-collector
- (:spout-task curr)
- ACKER-ACK-STREAM-ID
- [id]
- ))
- ;; 如果這個spout-tuple處理失敗了
- ;; 傳送失敗訊息給建立這個spout-tuple的task
- (:failed curr)
- (do
- (.remove pending id)
- (acker-emit-direct @output-collector
- (:spout-task curr)
- ACKER-FAIL-STREAM-ID
- [id]
- ))
- ))