Storm Trident狀態
Trident中有對狀態數據進行讀取和寫入操作的一流抽象工具。狀態既可以保存在拓撲內部,比如保存在內容中並由HDFS存儲,也可以通過外部存儲(比如Memcached或Cassandra)存儲在數據庫中。而對於Trident的API而言,這兩種機制沒有任何區別。
Trident以容錯的方式來管理狀態,當遇到重試或則錯誤時狀態的更新是冪等的,在數據統計分析中,冪等性是一個很重要的指標,因為它可以保證即使數據被處理了多次,但是站在結果的角度看和處理一次完全一樣。
我們來看一個例子,假定你正在對一個流做計數處理,並把計算結果寫入到數據庫。如果在數據庫中使用一個值來表示這個計數,然後每處理一個tuple,就將這個計數值加1.當錯誤發生時,tuple會被重新處理。這就引發了一個問題,當進行狀態更新時,你完全不知道事前是否已經處理成功這個tuple。這樣就可能導致,原來處理過的tuple在這裏對應的存儲計數值任然加了1。
當然,若數據庫中值存一個計數的話,是區分不出來這個tuple之前是否被正確處理的,這就需要更多的信息來支持。Trident提供了下面幾個原語來實現只處理一次的語義。
1.tuples是被分成一組組小的集合來處理的。
2.每一個batch會給分配一個唯一的id(事物id,txid),當batch被重新處理時,txid是不變的
3.batch之間的狀態更新是嚴格有序的,就是說batch2沒有處理萬的情況下batch3絕對不會被處理
有了這些原語就,在處理狀態更新的時候就能知道這個batch之前有沒有被處理過。然後采取合適的操作即可。下面我們看看每一種Spout類型都支持什麽樣的容錯級別。
事務性Spout
Trident是以batch的方式來處理tuple的,同時每個batch會分配一個唯一的transaction id,Spout的特性根據他們所提供容錯性保證機制來決定的,而且這種機制也會對每個批次發生作用。事務型Spout有如下特性:
1.一個batch無論被重發多少次,只有一個唯一且相同的事物id,同事所包含的tuple都是完全一致的
2.每個tuple必須且至多屬於一個batch
事務性Spout很容易理解,但是在極端的情況下也會有一些問題。假設一批消息在被Bolt消費的過程中失敗了,需要Spout重發,這時,如果剛好消息發送的中間件故障,Spout為了保證重發的時候每個batch包含的tuple一致,就智能等待消息中間件恢復了,整個處理就這樣阻塞了。
來看一個例子
設計一個計算WordCount的Topology,將單詞的出現的次數以KV的形式存儲到數據庫中。Key就是單詞,V 對應單詞出現的次數。可以將Value和事物ID一起存儲到數據庫中。每次更新Value前,先將當前的事物ID和數據庫中存儲的事物ID進行比較。如果一樣就忽略,否則執行存儲操作。例如下面的一個batch
batch(事物ID為3) 1.["man"] 2.["man"] 3.["dog"]
數據庫中保存的信息如下:
1.man => [count=2,txid=1] 2.dog => [count=4,txid=3] 3.apple => [count=10,txid=2]
單詞"man"對應的txid為3,而當前的txid為1,可以確定沒有為這個batch中的tuple更新過這個單詞的數量,所以直接更新txid為3,而dog對應的txid和當前的txid相同忽略更新,單詞apple保持不變,更新後的數據如下:
1.man => [count=4,txid=3] 2.dog => [count=4,txid=3] 3.apple => [count=10,txid=2]
不透明事務性Spout
不透明事務型Spout不能保證相同txid對應的批次中的元組數據完全一致,有一下特性
tuple只在一個batch中被成功處理,如果tuple在一個batch中被處理失敗,有可能會在另一個batch中被成功處理。也就是說一個tuple第一次在txid為2的batch中出現,以後有可能在txid為4的batch中再次出現。
不透明事務性Spout有很好的容錯性,但是需要額外的存儲空間。出了Value和txid,你還需要在數據庫中存儲之前的數據,我們還以數據庫中存儲計數為例。假設當前數據庫中存儲的信息如下:
{ value=4, preValue=1, txid=2 }
下一次batch的txid為3,計數值為2,和數據庫中的txid不同這種情況下將value中的值放入到preValue中,新增的值加到Value上去,更新後的數據庫信息如下:
{ value=6, preValue=4, txid=3 }
如果當前batch的txid任然為2,與數據庫中存儲的相同,怎麽操作呢?我們知道,數據庫中的value值是通過與這次的txid相同的上個batch更新而來的,但是batch可能已經變化了所以我們要忽略它,這種情況下需要做的就是更新value的值為prevalue加上本次的batch值,結果應該是這樣的
{ value=3, preValue=1, txid=2 }
此方式的正確定是基於Trident保證了batch的強順序性。Trident處理一個batch時,一定不會重復或則回溯到之前的batch。每個tuple只會在一個batch中被成功處理,所以更新是原子的。
非事物型Spout
非事務型Spout不能為批次提供任何保證。所以可能出現”至多一次”的處理,即在某個批次處理過程中失敗了,但是不會在重新處理;也可能提供“至少一次”的處理,即可能會有多個批次分別處理某個元組。也就是沒有辦法實現“恰好一次”的語義。
Spout和State類型小結
下面是不同的spout/狀態組合是否支持“恰好一次”處理語義:
不透明事務狀態有最強的容錯性,但是因為存儲txid和兩個結果帶來更大的開銷。事務型狀態只需要存儲一個狀態結果,但是只對事務型Spout有效。非事務型狀態要求存儲的數據更少,但是不能實現“恰好一次”的處理語義。所以在選擇容錯與存儲空間中,需要根據具體的需要選擇合適的組合。
Storm Trident狀態