1. 程式人生 > >Storm 訊息容錯機制和通訊四

Storm 訊息容錯機制和通訊四

ack 是什麼

ack 機制是 storm 整個技術體系中非常閃亮的一個創新點。

通過 ack 機制,spout 傳送的每一條資訊,都可以確定是被成功處理或失敗處理,從而可以讓開發者採取行動。比如在meta中,成功被處理,即可更新偏移量,當失敗時,重複傳送處理。

因此,通過 ack 機制,很容易做到保證所有資料均被處理,一條不漏。

另外需要注意的, 當 spout 觸發 fail 動作時,不會自動重發失敗的 tuple ,需要 spout 自己重新獲取資料,重發一次。

ack 機制

spout 傳送每一條資訊 

  1. 在規定的時間內,spout 收到 Acker 的 ack 響應,即任務該 tuple 被後續 Bolt 成功 處理。
  2. 在規定的時間內,spout 未收到 Acker 的 ack 響應 tuple ,就出發 fail 動作,認為該 tuple 處理失敗。
  3. 或者收到 Acker 傳送的 fail 響應 tuple,也任務失敗,觸發 fail 動作。

另外Ack機制還常用於限流作用: 為了避免spout傳送資料太快,而bolt處理太慢,常常設定pending數,當spout有等於或超過pending數的tuple沒有收到ack或fail響應時,跳過執行nextTuple, 從而限制spout傳送資料。

通過conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);設定spout pend數。

如何使用 Ack 機制

spout 在傳送資料的時候帶上 msgid

設定 acker 數至少大於 0; Config.setNumAckers(conf, ackerParal);

在 bolt 中完成處理 tuple 時,執行 OutputCollector.ack( tuple ),處理失敗時,請丟擲 FailedException,則自動執行 OutputCollector.fail( tuple );

如何關閉 Ack 機制

有兩種方式

  1. spout 傳送資料時不帶上 msgid
  2. 設定 acker 數等於 0 

基本實現

Storm 系統中有一組叫做 acker 的特殊任務,他們負責跟蹤 DAG (有向無環圖) 中的每個訊息。

acker 任務儲存了 spout id 的一堆值的對映。第一個值就是 spout 的任務 id ,通過這個id ,acker 就知道訊息處理完時該通知哪個 spout 任務 。第二個值是一個 64bit 的數字,我們稱之為 ack val ,它是 樹中所有訊息的隨機 id 的異或計算結果。

<TaskId,<RootId,ackValue>>

Spoutid,<系統生成的id,ackValue>

Task-0,64bit,0

ack val 表示了整棵樹的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當訊息被建立和被應答的時候都會有相同的訊息 id 傳送過來做異或。每當acker 發現一棵樹的 ack val 值為0 的時候,他就知道這棵樹已經被完全處理了。

Storm 通訊機制

Worker之間的通訊經常需要通過網路跨節點進行,Storm使用 ZeroMQ 或 Netty 作為程序通訊的訊息框架。

Worker程序內部通訊:不同的 woker 的 thread 通訊通常使用 LMAX Disruptor 來完成。

Worker 程序間通訊

worker 程序間訊息傳遞機制,訊息的接受和處理大概流程如下圖:

對於 worker 程序來說,為了管理流入 和 流出 的訊息,每個 worker 程序有一個獨立的接收執行緒(對配置的TCP埠 supervisor.slots.ports 進行監聽 );

對應 Worker 接受執行緒,每個 worker 存在一個獨立的傳送執行緒,它負責從 worker 的的 transfer-queue 中讀取訊息,並通過網路傳送給其他worker; 

每個 executor 有自己的 incoming-queue 和 outgoing-queue。Worker 接受執行緒將收到的訊息通過 task 編號傳遞給對應的 executor(一個或多個)  incoming-queue;

每個 executor 有單獨的執行緒分別處理 spout/bolt 的業務邏輯,業務邏輯輸出的中間資料 會存放在 outgoing-queue 中,當 executor 的 outgoing-queue 中的 tuple 達到一定的閥值, executor 的傳送執行緒將批量獲取 outgoing-queue 中的 tuple ,併發送到 transfer-queue中。

每個 worker 程序控制一個或多個 executor 執行緒,使用者可在程式碼中進行配置。其實就是我們在程式碼中設定的併發度個數。

Worker 程序中通訊分析

http://images.cnitblog.com/blog/312753/201307/23153829-4ebd07b835ca46408f9e1b1f6282fe84.png

1:Worker 接受執行緒通過網路接受資料,並根據 Tuple 中包含的 taskId ,匹配到對應的 executor;然後根據 executor 找到對應的 incoming-queue,將資料傳送到 incoming-queue 佇列中。

2: 業務邏輯執行執行緒消費 incoming-queue 的資料,通過 blot 的 execute(***) 方法,將 Tuple 作為引數傳輸給使用者自定義的方法。

3:業務邏輯執行完畢之後,將計算的中間資料傳送給 outgoing-queue 佇列,當outgoing-queue 中的 tuple 達到一定的閾值,executor 的傳送執行緒將批量獲取 outgoing-queue 中的 tuple,併發送到 Worker 中的 transfer-queue 中。

4:Worker 傳送執行緒消費 transfer-queue 中的資料,計算 tuple 的目的地,連線不同的 node + port 將資料通過網路傳輸的方式送到另一個Worker.

5:另一個 worker 中進行以上步驟1 操作(開始重複)。