1. 程式人生 > >storm中的ack機制

storm中的ack機制

我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理, 完整處理的意思是指:

一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所導致的所有的tuple都被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在timeout所指定的時間內沒有成功處理。

也就是說對於任何一個spout-tuple以及它的所有子孫到底處理成功失敗與否我們都會得到通知。關於如果做到這一點的原理,可以看看Twitter Storm如何保證訊息不丟失這篇文章。從那篇文章裡面我們可以知道,storm裡面有個專門的acker來跟蹤所有tuple的完成情況。這篇文章就來討論acker的詳細工作流程。

原始碼列表

這篇文章涉及到的原始碼主要包括:

演算法簡介

acker對於tuple的跟蹤演算法是storm的主要突破之一, 這個演算法使得對於任意大的一個tuple樹, 它只需要恆定的20位元組就可以進行跟蹤了。原理很簡單:acker對於每個spout-tuple儲存一個ack-val的校驗值,它的初始值是0, 然後每發射一個tuple/ack一個tuple,那麼tuple的id都要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了, 那麼最後ack-val一定是0(因為一個數字跟自己異或得到的值是0)。

進入正題

那麼下面我們從原始碼層面來看看哪些元件在哪些時候會給acker傳送什麼樣的訊息來共同完成這個演算法的。acker對訊息進行處理的主要是下面這塊程式碼:

幫助
01 02 03 04 05 06 07 08 09 10 11 (let [id (.getValue tuple 0) ^TimeCacheMap pending @pending curr (.get pending id) curr (condp = (.getSourceStreamId tuple)
ACKER-INIT-STREAM-ID (-> curr (update-ack id) (assoc :spout-task (.getValue tuple 1))) ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1)) ACKER-FAIL-STREAM-ID (assoc curr :failed true))] ...)

Spout建立一個新的tuple的時候給acker傳送訊息

訊息格式(看上面程式碼的第1行和第7行對於tuple.getValue()的呼叫)

幫助
1 (spout-tuple-id, task-id)

訊息的streamId是__ack_init(ACKER-INIT-STREAM-ID)

這是告訴acker, 一個新的spout-tuple出來了, 你跟蹤一下,它是由id為task-id的task建立的(這個task-id在後面會用來通知這個task:你的tuple處理成功了/失敗了)。處理完這個訊息之後, acker會在它的pending這個map(型別為TimeCacheMap)裡面新增這樣一條記錄:

幫助
1 {spout-tuple-id {:spout-task task-id :val ack-val)}

這就是acker對spout-tuple進行跟蹤的核心資料結構, 對於每個spout-tuple所產生的tuple樹的跟蹤都只需要儲存上面這條記錄。acker後面會檢查:val什麼時候變成0,變成0, 說明這個spout-tuple產生的tuple都處理完成了。

Bolt發射一個新tuple的時候會給acker傳送訊息麼?

任何一個bolt在發射一個新的tuple的時候,是不會直接通知acker的,如果這樣做的話那麼每發射一個訊息會有三條訊息了:

  1. Bolt建立這個tuple的時候,把它發給下一個bolt的訊息
  2. Bolt建立這個tuple的時候,傳送給acker的訊息
  3. ack tuple的時候傳送的ack訊息

事實上storm裡面只有第一條和第三條訊息,它把第二條訊息省掉了, 怎麼做到的呢?storm這點做得挺巧妙的,bolt在發射一個新的bolt的時候會把這個新tuple跟它的父tuple的關係儲存起來。然後在ack每個tuple的時候,storm會把要ack的tuple的id, 以及這個tuple新建立的所有的tuple的id的異或值傳送給acker。這樣就給每個tuple省掉了一個訊息(具體看下一節)。

Tuple被ack的時候給acker傳送訊息

每個tuple在被ack的時候,會給acker傳送一個訊息,訊息格式是:

幫助
1 (spout-tuple-id, tmp-ack-val)

訊息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)

注意,這裡的tmp-ack-val是要ack的tuple的id與由它新建立的所有的tuple的id異或的結果:

相關推薦

Stormack機制

1.Storm的Bolt有BsicBolt和RichBolt:   在BasicBolt中,BasicOutputCollector在emit資料的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。   使用RichBo

stormack機制(可靠性)

一  可靠性 簡介        Storm的可靠性是指Storm會告知使用者每一個訊息單元是否在一個指定的時間(timeout)內被完全處理。 完全處理的意思是該MessageId繫結的源Tuple以及由該源Tuple衍生的所有Tuple都經過了Topology中每一

協議設計ACK機制的影響

在TCP/IP中,延時ACK和Nagle演算法。 TCP為了同時處理成塊資料(通常為512位元組的使用者資料)和互動資料(通常使用者資料比較少,例如不大於10個位元組),採用了延時ACK和Nagle演算法來處理他們。 延時ACK: TCP在接收到資料的時候

stormack機制

我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理, 完整處理的意思是指: 一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所導致的所有的tuple都被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在

【Streaming】我在拓撲使用了Ack機制,為什麼在Storm UI上有大量Failed資料?

提問: 1. 在Storm UI上,有大量Failed資料,且往往是一旦開始有Fail資料,則Fail資料越積越多 2. 整體事件端到端延遲很大 分析: 當spout接收到大量資料,而後端bolt處理較慢,如果spout傳送的事件在超時時間(topology.mes

Storm的BaseBasicBolt原始碼解析ack機制

http://www.cnblogs.com/intsmaze/p/5924873.html 我們在學習ack機制的時候,我們知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。在BaseBasicBolt中,BasicOutputCollect

ack是什麼,如何使用Ack機制,如何關閉Ack機制,基本實現,STORM的訊息容錯機制Ack機制

1、ack是什麼 ack 機制是storm整個技術體系中非常閃亮的一個創新點。 通過Ack機制,spout傳送出去的每一條訊息,都可以確定是被成功處理或失敗處理, 從而可以讓開發者採取動作。比如在Meta中,成功被處理,即可更新偏移量,當失敗時,重複傳送資料

stormack訊息不丟失機制

1:ack是什麼 ack 機制是storm整個技術體系中非常閃亮的一個創新點。 通過Ack機制,spout傳送出去的每一條訊息,都可以確定是被成功處理或失敗處理, 從而可以讓開發者採取動作。比如在Meta中,成功被處理,即可更新偏移量,當失敗時,重複傳送資料。 因此

storm ack機制流程詳解

第①步,spout發出一個tuple給下游,生成root id、tuple id、taskid(標記spout的本次任務,使用者acker bolt通過emitDirect方式傳送ack結果時使用,假設為222)。此時傳送給acker bolt的ack訊息為<root id = 111,ack valu

storm(二) 事務機制

導致 自然 ping white htm back ase cor 選擇 前言 為了保證tuple的強有序和exactly-once語義,storm提供了事務機制,為每個tuple提供一個id 設計方法1 為每個tuple設置一個事務id,在數據庫保存

JAVA反射機制五(JavaBean的內省與BeanUtils庫)

getc ron 輸出結果 下載 比較 static 完成 自動完成 規則   內省(Introspector) 是Java 語言對JavaBean類屬性、事件的一種缺省處理方法。   JavaBean是一種特殊的類,主要用於傳遞數據信息,這種類中的方法主要用於訪問私有的

JAVA反射機制六(java.lang.reflect包)

instance 檢查 item 類繼承 final win 基類 cte member 一、簡介 java.lang.reflect包提供了用於獲取類和對象的反射信息的類和接口。反射API允許對程序訪問有關加載類的字段,方法和構造函數的信息進行編程訪問。它允許在安全限制

Java反射機制詳解

turn face instance java struct () 分享 2.6 一個     序言       在學習java基礎時,由於學的不紮實,講的實用性不強,就覺得沒用,很多重要的知識就那樣一筆帶過了,像這個馬上要講的反射機制一樣,當時學的時候就忽略了,到後來學習

MXNetbucket機制註記

.org sse shape 沒有 sta ams origin done org Preface 之前看API以為bucket是一個根植於底層操作的接口(MXNet doc功不可沒 -_-|| )。從LSTM看過來,接觸到了一些相關的程序,後面再把bucketing_mo

StormTask數的設置與計算(1.0.1版本)

null 英文 如果 之間 one 很多 ask 其他 tar ==思考問題1== 向集群提交一個拓撲的時候,Storm是如何計算Task數以及Executor數的? ==思考問題2:== 構建拓撲的時候,有3個地方會影響task數,這3個地方之間有什麽關系? bui

Storm篇】--Storm並發機制

兩種 worker 行數 blog body hint mta ati pan 一、前述 為了提高Storm的並行能力,通常需要設置並行。 二、具體原理 1. Storm並行分為幾個方面: Worker – 進程一個Topology拓撲會包含一個或多個Worker(每個Wo

javaimport機制(指定import和import *的區別)

dem single boot 路徑 list http 簡單 至少 package 轉自:https://www.cnblogs.com/dtts/p/4692480.html java中有兩種包的導入機制,總結如下: 單類型導入(single-type-i

關於JavaScriptprototype機制的理解

func image 開始 auto com scrip pla 技術 研究   最近幾天一直在研究JavaScript中原型的機制,從開始的似懂非懂,到今天終於有所領悟。不敢說徹底理解,但是起碼算知道怎麽回事了。   為什麽一開始似懂非懂   開始了解一遍原型機制後,感覺

Storm的容錯機制

信息 完整 提交 nimbus storm 監控 zookeepe 將在 次數 任務級容錯 Bolt任務crash引起的消息未被應答。此時,acker中所有與此Bolt任務關聯的消息都會因為超時而失敗,對應的Spout的fail方法將被調用。 acker任務失敗。如果ack

storm運行jar產生模擬數據的時候,遇見的問題

data ssl error inf find file 遇見 ssd 發現 1.問題由來   命令:java -jar data.jar 1000 >>nginx.log   報錯:       Exception in thread "main" java.