1. 程式人生 > 實用技巧 >Storm(六)Storm的原理機制

Storm(六)Storm的原理機制

一.Storm的資料分發策略

1. Shuffle Grouping

隨機分組,隨機派發stream裡面的tuple,保證每個bolt task接收到的tuple數目大致相同。 輪詢,平均分配

2. Fields Grouping

按欄位分組,比如,按"user-id"這個欄位來分組,那麼具有同樣"user-id"的 tuple 會被分到相同的Bolt裡的一個task, 而不同的"user-id"則可能會被分配到不同的task。

3. All Grouping

廣播發送,對於每一個tuple,所有的bolts都會收到

4. Global Grouping

全域性分組,把tuple分配給task id最低的task 。

5. None Grouping

不分組,這個分組的意思是說stream不關心到底怎樣分組。目前這種分組和Shuffle grouping是一樣的效果。 有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個執行緒裡面去執行(未來Storm如果可能的話會這樣設計)。

6. Direct Grouping

指向型分組, 這是一種比較特別的分組方法,用這種分組意味著訊息(tuple)的傳送者指定由訊息接收者的哪個task處理這個訊息。只有被宣告為 Direct Stream 的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用 emitDirect 方法來發射。訊息處理者可以通過 TopologyContext 來獲取處理它的訊息的task的id (OutputCollector.emit方法也會返回task的id)

7. Local or shuffle grouping

本地或隨機分組。如果目標bolt有一個或者多個task與源bolt的task在同一個工作程序中,tuple將會被隨機發送給這些同進程中的tasks。否則,和普通的Shuffle Grouping行為一致

8.customGrouping

自定義,相當於mapreduce那裡自己去實現一個partition一樣。

二.Storm的併發機制

Worker – 程序

一個Topology拓撲會包含一個或多個Worker(每個Worker程序只能從屬於一個特定的Topology) 這些Worker程序會並行跑在叢集中不同的伺服器上,即一個Topology拓撲其實是由並行執行在Storm叢集中多臺伺服器上的程序所組成

Executor – 執行緒

Executor是由Worker程序中生成的一個執行緒 每個Worker程序中會執行拓撲當中的一個或多個Executor執行緒 一個Executor執行緒中可以執行一個或多個Task任務(預設每個Executor只執行一個Task任務),但是這些Task任務都是對應著同一個元件(Spout、Bolt)。

Task

實際執行資料處理的最小單元 每個task即為一個Spout或者一個Bolt Task數量在整個Topology生命週期中保持不變,Executor數量可以變化或手動調整 (預設情況下,Task數量和Executor是相同的,即每個Executor執行緒中預設執行一個Task任務)

設定Worker程序數

Config.setNumWorkers(int workers)

設定Executor執行緒數

TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) ,TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint) :其中, parallelism_hint即為executor執行緒數

設定Task數量

ComponentConfigurationDeclarer.setNumTasks(Number val)

例:

Rebalance – 再平衡

即,動態調整Topology拓撲的Worker程序數量、以及Executor執行緒數量

支援兩種調整方式: 1、通過Storm UI 2、通過Storm CLI

通過Storm CLI動態調整: storm help rebalance

例:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 將mytopology拓撲worker程序數量調整為5個, “ blue-spout ” 所使用的執行緒數量調整為3個 ,“ yellow-bolt ”所使用的執行緒數量調整為10個。

三.Storm的通訊機制

Worker程序間的資料通訊

ZMQ ZeroMQ 開源的訊息傳遞框架,並不是一個MessageQueue Netty Netty是基於NIO的網路框架,更加高效。(之所以Storm 0.9版本之後使用Netty,是因為ZMQ的license和Storm的license不相容。)

Worker內部的資料通訊

Disruptor 實現了“佇列”的功能。 可以理解為一種事件監聽或者訊息處理機制,即在隊列當中一邊由生產者放入訊息資料,另一邊消費者並行取出訊息資料處理。

Worker內部的訊息傳遞機制

四.Storm的容錯機制

1、叢集節點宕機

Nimbus伺服器 單點故障? 非Nimbus伺服器 故障時,該節點上所有Task任務都會超時,Nimbus會將這些Task任務重新分配到其他伺服器上執行

2、程序掛掉

Worker 掛掉時,Supervisor會重新啟動這個程序。如果啟動過程中仍然一直失敗,並且無法向Nimbus傳送心跳,Nimbus會將該Worker重新分配到其他伺服器上 Supervisor 無狀態(所有的狀態資訊都存放在Zookeeper中來管理) 快速失敗(每當遇到任何異常情況,都會自動毀滅) Nimbus 無狀態(所有的狀態資訊都存放在Zookeeper中來管理) 快速失敗(每當遇到任何異常情況,都會自動毀滅)

3、訊息的完整性

從Spout中發出的Tuple,以及基於他所產生Tuple(例如上個例子當中Spout發出的句子,以及句子當中單詞的tuple等) 由這些訊息就構成了一棵tuple樹 當這棵tuple樹傳送完成,並且樹當中每一條訊息都被正確處理,就表明spout傳送訊息被“完整處理”,即訊息的完整性

Acker -- 訊息完整性的實現機制 Storm的拓撲當中特殊的一些任務 負責跟蹤每個Spout發出的Tuple的DAG(有向無環圖)

五.Storm的DRPC

DRPC (Distributed RPC) 分散式遠端過程呼叫

DRPC 是通過一個 DRPC 服務端(DRPC server)來實現分散式 RPC 功能的。 DRPC Server 負責接收 RPC 請求,並將該請求傳送到 Storm中執行的 Topology,等待接收 Topology 傳送的處理結果,並將該結果返回給傳送請求的客戶端。 (其實,從客戶端的角度來說,DPRC 與普通的 RPC 呼叫並沒有什麼區別。)

DRPC設計目的: 為了充分利用Storm的計算能力實現高密度的並行實時計算。 (Storm接收若干個資料流輸入,資料在Topology當中執行完成,然後通過DRPC將結果進行輸出。)

客戶端通過向 DRPC 伺服器傳送待執行函式的名稱以及該函式的引數來獲取處理結果。實現該函式的拓撲使用一個DRPCSpout 從 DRPC 伺服器中接收一個函式呼叫流。DRPC 伺服器會為每個函式呼叫都標記了一個唯一的 id。隨後拓撲會執行函式來計算結果,並在拓撲的最後使用一個名為 ReturnResults 的 bolt 連線到 DRPC 伺服器,根據函式呼叫的 id 來將函式呼叫的結果返回。

定義DRPC拓撲:

方法1: 通過LinearDRPCTopologyBuilder (該方法也過期,不建議使用) 該方法會自動為我們設定Spout、將結果返回給DRPC Server等,我們只需要將Topology實現

方法2: 直接通過普通的拓撲構造方法TopologyBuilder來建立DRPC拓撲 需要手動設定好開始的DRPCSpout以及結束的ReturnResults

執行模式:

1、本地模式

2.遠端模式(叢集模式)

修改配置檔案conf/storm.yaml drpc.servers: - "node21“ 啟動DRPC Server bin/storm drpc & 通過StormSubmitter.submitTopology提交拓撲

六.Storm的事務

事務性拓撲(Transactional Topologies)

保證訊息(tuple)被且僅被處理一次

Design 1

強順序流(強有序) 引入事務(transaction)的概念,每個transaction(即每個tuple)關聯一個transaction id。 Transaction id從1開始,每個tuple會按照順序+1。 在處理tuple時,將處理成功的tuple結果以及transaction id同時寫入資料庫中進行儲存。

兩種情況:

1、當前transaction id與資料庫中的transaction id不一致

2、兩個transaction id相同

缺點: 一次只能處理一個tuple,無法實現分散式計算

Design 2

強順序的Batch流

事務(transaction)以batch為單位,即把一批tuple稱為一個batch,每次處理一個batch。 每個batch(一批tuple)關聯一個transaction id ,每個batch內部可以平行計算

缺點

Design 3

Storm's design

將Topology拆分為兩個階段:

1、Processing phase 允許並行處理多個batch

2、Commit phase 保證batch的強有序,一次只能處理一個batch

Design details

Manages state - 狀態管理

Storm通過Zookeeper儲存所有transaction相關資訊(包含了:當前transaction id 以及batch的元資料資訊)

Coordinates the transactions - 協調事務

Storm會管理決定transaction應該處理什麼階段(processing、committing)

Fault detection - 故障檢測

Storm內部通過Acker機制保障訊息被正常處理(使用者不需要手動去維護)

First class batch processing API

Storm提供batch bolt介面

三種事務:

1、普通事務

2、Partitioned Transaction - 分割槽事務

3、Opaque Transaction - 不透明分割槽事務