Storm(六)Storm的原理機制
一.Storm的資料分發策略
1. Shuffle Grouping
隨機分組,隨機派發stream裡面的tuple,保證每個bolt task接收到的tuple數目大致相同。 輪詢,平均分配
2. Fields Grouping
按欄位分組,比如,按"user-id"這個欄位來分組,那麼具有同樣"user-id"的 tuple 會被分到相同的Bolt裡的一個task, 而不同的"user-id"則可能會被分配到不同的task。
3. All Grouping
廣播發送,對於每一個tuple,所有的bolts都會收到
4. Global Grouping
全域性分組,把tuple分配給task id最低的task 。
5. None Grouping
不分組,這個分組的意思是說stream不關心到底怎樣分組。目前這種分組和Shuffle grouping是一樣的效果。 有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個執行緒裡面去執行(未來Storm如果可能的話會這樣設計)。
6. Direct Grouping
指向型分組, 這是一種比較特別的分組方法,用這種分組意味著訊息(tuple)的傳送者指定由訊息接收者的哪個task處理這個訊息。只有被宣告為 Direct Stream 的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用 emitDirect 方法來發射。訊息處理者可以通過 TopologyContext 來獲取處理它的訊息的task的id (OutputCollector.emit方法也會返回task的id)
7. Local or shuffle grouping
本地或隨機分組。如果目標bolt有一個或者多個task與源bolt的task在同一個工作程序中,tuple將會被隨機發送給這些同進程中的tasks。否則,和普通的Shuffle Grouping行為一致
8.customGrouping
自定義,相當於mapreduce那裡自己去實現一個partition一樣。
二.Storm的併發機制
Worker – 程序
一個Topology拓撲會包含一個或多個Worker(每個Worker程序只能從屬於一個特定的Topology) 這些Worker程序會並行跑在叢集中不同的伺服器上,即一個Topology拓撲其實是由並行執行在Storm叢集中多臺伺服器上的程序所組成
Executor – 執行緒
Executor是由Worker程序中生成的一個執行緒 每個Worker程序中會執行拓撲當中的一個或多個Executor執行緒 一個Executor執行緒中可以執行一個或多個Task任務(預設每個Executor只執行一個Task任務),但是這些Task任務都是對應著同一個元件(Spout、Bolt)。
Task
實際執行資料處理的最小單元 每個task即為一個Spout或者一個Bolt Task數量在整個Topology生命週期中保持不變,Executor數量可以變化或手動調整 (預設情況下,Task數量和Executor是相同的,即每個Executor執行緒中預設執行一個Task任務)
設定Worker程序數
Config.setNumWorkers(int workers)
設定Executor執行緒數
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) ,TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint) :其中, parallelism_hint即為executor執行緒數
設定Task數量
ComponentConfigurationDeclarer.setNumTasks(Number val)
例:
Rebalance – 再平衡
即,動態調整Topology拓撲的Worker程序數量、以及Executor執行緒數量
支援兩種調整方式: 1、通過Storm UI 2、通過Storm CLI
通過Storm CLI動態調整: storm help rebalance
例:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 將mytopology拓撲worker程序數量調整為5個, “ blue-spout ” 所使用的執行緒數量調整為3個 ,“ yellow-bolt ”所使用的執行緒數量調整為10個。
三.Storm的通訊機制
Worker程序間的資料通訊
ZMQ ZeroMQ 開源的訊息傳遞框架,並不是一個MessageQueue Netty Netty是基於NIO的網路框架,更加高效。(之所以Storm 0.9版本之後使用Netty,是因為ZMQ的license和Storm的license不相容。)
Worker內部的資料通訊
Disruptor 實現了“佇列”的功能。 可以理解為一種事件監聽或者訊息處理機制,即在隊列當中一邊由生產者放入訊息資料,另一邊消費者並行取出訊息資料處理。
Worker內部的訊息傳遞機制
四.Storm的容錯機制
1、叢集節點宕機
Nimbus伺服器 單點故障? 非Nimbus伺服器 故障時,該節點上所有Task任務都會超時,Nimbus會將這些Task任務重新分配到其他伺服器上執行
2、程序掛掉
Worker 掛掉時,Supervisor會重新啟動這個程序。如果啟動過程中仍然一直失敗,並且無法向Nimbus傳送心跳,Nimbus會將該Worker重新分配到其他伺服器上 Supervisor 無狀態(所有的狀態資訊都存放在Zookeeper中來管理) 快速失敗(每當遇到任何異常情況,都會自動毀滅) Nimbus 無狀態(所有的狀態資訊都存放在Zookeeper中來管理) 快速失敗(每當遇到任何異常情況,都會自動毀滅)
3、訊息的完整性
從Spout中發出的Tuple,以及基於他所產生Tuple(例如上個例子當中Spout發出的句子,以及句子當中單詞的tuple等) 由這些訊息就構成了一棵tuple樹 當這棵tuple樹傳送完成,並且樹當中每一條訊息都被正確處理,就表明spout傳送訊息被“完整處理”,即訊息的完整性
Acker -- 訊息完整性的實現機制 Storm的拓撲當中特殊的一些任務 負責跟蹤每個Spout發出的Tuple的DAG(有向無環圖)
五.Storm的DRPC
DRPC (Distributed RPC) 分散式遠端過程呼叫
DRPC 是通過一個 DRPC 服務端(DRPC server)來實現分散式 RPC 功能的。 DRPC Server 負責接收 RPC 請求,並將該請求傳送到 Storm中執行的 Topology,等待接收 Topology 傳送的處理結果,並將該結果返回給傳送請求的客戶端。 (其實,從客戶端的角度來說,DPRC 與普通的 RPC 呼叫並沒有什麼區別。)
DRPC設計目的: 為了充分利用Storm的計算能力實現高密度的並行實時計算。 (Storm接收若干個資料流輸入,資料在Topology當中執行完成,然後通過DRPC將結果進行輸出。)
客戶端通過向 DRPC 伺服器傳送待執行函式的名稱以及該函式的引數來獲取處理結果。實現該函式的拓撲使用一個DRPCSpout 從 DRPC 伺服器中接收一個函式呼叫流。DRPC 伺服器會為每個函式呼叫都標記了一個唯一的 id。隨後拓撲會執行函式來計算結果,並在拓撲的最後使用一個名為 ReturnResults 的 bolt 連線到 DRPC 伺服器,根據函式呼叫的 id 來將函式呼叫的結果返回。
定義DRPC拓撲:
方法1: 通過LinearDRPCTopologyBuilder (該方法也過期,不建議使用) 該方法會自動為我們設定Spout、將結果返回給DRPC Server等,我們只需要將Topology實現
方法2: 直接通過普通的拓撲構造方法TopologyBuilder來建立DRPC拓撲 需要手動設定好開始的DRPCSpout以及結束的ReturnResults
執行模式:
1、本地模式
2.遠端模式(叢集模式)
修改配置檔案conf/storm.yaml drpc.servers: - "node21“ 啟動DRPC Server bin/storm drpc & 通過StormSubmitter.submitTopology提交拓撲
六.Storm的事務
事務性拓撲(Transactional Topologies)
保證訊息(tuple)被且僅被處理一次
Design 1
強順序流(強有序) 引入事務(transaction)的概念,每個transaction(即每個tuple)關聯一個transaction id。 Transaction id從1開始,每個tuple會按照順序+1。 在處理tuple時,將處理成功的tuple結果以及transaction id同時寫入資料庫中進行儲存。
兩種情況:
1、當前transaction id與資料庫中的transaction id不一致
2、兩個transaction id相同
缺點: 一次只能處理一個tuple,無法實現分散式計算
Design 2
強順序的Batch流
事務(transaction)以batch為單位,即把一批tuple稱為一個batch,每次處理一個batch。 每個batch(一批tuple)關聯一個transaction id ,每個batch內部可以平行計算
缺點
Design 3
Storm's design
將Topology拆分為兩個階段:
1、Processing phase 允許並行處理多個batch
2、Commit phase 保證batch的強有序,一次只能處理一個batch
Design details
Manages state - 狀態管理
Storm通過Zookeeper儲存所有transaction相關資訊(包含了:當前transaction id 以及batch的元資料資訊)
Coordinates the transactions - 協調事務
Storm會管理決定transaction應該處理什麼階段(processing、committing)
Fault detection - 故障檢測
Storm內部通過Acker機制保障訊息被正常處理(使用者不需要手動去維護)
First class batch processing API
Storm提供batch bolt介面
三種事務:
1、普通事務
2、Partitioned Transaction - 分割槽事務
3、Opaque Transaction - 不透明分割槽事務