1. 程式人生 > 實用技巧 >Storm執行原理探索

Storm執行原理探索

>>> hot3.png

Apache Storm 簡介

Apache Storm 的前身是 Twitter Storm 平臺,目前已經歸於 Apache 基金會管轄。Apache Storm 是一個免費開源的分散式實時計算系統。簡化了流資料的可靠處理,像 Hadoop 一樣實現實時批處理。Storm 很簡單,可用於任意程式語言。Apache Storm 採用 Clojure 開發。Storm 有很多應用場景,包括實時資料分析、聯機學習、持續計算、分散式 RPC、ETL 等。Storm 速度非常快,一個測試在單節點上實現每秒一百萬的組處理。

1、Storm叢集架構

Storm叢集採用主從架構方式,主節點是Nimbus,從節點是Supervisor,有關排程相關的資訊儲存到ZooKeeper叢集中,架構如下圖所示:

02150040_OoAa.png

Nimbus

Storm叢集的Master節點,負責分發使用者程式碼,指派給具體的Supervisor節點上的Worker節點,去執行Topology對應的元件(Spout/Bolt)的Task。

Supervisor

Storm叢集的從節點,負責管理執行在Supervisor節點上的每一個Worker程序的啟動和終止。通過Storm的配置檔案中的supervisor.slots.ports配置項,可以指定在一個Supervisor上最大允許多少個Slot,每個Slot通過埠號來唯一標識,一個埠號對應一個Worker程序(如果該Worker程序被啟動)。

ZooKeeper

用來協調Nimbus和Supervisor,如果Supervisor因故障出現問題而無法執行Topology,Nimbus會第一時間感知到,並重新分配Topology到其它可用的Supervisor上執行。

2、執行元件

Strom在執行中可分為spout與bolt兩個元件,其中,資料來源從spout開始,資料以tuple的方式傳送到bolt,多個bolt可以串連起來,一個bolt也可以接入多個spot/bolt.執行時原理如下圖

02150040_eUCI.png

其中,各元件定義如下

Spout: 資料來源,源源不斷的傳送元組資料 Tuple

Tuple: 元組資料的抽象介面,可以是任何型別的資料。但是必須要可序列化。

Stream: Tuple的集合。一個 Stream內的 Tuple擁有相同的源。

Bolt: 消費Tuple的節點。消費後可能會排出新的 Tuple到該 Stream上,也可能會排到到其他 Stream,也或者根本不排。可併發。

Topology: 將 Spout、 Bolt整合起來的拓撲圖。定義了 Spout和 Bolt的結合關係、併發數量、配置等等。

3、Topology具體執行

在上面Spout和Bolt組成一個Topology,然後通過命令將這個Topology打包成jar包,啟動相關命令啟動應用就可以了,一個Storm在叢集上執行一個Topology時,主要通過以下3個實體來完成Topology的執行工作:

(1). Worker(程序)

(2). Executor(執行緒)

(3). Task

下圖簡要描述了這3者之間的關係:

02150040_oCfN.png

1個worker程序執行的是1個topology的子集(注:不會出現1個worker為多個topology服務)。1個worker程序會啟動1個或多個executor執行緒來執行1個topology的component(spout或bolt)。因此,1個執行中的topology就是由叢集中多臺物理機上的多個worker程序組成的。

executor是1個被worker程序啟動的單獨執行緒。每個executor只會執行1個topology的1個component(spout或bolt)的task(注:task可以是1個或多個,storm預設是1個component只生成1個task,executor執行緒裡會在每次迴圈裡順序呼叫所有task例項)。

task是最終執行spout或bolt中程式碼的單元(注:1個task即為spout或bolt的1個例項,executor執行緒在執行期間會呼叫該task的nextTuple或execute方法)。topology啟動後,1個component(spout或bolt)的task數目是固定不變的,但該component使用的executor執行緒數可以動態調整(例如:1個executor執行緒可以執行該component的1個或多個task例項)。這意味著,對於1個component存在這樣的條件:#threads<=#tasks(即:執行緒數小於等於task數目)。預設情況下task的數目等於executor執行緒數目,即1個executor執行緒只執行1個task。

總體的Topology處理流程圖為:

02150040_6TdB.png

4、Stream Groupings

Storm中最重要的抽象,應該就是Stream grouping了,它能夠控制Spot/Bolt對應的Task以什麼樣的方式來分發Tuple,將Tuple發射到目的Spot/Bolt對應的Task

02150040_uXxC.png

目前,Storm Streaming Grouping支援如下幾種型別:

Shuffle Grouping :隨機分組,儘量均勻分佈到下游Bolt中

將流分組定義為混排。這種混排分組意味著來自Spout的輸入將混排,或隨機分發給此Bolt中的任務。shuffle grouping對各個task的tuple分配的比較均勻。

Fields Grouping :按欄位分組,按資料中field值進行分組;相同field值的Tuple被髮送到相同的Task

這種grouping機制保證相同field值的tuple會去同一個task,這對於WordCount來說非常關鍵,如果同一個單詞不去同一個task,那麼統計出來的單詞次數就不對了。“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always Go to the same task”. —— 小示例

All grouping :廣播

廣播發送, 對於每一個tuple將會複製到每一個bolt中處理。

Global grouping :全域性分組,Tuple被分配到一個Bolt中的一個Task,實現事務性的Topology。

Stream中的所有的tuple都會發送給同一個bolt任務處理,所有的tuple將會發送給擁有最小task_id的bolt任務處理。

None grouping :不分組

不關注並行處理負載均衡策略時使用該方式,目前等同於shuffle grouping,另外storm將會把bolt任務和他的上游提供資料的任務安排在同一個執行緒下。

Direct grouping :直接分組 指定分組

由tuple的發射單元直接決定tuple將發射給那個bolt,一般情況下是由接收tuple的bolt決定接收哪個bolt發射的Tuple。這是一種比較特別的分組方法,用這種分組意味著訊息的傳送者指定由訊息接收者的哪個task處理這個訊息。 只有被宣告為Direct Stream的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用emitDirect方法來發射。訊息處理者可以通過TopologyContext來獲取處理它的訊息的taskid (OutputCollector.emit方法也會返回taskid)。

另外,Storm還提供了使用者自定義Streaming Grouping介面,如果上述Streaming Grouping都無法滿足實際業務需求,也可以自己實現,只需要實現backtype.storm.grouping.CustomStreamGrouping介面,該介面重定義瞭如下方法:

List chooseTasks(int taskId, List values)

上面幾種Streaming Group的內建實現中,最常用的應該是Shuffle Grouping、Fields Grouping、Direct Grouping這三種,使用其它的也能滿足特定的應用需求。

5、可靠性

(1)、spout的可靠性

spout會記錄它所發射出去的tuple,當下遊任意一個bolt處理失敗時spout能夠重新發射該tuple。在spout的nextTuple()傳送一個tuple時,為實現可靠訊息處理需要給每個spout發出的tuple帶上唯一ID,並將該ID作為引數傳遞給SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), tupleID);

實際上Values extends ArrayList<Object>

保障過程中,每個bolt每收到一個tuple,都要向上遊應答或報錯,在tuple樹上的所有bolt都確認應答,spout才會隱式呼叫ack()方法表明這條訊息(一條完整的流)已經處理完畢,將會對編號ID的訊息應答確認;處理報錯、超時則會呼叫fail()方法。

(2)、bolt的可靠性

bolt的可靠訊息處理機制包含兩個步驟:

a、當發射衍生的tuple,需要錨定讀入的tuple

b、當處理訊息時,需要應答或報錯

可以通過OutputCollector中emit()的一個過載函式錨定或tuple:collector.emit(tuple, new Values(word)); 並且需要呼叫一次this.collector.ack(tuple)應答。

6、高效能平行計算引擎Storm和Spark比較

Spark基於這樣的理念,把計算過程傳遞給資料要比把資料傳遞給計算過程要更富效率。每個節點儲存(或快取)它的資料集,然後任務被提交給節點。每次輸入是一次性將所有資料分部到各機器節點讀入,通過記憶體計算將結果RDD臨時儲存記憶體中。一次跑批將所有的任務根據惰性RDD的區別來拆解不現的stage,下一個的stage的輸入為上一個stage的輸出。這一過程全部都是在記憶體中完成。(記憶體不足也可以硬碟)所以這是把過程傳遞給資料。這和Hadoop map/reduce非常相似,除了積極使用記憶體來避免I/O操作,以使得迭代演算法(前一步計算輸出是下一步計算的輸入)效能更高。

而Storm的架構和Spark截然相反。Storm是一個分散式流計算引擎。每個節點實現一個基本的計算過程,而資料項在互相連線的網路節點中流進流出。和Spark相反,這個是把資料傳遞給過程。Strom任務提交後組成一個Topology,會一直不斷的取資料進行處理,如果沒有執行停止命令,任務不會停止。而Spak可以當成是一次性的(spark streaming不是一次性的)任務。資料處理完後任務就結束。

兩個框架都用於處理大量資料的平行計算。

Storm在動態處理大量生成的“小資料塊”上要更好(比如在Twitter資料流上實時計算一些匯聚功能或分析)。

Spark工作於現有的資料全集(如Hadoop資料)已經被匯入Spark叢集,Spark基於in-memory管理可以進行快訊掃描,並最小化迭代演算法的全域性I/O操作。

不過Spark流模組(Streaming Module)倒是和Storm相類似(都是流計算引擎),儘管並非完全一樣。

Spark流模組先匯聚批量資料然後進行資料塊分發(視作不可變資料進行處理),而Storm是隻要接收到資料就實時處理並分發。

不確定哪種方式在資料吞吐量上要具優勢,不過Storm計算時間延遲要小。

總結下,Spark和Storm設計相反,而Spark Steaming才和Storm類似,前者有資料平滑視窗(sliding window),而後者需要自己去維護這個視窗。

轉載於:https://my.oschina.net/riseee/blog/1627829