Storm入門學習隨記
推薦慕課網視訊:http://www.imooc.com/video/10055
另外,關於Storm併發度,已經整理到另一篇部落格中,希望對讀者有所幫助。
請參考:http://www.cnblogs.com/quchunhui/p/8271349.html
====Storm的起源。
Storm是開源的、分散式、流式計算系統
什麼是分散式呢?就是將一個任務拆解給多個計算機去執行,讓許多機器共通完成同一個任務,
把這個多機的細節給遮蔽,對外提供同一個介面、同一個服務,這樣的系統就是分散式系統。
在多年以前並沒有非常範用的分散式系統,即使存在,也都是限定在指定的領域,
當然,也有人嘗試從中提取出共通的部分,發明一個通用的分散式系統,但是都沒有很好的結果。
後來,Google發表了3篇論文,提出了分散式計算的模型,在分散式系統上有了一個質的突破。
有一位大牛看了這3篇論文之後,深受啟發,然後就發明了Hadoop系統。
然後,基於Hadoop的改造系統就如雨後春筍一般,接二連三的出現了。
以至於,Hadoop已經不是一套軟體,而是一整套生態系統了。
於是,人們談到分散式,就必談Hadoop了。
但是,Hadoop並不是萬能的,它只能處理適合進行批量計算的需求。對於,非批量的計算就不能夠滿足要求了。
很多時候,我們只能先收集一段時間資料,等資料收集到一定規模之後,我們才開始MapReduce處理。
有這麼一個故事:
-------------------
路人甲是在一家媒體公司A工作,他的主要工作內容很簡單,就是在一些搜尋引擎上做廣告,
眾所周知,搜尋引擎上的廣告是競價排名的,誰土豪誰就排前面,出錢少的就只能排在後面。
公司A的競爭對手都比較土豪,所以呢,公司A的廣告就一直排在後面,也沒什麼好的辦法。
後來,路人甲想出了一個餿主意,就是用程式不斷的去點選競爭對手的廣告,讓對手的廣告費
很快的花費調,這樣公司A就可以廉價的將廣告排在前面了。
搜尋引擎公司試圖識別出這些惡意點選屏來保護商家,將這些惡意點選扣除的費用返還給商家。
一般來說呢,如果利用MapReduce,一般情況下,都需要收集一段時間資料,然後根據這些
資料來算出哪些點選是惡意的,本身收集資料就已經很耗費時間了,再等計算完畢之後,
土豪商家的廣告費也基本上不剩什麼了。
所以呢,我們希望在點擊發生的時候就算出來該點選是否是作弊行為,即使不能馬上判斷出,
也應該儘早的計算出來。
-------------------
為了解決上面這個故事的需求,分散式流式計算系統就產生了,比較知名的有:
•【Yahoo】S4
•【IBM】StreamBase
•【Amazon】Kinesis
•【Spark】Streaming
•【Google】Millwheel
•【Apache】Storm(目前業界中最知名、流程)
批量計算(以Hadoop為代表)與流式計算的區別有哪些呢?
###################
目前已經有人在做一些前瞻性的專案,這些人試圖將批量計算和流式計算進行整合
試圖使用同一套API,即搞定流式計算,又搞定批量計算。
使一段程式碼不要任何改動,就可以同時執行在批量計算和流式計算兩種系統之上。
這種系統目前比較有名的有:
【Twitter】Summing Bird
【Google】CloudDataflow
兩個介面都已經開源了。等以後有機會一定要提前接觸一下。
###################
====Storm元件
Storm採用的是Master-Slave結構,就是使用一個節點來管理整個叢集的執行狀態。
Master節點被稱為:Nimbus,Slave節點用來維護每臺機器的狀態,被稱為:Supervisor
為什麼採取主從結構呢?主從結構比較簡單,不需要進行主節點仲裁等工作。
從前面的結構圖中我們還可以看出,採取主從結構之後,Nimbus是一個單點,
但是,我們知道分散式領域裡,大家都比較討厭自己的系統設計中存在單點,
因為單點如果發生故障,很有可能影響到整個叢集的可用性。
所以,如果一個系統設計中如果存在單點,一般情況下這個單點的作業必然比較輕,
掛了之後,短時間之內也不影響真個系統的執行,並且一般情況下都是沒有狀態的,
宕機之後至需要重啟就能夠恢復並正確處理。
Nimbus的角色是隻負責一些管理性的工作,它並不關心Worker之間的資料是如何傳輸的,
它的一些主要狀態都存在分散式協調服務(Zookeeper)中,記憶體裡面的東西都是可以丟失的,
如果它掛掉,只要沒有運算節點發生故障,那麼整個作業還是能夠正常的進行資料處理的。
Nimbus重啟之後,就可以正確處理整個系統的事務了。
Supervisor的角色是聽Nimbus的話,來啟動並監控真正進行計算的Worker的程序,
如果Worker有異常,那麼久幫助Worker重啟一下,它也不負責資料計算和資料傳輸,
真正的資料計算和輸出,都是由Worker來進行。
Worker是執行在工作節點上面,被Supervisor守護程序建立的用來幹活的JVM程序。
每個Worker對應於一個給定topology的全部執行任務的一個子集。
反過來說,一個Worker裡面不會執行屬於不同的topology的執行任務。
====Storm UI
為了方便使用者管理叢集,檢視叢集執行狀態,提供了一個基於Web的UI來監控整個Storm叢集
它本身不是叢集執行的必須部分,它的啟動停止都不影響Storm的正常執行。
====Storm作業提交執行流程
(1)使用者使用Storm的API來編寫Storm Topology。
(2)使用Storm的Client將Topology提交給Nimbus。
Nimbus收到之後,會將把這些Topology分配給足夠的Supervisor。
(3)Supervisor收到這些Topoligy之後,Nimbus會指派一些Task給這些Supervisor。
(4)Nimvus會指示Supervisor為這些Task生成一些Worker。
(5)Worker來執行這些Task來完成計算任務。
====StormAPI基礎概念
Storm稱使用者的一個作業為Topology(拓撲)。
為什麼叫拓撲呢?是因為Storm的一個拓撲主要包含了許多的資料節點,還有一些計算節點,
以及這些節點之間的邊,也就是說Storm的拓撲是由這些點和邊組成的一個有向無環圖。
這些點有兩種:資料來源節點(Spout)、普通的計算節點(Bolt),
點之間的邊稱為資料流(Stream),資料流中的每一條記錄稱為Tuple。
如下圖中,每一個“水龍頭”表示一個Spout,它會發送一些Tuple給下游的Bolt,
這些Bolt經過處理周,再發送一個Tuple給下一個Bolt,
最後,在這些Bolt裡面是可以執行一些寫資料到外部儲存(如資料庫)等操作的。
在圖中這個Topology裡面我們看到了兩個Spout和5個Bolt,
在實際執行的時候,每個Spout節點都可能有很多個例項,每個Bolt也有可能有很多個例項。
就像MapReduce一樣,一個Map節點並不代表只有一個併發,而有可能很多個Map例項在跑。
這些Spout和Bolt的這些邊裡面,使用者可以設定多種的Grouping的方式。
有些類似SQL中的Group By。用來制定這些計算是怎麼分組的。
*Fields Grouping:保證同樣的欄位移動落到同一個Bolt裡。
--以WordCount為例,MapReduce和Storm的工作流程對比:
(1)MapReduce
(2)Storm
====各個元件的一些說明
--Topologies
為了在storm上面做實時計算, 你要去建立一些topologies。一個topology就是一個計算節點所組成的圖。
Topology裡面的每個處理節點都包含處理邏輯, 而節點之間的連線則表示資料流動的方向。
執行一個Topology是很簡單的。首先,把你所有的程式碼以及所依賴的jar打進一個jar包。然後執行類似下面的這個命令。
strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
這個命令會執行主類: backtype.strom.MyTopology,引數是arg1, arg2。
這個類的main函式定義這個topology並且把它提交給Nimbus。storm jar負責連線到nimbus並且上傳jar檔案。
--Stream
Stream是storm裡面的關鍵抽象。一個stream是一個沒有邊界的tuple序列。
storm提供一些原語來分散式地、可靠地把一個stream傳輸進一個新的stream。比如: 你可以把一個tweets流傳輸到熱門話題的流。
storm提供的最基本的處理stream的原語是spout和bolt。你可以實現Spout和Bolt對應的介面以處理你的應用的邏輯。
spout是流的源頭。比如一個spout可能從Kestrel佇列裡面讀取訊息並且把這些訊息發射成一個流。
又比如一個spout可以呼叫twitter的一個api並且把返回的tweets發射成一個流。
通常Spout會從外部資料來源(佇列、資料庫等)讀取資料,然後封裝成Tuple形式,之後傳送到Stream中。
Spout是一個主動的角色,在介面內部有個nextTuple函式,Storm框架會不停的呼叫該函式。
bolt可以接收任意多個輸入stream, 作一些處理, 有些bolt可能還會發射一些新的stream。
一些複雜的流轉換, 比如從一些tweet裡面計算出熱門話題, 需要多個步驟, 從而也就需要多個bolt。
Bolt可以做任何事情: 執行函式,過濾tuple,做一些聚合,做一些合併以及訪問資料庫等等。
Bolt處理輸入的Stream,併產生新的輸出Stream。
Bolt可以執行過濾、函式操作、Join、操作資料庫等任何操作。
Bolt是一個被動的角色,其介面中有一個execute(Tuple input)方法,在接收到訊息之後會呼叫此函式,使用者可以在此方法中執行自己的處理邏輯。
spout和bolt所組成一個網路會被打包成topology, topology是storm裡面最高一級的抽象(類似 Job), 你可以把topology提交給storm的叢集來執行。
topology的結構在Topology那一段已經說過了,這裡就不再贅述了。
topology裡面的每一個節點都是並行執行的。 在你的topology裡面, 你可以指定每個節點的並行度, storm則會在叢集裡面分配那麼多執行緒來同時計算。
一個topology會一直執行直到你顯式停止它。storm自動重新分配一些執行失敗的任務, 並且storm保證你不會有資料丟失, 即使在一些機器意外停機並且訊息被丟掉的情況下。
--資料模型(Data Model)
storm使用tuple來作為它的資料模型。每個tuple是一堆值,每個值有一個名字,並且每個值可以是任何型別,
在我的理解裡面一個tuple可以看作一個沒有方法的java物件(或者是一個表的欄位)。
總體來看,storm支援所有的基本型別、字串以及位元組陣列作為tuple的值型別。你也可以使用你自己定義的型別來作為值型別, 只要你實現對應的序列化器(serializer)。
一個Tuple代表資料流中的一個基本的處理單元,例如一條cookie日誌,它可以包含多個Field,每個Field表示一個屬性。
Tuple本來應該是一個Key-Value的Map,由於各個元件間傳遞的tuple的欄位名稱已經事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List。
一個沒有邊界的、源源不斷的、連續的Tuple序列就組成了Stream。
topology裡面的每個節點必須定義它要發射的tuple的每個欄位。
比如下面這個bolt定義它所發射的tuple包含兩個欄位,型別分別是: double和triple。
- public class DoubleAndTripleBoltimplementsIRichBolt {
- private OutputCollectorBase _collector;
- @Override
- public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
- _collector = collector;
- }
- @Override
- public void execute(Tuple input) {
- intval = input.getInteger(0);
- _collector.emit(input,newValues(val*2, val*3));
- _collector.ack(input);
- }
- @Override
- public void cleanup() {
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(newFields("double","triple"));
- }
- }
參考部落格:http://blog.itpub.net/29754888/viewspace-1260026/
====StormAPI使用
我們來看看WorldCount的example程式碼。
WordCount的例項:https://github.com/quchunhui/StormWordCount
====Storm的併發機制
Task數量:表示每個Spout或Bolt邏輯上有多少個併發。它影響輸出結果。
Worker數量:代表總共有幾個JVM程序去執行我們的作業。
Executor數量:表示每個Spout或Bolt啟動幾個執行緒來執行。
下面程式碼中的數字表示Executor數量,它不影響結果,影響效能。
Worker的數量在Config中設定,下圖程式碼中的部分表示Worker數量。
*本地模式中,Worker數不生效,只會啟動一個JVM進行來執行作業。
*只有在叢集模式設定Worker才有效。而且叢集模式的時候一定要設定才能體現叢集的價值。
====Storm資料可靠性
分散式系統都管理很多臺機器,需要保證任意的Worker掛掉之後,我們的系統仍然能正確的處理,那麼
Storm如何保證這些資料正確的恢復?
Storm如何保證這些資料不被重複計算?
(1)Spout容錯API:NextTuple中,emit時,指定MsgID。
(2)Bolt容錯API:①emit時,錨定輸入Tuple。②Act輸入Tuple。
====Storm叢集搭建
(1)安裝zookeeper叢集
配置方法省略。
在storm叢集中,zookeeper具體發揮的是什麼作用呢?
概括來說,zookeeper是nimbus和supervisor進行互動的中介。
1、nimbus通過在zookeeper上寫狀態資訊來分配任務。
通俗的講就是寫哪些supervisor執行哪些task的對應關係。而supervisor則通過從zookeeper上讀取這些狀態資訊,來領取任務。
2、supervisor、task會發送心跳到zookeeper,使得nimbus可以監控整個叢集的狀態,從而在task執行失敗時,可以重啟他們。
(2)下載安裝Storm
官網上下載Storm:http://storm.apache.org
上傳至Linux並解壓縮。這裡將Storm解壓縮到/opt/apache-storm-0.10.0路徑下了。
(3)修改Storm配置檔案
配置檔案路徑:/opt/apache-storm-0.9.5/conf/storm.yaml
配置內容如下:
----------------
storm.zookeeper.servers:
- "192.168.93.128"
- "192.168.93.129"
- "192.169.93.130"
nimbus.host: "192.168.93.128"
storm.local.dir: "/opt/apache-storm-0.9.5/status"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
----------------
※storm1.x之後,nimbus配置變更為:解決了nimbus單點故障的問題。
nimbus.seeds: ["192.168.1.80","192.168.1.81"]
置之後的檔案如下如所示:
--Storm配置項詳細介紹
•storm.zookeeper.servers:
ZooKeeper伺服器列表
•storm.zookeeper.port:
ZooKeeper連線埠
•storm.local.dir:
storm使用的本地檔案系統目錄(必須存在並且storm程序可讀寫)
•storm.cluster.mode:
Storm叢集執行模式([distributed|local])
•storm.local.mode.zmq:
Local模式下是否使用ZeroMQ作訊息系統,如果設定為false則使用java訊息系統。預設為false
•storm.zookeeper.root:
ZooKeeper中Storm的根目錄位置
•storm.zookeeper.session.timeout:
客戶端連線ZooKeeper超時時間
•storm.id:
執行中拓撲的id,由storm name和一個唯一隨機陣列成。
•nimbus.host:
nimbus伺服器地址
•nimbus.thrift.port:nimbus的thrift監聽埠
•nimbus.childopts:
通過storm-deploy專案部署時指定給nimbus程序的jvm選項
•nimbus.task.timeout.secs:
心跳超時時間,超時後nimbus會認為task死掉並重分配給另一個地址
•nimbus.monitor.freq.secs:
nimbus檢查心跳和重分配任務的時間間隔。注意如果是機器宕掉nimbus會立即接管並處理
•nimbus.supervisor.timeout.secs:
supervisor的心跳超時時間,一旦超過nimbus會認為該supervisor已死並停止為它分發新任務
•nimbus.task.launch.secs:
task啟動時的一個特殊超時設定。在啟動後第一次心跳前會使用該值來臨時替代nimbus.task.timeout.secs
•nimbus.reassign:
當發現task失敗時nimbus是否重新分配執行。預設為真,不建議修改
•nimbus.file.copy.expiration.secs:
nimbus判斷上傳/下載連結的超時時間,當空閒時間超過該設定時nimbus會認為連結死掉並主動斷開
•ui.port:
Storm UI的服務埠
•drpc.servers:
DRPC伺服器列表,以便DRPCSpout知道和誰通訊
•drpc.port:
Storm DRPC的服務埠
•supervisor.slots.ports:
supervisor上能夠執行workers的埠列表。每個worker佔用一個埠,且每個埠只執行一個worker。
通過這項配置可以調整每臺機器上執行的worker數。(調整slot數/每機)
•supervisor.childopts:
在storm-deploy專案中使用,用來配置supervisor守護程序的jvm選項
•supervisor.worker.timeout.secs:
supervisor中的worker心跳超時時間,一旦超時supervisor會嘗試重啟worker程序.
•supervisor.worker.start.timeout.secs:
supervisor初始啟動時,worker的心跳超時時間,當超過該時間supervisor會嘗試重啟worker。
因為JVM初始啟動和配置會帶來的額外消耗,從而使得第一次心跳會超過supervisor.worker.timeout.secs的設定
•supervisor.enable:
supervisor是否應當執行分配給他的workers。預設為true,該選項用來進行Storm的單元測試,一般不應修改.
•supervisor.heartbeat.frequency.secs:
supervisor心跳傳送頻率(多久傳送一次)
•supervisor.monitor.frequency.secs:
supervisor檢查worker心跳的頻率
•worker.childopts:
supervisor啟動worker時使用的jvm選項。所有的”%ID%”字串會被替換為對應worker的識別符號
•worker.heartbeat.frequency.secs:
worker的心跳傳送時間間隔
•task.heartbeat.frequency.secs:
task彙報狀態心跳時間間隔
•task.refresh.poll.secs:
task與其他tasks之間連結同步的頻率。(如果task被重分配,其他tasks向它傳送訊息需要重新整理連線)
。一般來講,重分配發生時其他tasks會理解得到通知。該配置僅僅為了防止未通知的情況。
•topology.debug:
如果設定成true,Storm將記錄發射的每條資訊。
•topology.optimize:
master是否在合適時機通過在單個執行緒內執行多個task以達到優化topologies的目的
•topology.workers:
執行該topology叢集中應當啟動的程序數量。
每個程序內部將以執行緒方式執行一定數目的tasks。topology的元件結合該引數和並行度提示來優化效能
•topology.ackers:
topology中啟動的acker任務數。
Acker儲存由spout傳送的tuples的記錄,並探測tuple何時被完全處理。
當Acker探測到tuple被處理完畢時會向spout傳送確認資訊。通常應當根據topology的吞吐量來確定acker的數目,但一般不需要太多。
當設定為0時,相當於禁用了訊息可靠性。storm會在spout傳送tuples後立即進行確認
•topology.message.timeout.secs:
topology中spout傳送訊息的最大處理超時時間。
如果一條訊息在該時間視窗內未被成功ack,Storm會告知spout這條訊息失敗。而部分spout實現了失敗訊息重播功能。
•topology.kryo.register:
註冊到Kryo(Storm底層的序列化框架)的序列化方案列表。序列化方案可以是一個類名,或者是com.esotericsoftware.kryo.Serializer的實現
•topology.skip.missing.kryo.registrations:
Storm是否應該跳過它不能識別的kryo序列化方案。如果設定為否task可能會裝載失敗或者在執行時丟擲錯誤
•topology.max.task.parallelism:
在一個topology中能夠允許的最大元件並行度。該項配置主要用在本地模式中測試執行緒數限制.
•topology.max.spout.pending:
一個spout task中處於pending狀態的最大的tuples數量。該配置應用於單個task,而不是整個spouts或topology
•topology.state.synchronization.timeout.secs:
元件同步狀態源的最大超時時間(保留選項,暫未使用)
•topology.stats.sample.rate:
用來產生task統計資訊的tuples抽樣百分比
•topology.fall.back.on.java.serialization:
topology中是否使用java的序列化方案
•zmq.threads:
每個worker程序內zeromq通訊用到的執行緒數
•zmq.linger.millis:
當連線關閉時,連結嘗試重新發送訊息到目標主機的持續時長。這是一個不常用的高階選項,基本上可以忽略.
•java.library.path:
JVM啟動(如Nimbus,Supervisor和workers)時的java.library.path設定。該選項告訴JVM在哪些路徑下定位本地庫
(4)配置Storm環境變數
環境變數位置:/etc/profile
配置內容之後如下圖所示:
注意:環境變數修改只有,一定要使用Source命令來使之生效。
(5)啟動Storm
--啟動Storm UI
命令:storm ui >/dev/null 2>&1 &
我們可以它啟動的時候相關的輸出指向到/def/null,並且把錯誤也重新定向到正常輸出。
--啟動主節點(Nimbus節點)
命令:storm nimbus >/dev/null 2>&1 &
在第1臺Linux虛擬機器上執行。正常啟動時的jps結果如下圖所示:
--啟動工作節點(Supervisor節點)
命令:storm supervisor >/dev/null 2>&1 &
在第2、3臺Linux虛擬機器上執行。正常啟動時的jps結果如下圖所示:
(6)啟動StormUI監控頁面:
Storm正常啟動之後,應該可以開啟StormUI畫面。在瀏覽器中輸入地址和埠即可
正確啟動時應該如下圖所示:
--Mainpage:
main頁面主要包括3個部分
【Cluster Summary】
•Nimbus uptime: nimbus的啟動時間
•Supervisors: storm叢集中supervisor的數目
•used slots: 使用了的slots數
•free slots: 剩餘的slots數
•total slots: 總的slots數
•Running tasks: 執行的任務數
【topology summary】
•Name: topology name
•id: topology id (由storm生成)
•status: topology的狀態,包括(ACTIVE, INACTIVE, KILLED, REBALANCING)
•uptime: topology執行的時間
•num workers: 執行的workers數
•num tasks: 執行的task數
【supervisor summary】
•host: supervisor(主機)的主機名
•uptime: supervisor啟動的時間
•slots: supervisor的埠數
•used slots: 使用的埠數
--Topology page
topology頁面主要包括4個部分
【topology summary】
(同主頁)
【topology stats】
•window: 時間視窗,顯示10m、3h、1d和all time的執行狀況
•emitted: emitted tuple數
•transferred: transferred tuple數, 說下與emitted的區別:如果一個task,emitted一個tuple到2個task中,則transferred tuple數是emitted tuple數的兩倍
•complete latency: spout emitting 一個tuple到spout ack這個tuple的平均時間
•acked: ack tuple數
•failed: 失敗的tuple數
【spouts】
•id: spout id
•parallelism: 任務數
•last error: 最近的錯誤數,只顯示最近的前200個錯誤
•emitted、transferred、complete latency、acked和failed上面已解釋
【bolts】
•process latency: bolt收到一個tuple到bolt ack這個tuple的平均時間
其他引數都解釋過了
還有componentpage和taskpage,引數的解釋同上。
taskpage中的Component指的是spoutid或者boltid,time指的是錯誤發生的時間,error是指錯誤的具體內容。
====Storm常用命令
【提交Topologies】
命令格式:storm jar 【jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】
樣例:storm jar /storm-starter.jar storm.starter.WordCountTopology wordcountTop
#提交storm-starter.jar到遠端叢集,並啟動wordcountTop拓撲。
【停止Topologies】
命令格式:storm kill 【拓撲名稱】
樣例:storm kill wordcountTop
#殺掉wordcountTop拓撲。
【啟動nimbus後臺程式】
命令格式:storm nimbus
【啟動supervisor後臺程式】
命令格式:storm supervisor
【啟動drpc服務】
命令格式:storm drpc
【啟動ui服務】
命令格式:storm ui
【啟動REPL】
REPL — read-evaluate-print-loop。
雖然clojure可以作為一種指令碼語言內嵌在java裡面,但是它的首選程式設計方式是使用REPL,這是一個簡單的命令列介面,
使用它你可以輸入你的命令,執行,然後檢視結果, 你可以以下面這個命令來啟動REPL:
命令格式:storm repl
【列印本地配置】
命令格式:storm localconfvalue [配置引數關鍵字]
舉例:storm localconfvalue storm.zookeeper.servers
#根據指定引數列印本地配置的值。
【列印遠端配置】
命令格式:storm remoteconfvalue [配置引數關鍵字]
舉例:storm remoteconfvalue storm.zookeeper.servers
#根據指定引數列印遠端配置的值。
【執行Shell指令碼】
命令格式:storm shell resourcesdir command args
【列印CLASSPATH】
命令格式:storm classpath
====Storm調優:
--調優物件
當一個topology在storm cluster中執行時,它的併發主要跟3個邏輯物件相關:worker => executor =>task。(=>代表1對N)
(1)Worker
Worker是執行在工作節點上面,被Supervisor守護程序建立的用來幹活的JVM程序。
每個Worker對應於一個給定topology的全部執行任務的一個子集。
反過來說,一個Worker裡面不會執行屬於不同的topology的執行任務。
它可以通過[storm rebalance]命令任意調整。
(2)Executor
可以理解成一個Worker程序中的工作執行緒。
一個Executor中只能執行隸屬於同一個component(spout/bolt)的task。
一個Worker程序中可以有一個或多個Executor執行緒。在預設情況下,一個Executor執行一個task。
每個component(spout/bolt)的併發度就是指executor數量。
它可以通過[storm rebalance]命令任意調整。
(3)Task
Task則是spout和bolt中具體要乾的活了。一個Executor可以負責1個或多個task。
同時,task也是各個節點之間進行grouping(partition)的單位。無法在執行時調整。
--設定方法:
conf.setNumWorkers(workers); //設定worker數量
uilder.setBolt("2", new WordSpliter(),4) //設定Executor併發數量
builder.setBolt("2", new WordSpliter(),4).setNumTasks(1); //設定每個執行緒處理的Task數量
--任務分配:
任務分配是有下面兩種情況:
①、task數目比worker多:
例如task是[1 2 3 4],可用的slot(所謂slot就是可用的worker)只有[host1:port1,host2:port1],那麼最終是這樣分配
1:[host1:port1]
2:[host2:port1]
3:[host1:port1]
4:[host2:port1]
②、task數目比worker少:
例如task是[1 2],而worker有[host1:port1,host1:port2,host2:port1,host2:port2],
那麼首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個機器上,也就是將worker排列成
[host1:port1,host2:port1,host1:port2,host2:port2]
然後分配任務為:
1:[host1:port1]
2:[host2:port1]
--簡單舉例:
通過Config.setNumWorkers(int))來指定一個storm叢集中執行topolgy的程序數量,所有的執行緒將在這些指定的worker程序中執行。
比如說一個topology中要啟動300個執行緒來執行spout/bolt,而指定的worker程序數量是60個。
那麼storm將會給每個worker分配5個執行緒來跑spout/bolt。
如果要對一個topology進行調優,可以調整worker數量和spout/bolt的parallelism(併發度,即executor)數量。
(調整引數之後要記得重新部署topology,後續會為該操作提供一個swapping的功能來減小重新部署的時間)。
例如:
builder.setBolt("cpp", new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name);
會建立3個執行緒,但有記憶體中會5個CppBolt物件,3個執行緒排程5個物件。
--網上搜羅的一些經驗:
①、對於worker和task之間的比例,網上也給出了參考,。即1個worker包含10~15個左右。當然這個參考,實際情況還是要根據配置和測試情況。
②、executor數最大不能超過該bolt的task數。
--Strom叢集命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
|
- [[email protected] bin]# storm
- Commands:
- activate
- classpath
- deactivate
- dev-zookeeper
- drpc
- help 命令幫助
- jar 執行上傳的jar包
- kill 殺死正在執行的topology 後面跟 topology的名稱
- list 檢視執行的所有topology執行情況
- localconfvalue
- logviewer 啟動topology日誌
- nimbus 啟動nimbus
- rebalance shell方式下修改topology執行引數比如worker個數 task個數等
- remoteconfvalue
- repl
- shell
- supervisor 啟動supervisor
- ui 啟動topology ui介面
- version
- Help:
- help
- help <command>
----Storm高可用HA
關於Storm的高可用,有以下幾個方面:
(1)資料利用階段可以通過ACK機制保證資料被處理;
(2)在程序級別,worker失效,supervisor會自動重啟worker執行緒;
(3)在元件級別,supervisor節點失效,會在其他節點重啟該supervisor任務;
但是,nimbus節點失效怎麼辦?
Supervisor程序和Nimbus程序,需要用Daemon程式如monit來啟動,失效時自動重新啟動。
如果Nimbus程序所在的機器都直接倒了,需要在其他機器上重新啟動,Storm目前沒有自建支援,需要自己寫指令碼實現。
即使Nimbus程序不在了,也只是不能部署新任務,有節點失效時不能重新分配而已,不影響已有的執行緒。
同樣,如果Supervisor程序失效,不影響已存在的Worker程序。
--END--