1. 程式人生 > >【Storm總結-4】Storm 中acker的工作流程

【Storm總結-4】Storm 中acker的工作流程

概述

我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理, 完整處理的意思是指:

一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所導致的所有的tuple都被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在timeout所指定的時間內沒有成功處理。

也就是說對於任何一個spout-tuple以及它的所有子孫到底處理成功失敗與否我們都會得到通知。關於如果做到這一點的原理,可以看看Twitter Storm如何保證訊息不丟失這篇文章。從那篇文章裡面我們可以知道,storm裡面有個專門的acker來跟蹤所有tuple的完成情況。這篇文章就來討論acker的詳細工作流程。

原始碼列表

這篇文章涉及到的原始碼主要包括:

演算法簡介

acker對於tuple的跟蹤演算法是storm的主要突破之一, 這個演算法使得對於任意大的一個tuple樹, 它只需要恆定的20位元組就可以進行跟蹤了。原理很簡單:acker對於每個spout-tuple儲存一個ack-val的校驗值,它的初始值是0, 然後每發射一個tuple/ack一個tuple,那麼tuple的id都要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了, 那麼最後ack-val一定是0(因為一個數字跟自己異或得到的值是0)。

進入正題

那麼下面我們從原始碼層面來看看哪些元件在哪些時候會給acker傳送什麼樣的訊息來共同完成這個演算法的

。acker對訊息進行處理的主要是下面這塊程式碼:

幫助
  1. (let [id (.getValue tuple 0)  
  2.    ^TimeCacheMap pending @pending
  3.    curr (.get pending id)  
  4.    curr (condp = (.getSourceStreamId tuple)  
  5.         ACKER-INIT-STREAM-ID (-> curr  
  6.                (update-ack id)  
  7.                (assoc :spout-task (.getValue tuple 1
    )))  
  8.         ACKER-ACK-STREAM-ID (update-ack  
  9.                          curr (.getValue tuple 1))  
  10.         ACKER-FAIL-STREAM-ID (assoc curr :failed true))]  
  11.             ...)  


Spout建立一個新的tuple的時候給acker傳送訊息

訊息格式(看上面程式碼的第1行和第7行對於tuple.getValue()的呼叫)

  1. (spout-tuple-id, task-id)  


訊息的streamId是__ack_init(ACKER-INIT-STREAM-ID)

這是告訴acker, 一個新的spout-tuple出來了, 你跟蹤一下,它是由id為task-id的task建立的(這個task-id在後面會用來通知這個task:你的tuple處理成功了/失敗了)。處理完這個訊息之後, acker會在它的pending這個map(型別為TimeCacheMap)裡面新增這樣一條記錄:

  1. {spout-tuple-id {:spout-tasktask-id :valack-val)}  
幫助

這就是acker對spout-tuple進行跟蹤的核心資料結構, 對於每個spout-tuple所產生的tuple樹的跟蹤都只需要儲存上面這條記錄。acker後面會檢查:val什麼時候變成0,變成0, 說明這個spout-tuple產生的tuple都處理完成了。

Bolt發射一個新tuple的時候會給acker傳送訊息麼?

任何一個bolt在發射一個新的tuple的時候,是不會直接通知acker的,如果這樣做的話那麼每發射一個訊息會有三條訊息了:

  1. Bolt建立這個tuple的時候,把它發給下一個bolt的訊息
  2. Bolt建立這個tuple的時候,傳送給acker的訊息
  3. ack tuple的時候傳送的ack訊息

事實上storm裡面只有第一條和第三條訊息,它把第二條訊息省掉了, 怎麼做到的呢?storm這點做得挺巧妙的,bolt在發射一個新的bolt的時候會把這個新tuple跟它的父tuple的關係儲存起來。然後在ack每個tuple的時候,storm會把要ack的tuple的id, 以及這個tuple新建立的所有的tuple的id的異或值傳送給acker。這樣就給每個tuple省掉了一個訊息(具體看下一節)。

Tuple被ack的時候給acker傳送訊息

每個tuple在被ack的時候,會給acker傳送一個訊息,訊息格式是:

  1. (spout-tuple-id, tmp-ack-val)  

訊息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)

注意,這裡的tmp-ack-val是要ack的tuple的id與由它新建立的所有的tuple的id異或的結果:

  1. tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )  

我們可以從task.clj裡面的send-ack方法看出這一點:

幫助
  1. (defn- send-ack [^TopologyContext topology-context  
  2.                           ^Tuple input-tuple  
  3.                           ^List generated-ids send-fn]  
  4.   (let [ack-val (bit-xor-vals generated-ids)]  
  5.     (doseq [  
  6.       [anchor id] (.. input-tuple  
  7.                       getMessageId  
  8.                       getAnchorsToIds)]  
  9.       (send-fn (Tuple. topology-context  
  10.                  [anchor (bit-xor ack-val id)]  
  11.                  (.getThisTaskId topology-context)  
  12.                  ACKER-ACK-STREAM-ID))  
  13.       )))  


這裡面的generated-ids引數就是這個input-tuple的所有子tuple的id, 從程式碼可以看出storm會給這個tuple的每一個spout-tuple傳送一個ack訊息。

為什麼說這裡的generated-ids是input-tuple的子tuple呢? 這個send-ack是被OutputCollectorImpl裡面的ack方法呼叫的:

幫助
  1. publicvoid ack(Tuple input) {  
  2.     List generated = getExistingOutput(input);  
  3.     // don't just do this directly in case
  4.     // there was no output
  5.     _pendingAcks.remove(input);  
  6.     _collector.ack(input, generated);  
  7. }  

generated是由getExistingOutput(input)方法計算出來的, 我們再來看看這個方法的定義: 幫助
  1. private List getExistingOutput(Tuple anchor) {  
  2.     if(_pendingAcks.containsKey(anchor)) {  
  3.         return _pendingAcks.get(anchor);  
  4.     } else {  
  5.         List ret = new ArrayList();  
  6.         _pendingAcks.put(anchor, ret);  
  7.         return ret;  
  8.     }  
  9. }  


_pendingAcks裡面存的是什麼東西呢?

幫助
  1. private Tuple anchorTuple(Collection< Tuple > anchors,  
  2.                                 String streamId,  
  3.                                 List< Object > tuple) {  
  4.     // The simple algorithm in this function is the key
  5.     // to Storm. It is what enables Storm to guarantee
  6.     // message processing.
  7.     // 這個map存的東西是 spout-tuple-id到ack-val的對映
  8.     Map< Long, Long > anchorsToIds  
  9.                        = new HashMap<Long, Long>();  
  10.     // anchors 其實就是它的所有父親:spout-tuple
  11.     if(anchors!=null) {  
  12.         for(Tuple anchor: anchors) {  
  13.             long newId = MessageId.generateId();  
  14.             // 告訴每一個父親,你們又多了一個兒子了。
  15.             getExistingOutput(anchor).add(newId);  
  16.             for(long root: anchor.getMessageId()  
  17.                           .getAnchorsToIds().keySet()) {  
  18.                 Long curr = anchorsToIds.get(root);  
  19.                 if(curr == null) curr = 0L;  
  20.                 // 更新spout-tuple-id的ack-val
  21.                 anchorsToIds.put(root, curr ^ newId);  
  22.             }  
  23.         }  
  24.     }  
  25.     returnnew Tuple(_context, tuple,  
  26.                     _context.getThisTaskId(),  
  27.                     streamId,  
  28.                     MessageId.makeId(anchorsToIds));  
  29. }  

從上面程式碼裡面的紅色部分我們可以看出, _pendingAcks裡面維護的其實就是tuple到自己兒子的對應關係。

Tuple處理失敗的時候會給acker傳送失敗訊息

acker會忽略這種訊息的訊息內容(訊息的streamId為ACKER-FAIL-STREAM-ID), 直接將對應的spout-tuple標記為失敗(最上面程式碼第9行)

最後Acker發訊息通知spout-tuple對應的Worker

最後, acker會根據上面這些訊息的處理結果來通知這個spout-tuple對應的task:

幫助
  1. (when (and curr  
  2.           (:spout-task curr))  
  3.  (cond (= 0 (:val curr))  
  4.        ;; ack-val == 0 說明這個tuple的所有子孫都  
  5.        ;; 處理成功了(都發送ack訊息了)  
  6.        ;; 那麼傳送成功訊息通知建立這個spout-tuple的task.  
  7.        (do
  8.          (.remove pending id)  
  9.          (acker-emit-direct @output-collector  
  10.                             (:spout-task curr)  
  11.                             ACKER-ACK-STREAM-ID  
  12.                             [id]  
  13.                             ))  
  14.        ;; 如果這個spout-tuple處理失敗了  
  15.        ;; 傳送失敗訊息給建立這個spout-tuple的task  
  16.        (:failed curr)  
  17.        (do
  18.          (.remove pending id)  
  19.          (acker-emit-direct @output-collector  
  20.                             (:spout-task curr)  
  21.                             ACKER-FAIL-STREAM-ID  
  22.                             [id]  
  23.                             ))  
  24.        ))