1. 程式人生 > >Storm Trident狀態

Storm Trident狀態

分享 機制 不知道 變化 stat prev 批次 更多 如果

  Trident中有對狀態數據進行讀取和寫入操作的一流抽象工具。狀態既可以保存在拓撲內部,比如保存在內容中並由HDFS存儲,也可以通過外部存儲(比如Memcached或Cassandra)存儲在數據庫中。而對於Trident的API而言,這兩種機制沒有任何區別。

  Trident以容錯的方式來管理狀態,當遇到重試或則錯誤時狀態的更新是冪等的,在數據統計分析中,冪等性是一個很重要的指標,因為它可以保證即使數據被處理了多次,但是站在結果的角度看和處理一次完全一樣。

  我們來看一個例子,假定你正在對一個流做計數處理,並把計算結果寫入到數據庫。如果在數據庫中使用一個值來表示這個計數,然後每處理一個tuple,就將這個計數值加1.當錯誤發生時,tuple會被重新處理。這就引發了一個問題,當進行狀態更新時,你完全不知道事前是否已經處理成功這個tuple。這樣就可能導致,原來處理過的tuple在這裏對應的存儲計數值任然加了1。

  當然,若數據庫中值存一個計數的話,是區分不出來這個tuple之前是否被正確處理的,這就需要更多的信息來支持。Trident提供了下面幾個原語來實現只處理一次的語義。

  1.tuples是被分成一組組小的集合來處理的。

  2.每一個batch會給分配一個唯一的id(事物id,txid),當batch被重新處理時,txid是不變的

  3.batch之間的狀態更新是嚴格有序的,就是說batch2沒有處理萬的情況下batch3絕對不會被處理

  有了這些原語就,在處理狀態更新的時候就能知道這個batch之前有沒有被處理過。然後采取合適的操作即可。下面我們看看每一種Spout類型都支持什麽樣的容錯級別。

事務性Spout

  Trident是以batch的方式來處理tuple的,同時每個batch會分配一個唯一的transaction id,Spout的特性根據他們所提供容錯性保證機制來決定的,而且這種機制也會對每個批次發生作用。事務型Spout有如下特性:

  1.一個batch無論被重發多少次,只有一個唯一且相同的事物id,同事所包含的tuple都是完全一致的

  2.每個tuple必須且至多屬於一個batch

  事務性Spout很容易理解,但是在極端的情況下也會有一些問題。假設一批消息在被Bolt消費的過程中失敗了,需要Spout重發,這時,如果剛好消息發送的中間件故障,Spout為了保證重發的時候每個batch包含的tuple一致,就智能等待消息中間件恢復了,整個處理就這樣阻塞了。

  來看一個例子

  設計一個計算WordCount的Topology,將單詞的出現的次數以KV的形式存儲到數據庫中。Key就是單詞,V 對應單詞出現的次數。可以將Value和事物ID一起存儲到數據庫中。每次更新Value前,先將當前的事物ID和數據庫中存儲的事物ID進行比較。如果一樣就忽略,否則執行存儲操作。例如下面的一個batch

batch(事物ID為3)
1.["man"]
2.["man"]
3.["dog"]

  數據庫中保存的信息如下:

1.man =>  [count=2,txid=1]
2.dog =>  [count=4,txid=3]
3.apple =>  [count=10,txid=2]

  單詞"man"對應的txid為3,而當前的txid為1,可以確定沒有為這個batch中的tuple更新過這個單詞的數量,所以直接更新txid為3,而dog對應的txid和當前的txid相同忽略更新,單詞apple保持不變,更新後的數據如下:

1.man =>  [count=4,txid=3]
2.dog =>  [count=4,txid=3]
3.apple =>  [count=10,txid=2]

不透明事務性Spout

  不透明事務型Spout不能保證相同txid對應的批次中的元組數據完全一致,有一下特性

  tuple只在一個batch中被成功處理,如果tuple在一個batch中被處理失敗,有可能會在另一個batch中被成功處理。也就是說一個tuple第一次在txid為2的batch中出現,以後有可能在txid為4的batch中再次出現。

  不透明事務性Spout有很好的容錯性,但是需要額外的存儲空間。出了Value和txid,你還需要在數據庫中存儲之前的數據,我們還以數據庫中存儲計數為例。假設當前數據庫中存儲的信息如下:

{
    value=4,
    preValue=1,
    txid=2
}

  下一次batch的txid為3,計數值為2,和數據庫中的txid不同這種情況下將value中的值放入到preValue中,新增的值加到Value上去,更新後的數據庫信息如下:

{
    value=6,
    preValue=4,
    txid=3
}

  如果當前batch的txid任然為2,與數據庫中存儲的相同,怎麽操作呢?我們知道,數據庫中的value值是通過與這次的txid相同的上個batch更新而來的,但是batch可能已經變化了所以我們要忽略它,這種情況下需要做的就是更新value的值為prevalue加上本次的batch值,結果應該是這樣的

{
    value=3,
    preValue=1,
    txid=2
}

  此方式的正確定是基於Trident保證了batch的強順序性。Trident處理一個batch時,一定不會重復或則回溯到之前的batch。每個tuple只會在一個batch中被成功處理,所以更新是原子的。

非事物型Spout

  非事務型Spout不能為批次提供任何保證。所以可能出現”至多一次”的處理,即在某個批次處理過程中失敗了,但是不會在重新處理;也可能提供“至少一次”的處理,即可能會有多個批次分別處理某個元組。也就是沒有辦法實現“恰好一次”的語義。

Spout和State類型小結

  下面是不同的spout/狀態組合是否支持“恰好一次”處理語義:

  技術分享圖片

  

  不透明事務狀態有最強的容錯性,但是因為存儲txid和兩個結果帶來更大的開銷。事務型狀態只需要存儲一個狀態結果,但是只對事務型Spout有效。非事務型狀態要求存儲的數據更少,但是不能實現“恰好一次”的處理語義。所以在選擇容錯與存儲空間中,需要根據具體的需要選擇合適的組合。

    

Storm Trident狀態