storm中的ack機制
我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理, 完整處理的意思是指:
一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所導致的所有的tuple都被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在timeout所指定的時間內沒有成功處理。
也就是說對於任何一個spout-tuple以及它的所有子孫到底處理成功失敗與否我們都會得到通知。關於如果做到這一點的原理,可以看看Twitter Storm如何保證訊息不丟失這篇文章。從那篇文章裡面我們可以知道,storm裡面有個專門的acker來跟蹤所有tuple的完成情況。這篇文章就來討論acker的詳細工作流程。
原始碼列表
這篇文章涉及到的原始碼主要包括:
演算法簡介
acker對於tuple的跟蹤演算法是storm的主要突破之一, 這個演算法使得對於任意大的一個tuple樹, 它只需要恆定的20位元組就可以進行跟蹤了。原理很簡單:acker對於每個spout-tuple儲存一個ack-val的校驗值,它的初始值是0, 然後每發射一個tuple/ack一個tuple,那麼tuple的id都要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了, 那麼最後ack-val一定是0(因為一個數字跟自己異或得到的值是0)。
進入正題
那麼下面我們從原始碼層面來看看哪些元件在哪些時候會給acker傳送什麼樣的訊息來共同完成這個演算法的。acker對訊息進行處理的主要是下面這塊程式碼:
幫助01 02 03 04 05 06 07 08 09 10 11 |
( let
[ id
(.getValue tuple 0)
^TimeCacheMap
pending @pending
curr
(.get pending id)
curr
(condp = (.getSourceStreamId tuple) ACKER-INIT-STREAM-ID
(-> curr
(update-ack
id)
(assoc
:spout-task
(.getValue tuple 1)))
ACKER-ACK-STREAM-ID
(update-ack
curr
(.getValue tuple 1))
ACKER-FAIL-STREAM-ID
(assoc curr :failed
true)) ]
...)
|
Spout建立一個新的tuple的時候給acker傳送訊息
訊息格式(看上面程式碼的第1行和第7行對於tuple.getValue()
的呼叫)
1 |
(spout-tuple-id,
task-id)
|
訊息的streamId是__ack_init(ACKER-INIT-STREAM-ID)
這是告訴acker, 一個新的spout-tuple出來了, 你跟蹤一下,它是由id為task-id的task建立的(這個task-id在後面會用來通知這個task:你的tuple處理成功了/失敗了)。處理完這個訊息之後, acker會在它的pending這個map(型別為TimeCacheMap)裡面新增這樣一條記錄:
幫助1 |
{spout-tuple-id
{ :spout-task
task-id :val
ack-val)}
|
這就是acker對spout-tuple進行跟蹤的核心資料結構, 對於每個spout-tuple所產生的tuple樹的跟蹤都只需要儲存上面這條記錄。acker後面會檢查:val什麼時候變成0,變成0, 說明這個spout-tuple產生的tuple都處理完成了。
Bolt發射一個新tuple的時候會給acker傳送訊息麼?
任何一個bolt在發射一個新的tuple的時候,是不會直接通知acker的,如果這樣做的話那麼每發射一個訊息會有三條訊息了:
- Bolt建立這個tuple的時候,把它發給下一個bolt的訊息
-
Bolt建立這個tuple的時候,傳送給acker的訊息 - ack tuple的時候傳送的ack訊息
事實上storm裡面只有第一條和第三條訊息,它把第二條訊息省掉了, 怎麼做到的呢?storm這點做得挺巧妙的,bolt在發射一個新的bolt的時候會把這個新tuple跟它的父tuple的關係儲存起來。然後在ack每個tuple的時候,storm會把要ack的tuple的id, 以及這個tuple新建立的所有的tuple的id的異或值傳送給acker。這樣就給每個tuple省掉了一個訊息(具體看下一節)。
Tuple被ack的時候給acker傳送訊息
每個tuple在被ack的時候,會給acker傳送一個訊息,訊息格式是:
幫助1 |
(spout-tuple-id,
tmp-ack-val)
|
訊息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)
注意,這裡的tmp-ack-val是要ack的tuple的id與由它新建立的所有的tuple的id異或的結果:
相關推薦
Storm的ack機制
1.Storm的Bolt有BsicBolt和RichBolt: 在BasicBolt中,BasicOutputCollector在emit資料的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。 使用RichBo
storm 的ack機制(可靠性)
一 可靠性 簡介 Storm的可靠性是指Storm會告知使用者每一個訊息單元是否在一個指定的時間(timeout)內被完全處理。 完全處理的意思是該MessageId繫結的源Tuple以及由該源Tuple衍生的所有Tuple都經過了Topology中每一
協議設計中ACK機制的影響
在TCP/IP中,延時ACK和Nagle演算法。 TCP為了同時處理成塊資料(通常為512位元組的使用者資料)和互動資料(通常使用者資料比較少,例如不大於10個位元組),採用了延時ACK和Nagle演算法來處理他們。 延時ACK: TCP在接收到資料的時候
storm中的ack機制
我們知道storm一個很重要的特性是它能夠保證你發出的每條訊息都會被完整處理, 完整處理的意思是指: 一個tuple被完全處理的意思是: 這個tuple以及由這個tuple所導致的所有的tuple都被成功處理。而一個tuple會被認為處理失敗瞭如果這個訊息在
【Streaming】我在拓撲中使用了Ack機制,為什麼在Storm UI上有大量Failed資料?
提問: 1. 在Storm UI上,有大量Failed資料,且往往是一旦開始有Fail資料,則Fail資料越積越多 2. 整體事件端到端延遲很大 分析: 當spout接收到大量資料,而後端bolt處理較慢,如果spout傳送的事件在超時時間(topology.mes
Storm的BaseBasicBolt原始碼解析ack機制
http://www.cnblogs.com/intsmaze/p/5924873.html 我們在學習ack機制的時候,我們知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。在BaseBasicBolt中,BasicOutputCollect
ack是什麼,如何使用Ack機制,如何關閉Ack機制,基本實現,STORM的訊息容錯機制,Ack機制
1、ack是什麼 ack 機制是storm整個技術體系中非常閃亮的一個創新點。 通過Ack機制,spout傳送出去的每一條訊息,都可以確定是被成功處理或失敗處理, 從而可以讓開發者採取動作。比如在Meta中,成功被處理,即可更新偏移量,當失敗時,重複傳送資料
storm的ack訊息不丟失機制
1:ack是什麼 ack 機制是storm整個技術體系中非常閃亮的一個創新點。 通過Ack機制,spout傳送出去的每一條訊息,都可以確定是被成功處理或失敗處理, 從而可以讓開發者採取動作。比如在Meta中,成功被處理,即可更新偏移量,當失敗時,重複傳送資料。 因此
storm ack機制流程詳解
第①步,spout發出一個tuple給下游,生成root id、tuple id、taskid(標記spout的本次任務,使用者acker bolt通過emitDirect方式傳送ack結果時使用,假設為222)。此時傳送給acker bolt的ack訊息為<root id = 111,ack valu
storm(二) 事務機制
導致 自然 ping white htm back ase cor 選擇 前言 為了保證tuple的強有序和exactly-once語義,storm提供了事務機制,為每個tuple提供一個id 設計方法1 為每個tuple設置一個事務id,在數據庫保存
JAVA中反射機制五(JavaBean的內省與BeanUtils庫)
getc ron 輸出結果 下載 比較 static 完成 自動完成 規則 內省(Introspector) 是Java 語言對JavaBean類屬性、事件的一種缺省處理方法。 JavaBean是一種特殊的類,主要用於傳遞數據信息,這種類中的方法主要用於訪問私有的
JAVA中反射機制六(java.lang.reflect包)
instance 檢查 item 類繼承 final win 基類 cte member 一、簡介 java.lang.reflect包提供了用於獲取類和對象的反射信息的類和接口。反射API允許對程序訪問有關加載類的字段,方法和構造函數的信息進行編程訪問。它允許在安全限制
Java中反射機制詳解
turn face instance java struct () 分享 2.6 一個 序言 在學習java基礎時,由於學的不紮實,講的實用性不強,就覺得沒用,很多重要的知識就那樣一筆帶過了,像這個馬上要講的反射機制一樣,當時學的時候就忽略了,到後來學習
MXNet中bucket機制註記
.org sse shape 沒有 sta ams origin done org Preface 之前看API以為bucket是一個根植於底層操作的接口(MXNet doc功不可沒 -_-|| )。從LSTM看過來,接觸到了一些相關的程序,後面再把bucketing_mo
Storm中Task數的設置與計算(1.0.1版本)
null 英文 如果 之間 one 很多 ask 其他 tar ==思考問題1== 向集群提交一個拓撲的時候,Storm是如何計算Task數以及Executor數的? ==思考問題2:== 構建拓撲的時候,有3個地方會影響task數,這3個地方之間有什麽關系? bui
【Storm篇】--Storm並發機制
兩種 worker 行數 blog body hint mta ati pan 一、前述 為了提高Storm的並行能力,通常需要設置並行。 二、具體原理 1. Storm並行分為幾個方面: Worker – 進程一個Topology拓撲會包含一個或多個Worker(每個Wo
java中import機制(指定import和import *的區別)
dem single boot 路徑 list http 簡單 至少 package 轉自:https://www.cnblogs.com/dtts/p/4692480.html java中有兩種包的導入機制,總結如下: 單類型導入(single-type-i
關於JavaScript中prototype機制的理解
func image 開始 auto com scrip pla 技術 研究 最近幾天一直在研究JavaScript中原型的機制,從開始的似懂非懂,到今天終於有所領悟。不敢說徹底理解,但是起碼算知道怎麽回事了。 為什麽一開始似懂非懂 開始了解一遍原型機制後,感覺
Storm的容錯機制
信息 完整 提交 nimbus storm 監控 zookeepe 將在 次數 任務級容錯 Bolt任務crash引起的消息未被應答。此時,acker中所有與此Bolt任務關聯的消息都會因為超時而失敗,對應的Spout的fail方法將被調用。 acker任務失敗。如果ack
在storm中運行jar產生模擬數據的時候,遇見的問題
data ssl error inf find file 遇見 ssd 發現 1.問題由來 命令:java -jar data.jar 1000 >>nginx.log 報錯: Exception in thread "main" java.