1. 程式人生 > >Storm入門學習隨記

Storm入門學習隨記

推薦慕課網視訊:http://www.imooc.com/video/10055

 

另外,關於Storm併發度,已經整理到另一篇部落格中,希望對讀者有所幫助。

請參考:http://www.cnblogs.com/quchunhui/p/8271349.html

 

====Storm的起源。

Storm是開源的、分散式、流式計算系統

 

什麼是分散式呢?就是將一個任務拆解給多個計算機去執行,讓許多機器共通完成同一個任務,

把這個多機的細節給遮蔽,對外提供同一個介面、同一個服務,這樣的系統就是分散式系統。

 

在多年以前並沒有非常範用的分散式系統,即使存在,也都是限定在指定的領域,

當然,也有人嘗試從中提取出共通的部分,發明一個通用的分散式系統,但是都沒有很好的結果。

後來,Google發表了3篇論文,提出了分散式計算的模型,在分散式系統上有了一個質的突破。

 

有一位大牛看了這3篇論文之後,深受啟發,然後就發明了Hadoop系統。

 

然後,基於Hadoop的改造系統就如雨後春筍一般,接二連三的出現了。

以至於,Hadoop已經不是一套軟體,而是一整套生態系統了。

於是,人們談到分散式,就必談Hadoop了。

 

但是,Hadoop並不是萬能的,它只能處理適合進行批量計算的需求。對於,非批量的計算就不能夠滿足要求了。

很多時候,我們只能先收集一段時間資料,等資料收集到一定規模之後,我們才開始MapReduce處理。

 

有這麼一個故事:

-------------------

路人甲是在一家媒體公司A工作,他的主要工作內容很簡單,就是在一些搜尋引擎上做廣告,

眾所周知,搜尋引擎上的廣告是競價排名的,誰土豪誰就排前面,出錢少的就只能排在後面。

公司A的競爭對手都比較土豪,所以呢,公司A的廣告就一直排在後面,也沒什麼好的辦法。

後來,路人甲想出了一個餿主意,就是用程式不斷的去點選競爭對手的廣告,讓對手的廣告費

很快的花費調,這樣公司A就可以廉價的將廣告排在前面了。

搜尋引擎公司試圖識別出這些惡意點選屏來保護商家,將這些惡意點選扣除的費用返還給商家。

一般來說呢,如果利用MapReduce,一般情況下,都需要收集一段時間資料,然後根據這些

資料來算出哪些點選是惡意的,本身收集資料就已經很耗費時間了,再等計算完畢之後,

土豪商家的廣告費也基本上不剩什麼了。

所以呢,我們希望在點擊發生的時候就算出來該點選是否是作弊行為,即使不能馬上判斷出,

也應該儘早的計算出來。

-------------------

 為了解決上面這個故事的需求,分散式流式計算系統就產生了,比較知名的有:

•【Yahoo】S4

•【IBM】StreamBase

•【Amazon】Kinesis

•【Spark】Streaming

•【Google】Millwheel

•【Apache】Storm(目前業界中最知名、流程)

 

批量計算(以Hadoop為代表)與流式計算的區別有哪些呢?

 

###################

目前已經有人在做一些前瞻性的專案,這些人試圖將批量計算和流式計算進行整合

試圖使用同一套API,即搞定流式計算,又搞定批量計算。

使一段程式碼不要任何改動,就可以同時執行在批量計算和流式計算兩種系統之上。

這種系統目前比較有名的有:

【Twitter】Summing Bird

【Google】CloudDataflow

兩個介面都已經開源了。等以後有機會一定要提前接觸一下。

###################

 

====Storm元件

Storm採用的是Master-Slave結構,就是使用一個節點來管理整個叢集的執行狀態。

Master節點被稱為:Nimbus,Slave節點用來維護每臺機器的狀態,被稱為:Supervisor

 

為什麼採取主從結構呢?主從結構比較簡單,不需要進行主節點仲裁等工作。

 

從前面的結構圖中我們還可以看出,採取主從結構之後,Nimbus是一個單點

但是,我們知道分散式領域裡,大家都比較討厭自己的系統設計中存在單點,

因為單點如果發生故障,很有可能影響到整個叢集的可用性。

 

所以,如果一個系統設計中如果存在單點,一般情況下這個單點的作業必然比較輕,

掛了之後,短時間之內也不影響真個系統的執行,並且一般情況下都是沒有狀態的,

宕機之後至需要重啟就能夠恢復並正確處理。

 

Nimbus的角色是隻負責一些管理性的工作,它並不關心Worker之間的資料是如何傳輸的,

它的一些主要狀態都存在分散式協調服務(Zookeeper)中,記憶體裡面的東西都是可以丟失的

如果它掛掉,只要沒有運算節點發生故障,那麼整個作業還是能夠正常的進行資料處理的。

Nimbus重啟之後,就可以正確處理整個系統的事務了。

 

Supervisor的角色是聽Nimbus的話,來啟動並監控真正進行計算的Worker的程序,

如果Worker有異常,那麼久幫助Worker重啟一下,它也不負責資料計算和資料傳輸,

 

真正的資料計算和輸出,都是由Worker來進行。

Worker是執行在工作節點上面,被Supervisor守護程序建立的用來幹活的JVM程序。

每個Worker對應於一個給定topology的全部執行任務的一個子集。

反過來說,一個Worker裡面不會執行屬於不同的topology的執行任務。

 

====Storm UI

為了方便使用者管理叢集,檢視叢集執行狀態,提供了一個基於Web的UI來監控整個Storm叢集

它本身不是叢集執行的必須部分,它的啟動停止都不影響Storm的正常執行。

 

====Storm作業提交執行流程

(1)使用者使用Storm的API來編寫Storm Topology。

(2)使用Storm的Client將Topology提交給Nimbus。

Nimbus收到之後,會將把這些Topology分配給足夠的Supervisor。

(3)Supervisor收到這些Topoligy之後,Nimbus會指派一些Task給這些Supervisor。

(4)Nimvus會指示Supervisor為這些Task生成一些Worker。

(5)Worker來執行這些Task來完成計算任務。

  

====StormAPI基礎概念

Storm稱使用者的一個作業為Topology(拓撲)。

 

為什麼叫拓撲呢?是因為Storm的一個拓撲主要包含了許多的資料節點,還有一些計算節點,

以及這些節點之間的邊,也就是說Storm的拓撲是由這些點和邊組成的一個有向無環圖。

這些點有兩種:資料來源節點(Spout)、普通的計算節點(Bolt),

點之間的邊稱為資料流(Stream),資料流中的每一條記錄稱為Tuple。

 

如下圖中,每一個“水龍頭”表示一個Spout,它會發送一些Tuple給下游的Bolt,

這些Bolt經過處理周,再發送一個Tuple給下一個Bolt,

最後,在這些Bolt裡面是可以執行一些寫資料到外部儲存(如資料庫)等操作的。

在圖中這個Topology裡面我們看到了兩個Spout和5個Bolt,

在實際執行的時候,每個Spout節點都可能有很多個例項,每個Bolt也有可能有很多個例項。

就像MapReduce一樣,一個Map節點並不代表只有一個併發,而有可能很多個Map例項在跑。

 

這些Spout和Bolt的這些邊裡面,使用者可以設定多種的Grouping的方式。

有些類似SQL中的Group By。用來制定這些計算是怎麼分組的。

 

*Fields Grouping:保證同樣的欄位移動落到同一個Bolt裡。

 

--以WordCount為例,MapReduce和Storm的工作流程對比:

(1)MapReduce

(2)Storm

 

====各個元件的一些說明

--Topologies

為了在storm上面做實時計算, 你要去建立一些topologies。一個topology就是一個計算節點所組成的圖。

Topology裡面的每個處理節點都包含處理邏輯, 而節點之間的連線則表示資料流動的方向。

執行一個Topology是很簡單的。首先,把你所有的程式碼以及所依賴的jar打進一個jar包。然後執行類似下面的這個命令。

strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2

這個命令會執行主類: backtype.strom.MyTopology,引數是arg1, arg2。

這個類的main函式定義這個topology並且把它提交給Nimbus。storm jar負責連線到nimbus並且上傳jar檔案。

 

--Stream

Stream是storm裡面的關鍵抽象。一個stream是一個沒有邊界的tuple序列。

storm提供一些原語來分散式地、可靠地把一個stream傳輸進一個新的stream。比如: 你可以把一個tweets流傳輸到熱門話題的流。

storm提供的最基本的處理stream的原語是spout和bolt。你可以實現Spout和Bolt對應的介面以處理你的應用的邏輯。

spout是流的源頭。比如一個spout可能從Kestrel佇列裡面讀取訊息並且把這些訊息發射成一個流。

又比如一個spout可以呼叫twitter的一個api並且把返回的tweets發射成一個流。

通常Spout會從外部資料來源(佇列、資料庫等)讀取資料,然後封裝成Tuple形式,之後傳送到Stream中。

Spout是一個主動的角色,在介面內部有個nextTuple函式,Storm框架會不停的呼叫該函式。

 

bolt可以接收任意多個輸入stream, 作一些處理, 有些bolt可能還會發射一些新的stream。

一些複雜的流轉換, 比如從一些tweet裡面計算出熱門話題, 需要多個步驟, 從而也就需要多個bolt。

Bolt可以做任何事情: 執行函式,過濾tuple,做一些聚合,做一些合併以及訪問資料庫等等。

Bolt處理輸入的Stream,併產生新的輸出Stream。

Bolt可以執行過濾、函式操作、Join、操作資料庫等任何操作。

Bolt是一個被動的角色,其介面中有一個execute(Tuple input)方法,在接收到訊息之後會呼叫此函式,使用者可以在此方法中執行自己的處理邏輯。

 

spout和bolt所組成一個網路會被打包成topology, topology是storm裡面最高一級的抽象(類似 Job), 你可以把topology提交給storm的叢集來執行。

topology的結構在Topology那一段已經說過了,這裡就不再贅述了。

 

topology裡面的每一個節點都是並行執行的。 在你的topology裡面, 你可以指定每個節點的並行度, storm則會在叢集裡面分配那麼多執行緒來同時計算。

一個topology會一直執行直到你顯式停止它。storm自動重新分配一些執行失敗的任務, 並且storm保證你不會有資料丟失, 即使在一些機器意外停機並且訊息被丟掉的情況下。

 

--資料模型(Data Model)

storm使用tuple來作為它的資料模型。每個tuple是一堆值,每個值有一個名字,並且每個值可以是任何型別,

在我的理解裡面一個tuple可以看作一個沒有方法的java物件(或者是一個表的欄位)。

總體來看,storm支援所有的基本型別、字串以及位元組陣列作為tuple的值型別。你也可以使用你自己定義的型別來作為值型別, 只要你實現對應的序列化器(serializer)。

一個Tuple代表資料流中的一個基本的處理單元,例如一條cookie日誌,它可以包含多個Field,每個Field表示一個屬性。

 

Tuple本來應該是一個Key-Value的Map,由於各個元件間傳遞的tuple的欄位名稱已經事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List。

一個沒有邊界的、源源不斷的、連續的Tuple序列就組成了Stream。

 

topology裡面的每個節點必須定義它要發射的tuple的每個欄位。

比如下面這個bolt定義它所發射的tuple包含兩個欄位,型別分別是: double和triple。

  1. public class DoubleAndTripleBoltimplementsIRichBolt {
  2.     private OutputCollectorBase _collector;
  3.  
  4.     @Override
  5.     public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
  6.         _collector = collector;
  7.     }
  8.  
  9.     @Override
  10.     public void execute(Tuple input) {
  11.         intval = input.getInteger(0);
  12.         _collector.emit(input,newValues(val*2, val*3));
  13.         _collector.ack(input);
  14.     }
  15.  
  16.     @Override
  17.     public void cleanup() {
  18.     }
  19.  
  20.     @Override
  21.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  22.         declarer.declare(newFields("double","triple"));
  23.     }
  24. }

 

參考部落格:http://blog.itpub.net/29754888/viewspace-1260026/

 

====StormAPI使用

我們來看看WorldCount的example程式碼。

WordCount的例項:https://github.com/quchunhui/StormWordCount

 

====Storm的併發機制

Task數量:表示每個Spout或Bolt邏輯上有多少個併發。它影響輸出結果。

Worker數量:代表總共有幾個JVM程序去執行我們的作業。

Executor數量:表示每個Spout或Bolt啟動幾個執行緒來執行。

 

下面程式碼中的數字表示Executor數量,它不影響結果,影響效能。

 

Worker的數量在Config中設定,下圖程式碼中的部分表示Worker數量。

*本地模式中,Worker數不生效,只會啟動一個JVM進行來執行作業。

*只有在叢集模式設定Worker才有效。而且叢集模式的時候一定要設定才能體現叢集的價值。

 

====Storm資料可靠性

分散式系統都管理很多臺機器,需要保證任意的Worker掛掉之後,我們的系統仍然能正確的處理,那麼

Storm如何保證這些資料正確的恢復?

Storm如何保證這些資料不被重複計算?

(1)Spout容錯API:NextTuple中,emit時,指定MsgID。

(2)Bolt容錯API:①emit時,錨定輸入Tuple。②Act輸入Tuple。

 

====Storm叢集搭建

(1)安裝zookeeper叢集

配置方法省略。

 

在storm叢集中,zookeeper具體發揮的是什麼作用呢?

概括來說,zookeeper是nimbus和supervisor進行互動的中介。

1、nimbus通過在zookeeper上寫狀態資訊來分配任務。

通俗的講就是寫哪些supervisor執行哪些task的對應關係。而supervisor則通過從zookeeper上讀取這些狀態資訊,來領取任務。

2、supervisor、task會發送心跳到zookeeper,使得nimbus可以監控整個叢集的狀態,從而在task執行失敗時,可以重啟他們。

 

(2)下載安裝Storm

官網上下載Storm:http://storm.apache.org

上傳至Linux並解壓縮。這裡將Storm解壓縮到/opt/apache-storm-0.10.0路徑下了。

 

(3)修改Storm配置檔案

配置檔案路徑:/opt/apache-storm-0.9.5/conf/storm.yaml

配置內容如下:

----------------

storm.zookeeper.servers:
- "192.168.93.128"
- "192.168.93.129"
- "192.169.93.130"
nimbus.host: "192.168.93.128"
storm.local.dir: "/opt/apache-storm-0.9.5/status"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

----------------

※storm1.x之後,nimbus配置變更為:解決了nimbus單點故障的問題。

nimbus.seeds: ["192.168.1.80","192.168.1.81"]

 

置之後的檔案如下如所示:

 

--Storm配置項詳細介紹

•storm.zookeeper.servers:

ZooKeeper伺服器列表

•storm.zookeeper.port:

ZooKeeper連線埠

•storm.local.dir:

storm使用的本地檔案系統目錄(必須存在並且storm程序可讀寫)

•storm.cluster.mode:

Storm叢集執行模式([distributed|local])

•storm.local.mode.zmq:

Local模式下是否使用ZeroMQ作訊息系統,如果設定為false則使用java訊息系統。預設為false

•storm.zookeeper.root:

ZooKeeper中Storm的根目錄位置

•storm.zookeeper.session.timeout:

客戶端連線ZooKeeper超時時間

•storm.id:

執行中拓撲的id,由storm name和一個唯一隨機陣列成。

•nimbus.host:

nimbus伺服器地址

•nimbus.thrift.port:nimbus的thrift監聽埠

•nimbus.childopts:

通過storm-deploy專案部署時指定給nimbus程序的jvm選項

•nimbus.task.timeout.secs:

心跳超時時間,超時後nimbus會認為task死掉並重分配給另一個地址

•nimbus.monitor.freq.secs:

nimbus檢查心跳和重分配任務的時間間隔。注意如果是機器宕掉nimbus會立即接管並處理

•nimbus.supervisor.timeout.secs:

supervisor的心跳超時時間,一旦超過nimbus會認為該supervisor已死並停止為它分發新任務

•nimbus.task.launch.secs:

task啟動時的一個特殊超時設定。在啟動後第一次心跳前會使用該值來臨時替代nimbus.task.timeout.secs

•nimbus.reassign:

當發現task失敗時nimbus是否重新分配執行。預設為真,不建議修改

•nimbus.file.copy.expiration.secs:

nimbus判斷上傳/下載連結的超時時間,當空閒時間超過該設定時nimbus會認為連結死掉並主動斷開

•ui.port:

Storm UI的服務埠

•drpc.servers:

DRPC伺服器列表,以便DRPCSpout知道和誰通訊

•drpc.port:

Storm DRPC的服務埠

•supervisor.slots.ports:

supervisor上能夠執行workers的埠列表。每個worker佔用一個埠,且每個埠只執行一個worker。

通過這項配置可以調整每臺機器上執行的worker數。(調整slot數/每機)

•supervisor.childopts:

在storm-deploy專案中使用,用來配置supervisor守護程序的jvm選項

•supervisor.worker.timeout.secs:

supervisor中的worker心跳超時時間,一旦超時supervisor會嘗試重啟worker程序.

•supervisor.worker.start.timeout.secs:

supervisor初始啟動時,worker的心跳超時時間,當超過該時間supervisor會嘗試重啟worker。

因為JVM初始啟動和配置會帶來的額外消耗,從而使得第一次心跳會超過supervisor.worker.timeout.secs的設定

•supervisor.enable:

supervisor是否應當執行分配給他的workers。預設為true,該選項用來進行Storm的單元測試,一般不應修改.

•supervisor.heartbeat.frequency.secs:

supervisor心跳傳送頻率(多久傳送一次)

•supervisor.monitor.frequency.secs:

supervisor檢查worker心跳的頻率

•worker.childopts:

supervisor啟動worker時使用的jvm選項。所有的”%ID%”字串會被替換為對應worker的識別符號

•worker.heartbeat.frequency.secs:

worker的心跳傳送時間間隔

•task.heartbeat.frequency.secs:

task彙報狀態心跳時間間隔

•task.refresh.poll.secs:

task與其他tasks之間連結同步的頻率。(如果task被重分配,其他tasks向它傳送訊息需要重新整理連線)

。一般來講,重分配發生時其他tasks會理解得到通知。該配置僅僅為了防止未通知的情況。

•topology.debug:

如果設定成true,Storm將記錄發射的每條資訊。

•topology.optimize:

master是否在合適時機通過在單個執行緒內執行多個task以達到優化topologies的目的

•topology.workers:

執行該topology叢集中應當啟動的程序數量。

每個程序內部將以執行緒方式執行一定數目的tasks。topology的元件結合該引數和並行度提示來優化效能

•topology.ackers:

topology中啟動的acker任務數。

Acker儲存由spout傳送的tuples的記錄,並探測tuple何時被完全處理。

當Acker探測到tuple被處理完畢時會向spout傳送確認資訊。通常應當根據topology的吞吐量來確定acker的數目,但一般不需要太多。

當設定為0時,相當於禁用了訊息可靠性。storm會在spout傳送tuples後立即進行確認

•topology.message.timeout.secs:

topology中spout傳送訊息的最大處理超時時間。

如果一條訊息在該時間視窗內未被成功ack,Storm會告知spout這條訊息失敗。而部分spout實現了失敗訊息重播功能。

•topology.kryo.register:

註冊到Kryo(Storm底層的序列化框架)的序列化方案列表。序列化方案可以是一個類名,或者是com.esotericsoftware.kryo.Serializer的實現

•topology.skip.missing.kryo.registrations:

Storm是否應該跳過它不能識別的kryo序列化方案。如果設定為否task可能會裝載失敗或者在執行時丟擲錯誤

•topology.max.task.parallelism:

在一個topology中能夠允許的最大元件並行度。該項配置主要用在本地模式中測試執行緒數限制.

•topology.max.spout.pending:

一個spout task中處於pending狀態的最大的tuples數量。該配置應用於單個task,而不是整個spouts或topology

•topology.state.synchronization.timeout.secs:

元件同步狀態源的最大超時時間(保留選項,暫未使用)

•topology.stats.sample.rate:

用來產生task統計資訊的tuples抽樣百分比

•topology.fall.back.on.java.serialization:

topology中是否使用java的序列化方案

•zmq.threads:

每個worker程序內zeromq通訊用到的執行緒數

•zmq.linger.millis:

當連線關閉時,連結嘗試重新發送訊息到目標主機的持續時長。這是一個不常用的高階選項,基本上可以忽略.

•java.library.path:

JVM啟動(如Nimbus,Supervisor和workers)時的java.library.path設定。該選項告訴JVM在哪些路徑下定位本地庫

 

(4)配置Storm環境變數

環境變數位置:/etc/profile

配置內容之後如下圖所示:

注意:環境變數修改只有,一定要使用Source命令來使之生效。

 

(5)啟動Storm

--啟動Storm UI

命令:storm ui >/dev/null 2>&1 &

我們可以它啟動的時候相關的輸出指向到/def/null,並且把錯誤也重新定向到正常輸出。

 

--啟動主節點(Nimbus節點)

命令:storm nimbus >/dev/null 2>&1 &

在第1臺Linux虛擬機器上執行。正常啟動時的jps結果如下圖所示:

 

--啟動工作節點(Supervisor節點)

命令:storm supervisor >/dev/null 2>&1 &

在第2、3臺Linux虛擬機器上執行。正常啟動時的jps結果如下圖所示:

 

(6)啟動StormUI監控頁面:

Storm正常啟動之後,應該可以開啟StormUI畫面。在瀏覽器中輸入地址和埠即可

正確啟動時應該如下圖所示:

--Mainpage:

main頁面主要包括3個部分

 

 【Cluster Summary】

•Nimbus uptime: nimbus的啟動時間

•Supervisors: storm叢集中supervisor的數目

•used slots: 使用了的slots數

•free slots: 剩餘的slots數

•total slots: 總的slots數

•Running tasks: 執行的任務數

 

【topology summary】

•Name: topology name

•id: topology id (由storm生成)

•status: topology的狀態,包括(ACTIVE, INACTIVE, KILLED, REBALANCING)

•uptime: topology執行的時間

•num workers: 執行的workers數

•num tasks: 執行的task數

 

【supervisor summary】

•host: supervisor(主機)的主機名

•uptime: supervisor啟動的時間

•slots: supervisor的埠數

•used slots: 使用的埠數

 

--Topology page

topology頁面主要包括4個部分

【topology summary】

(同主頁)

 

【topology stats】

•window: 時間視窗,顯示10m、3h、1d和all time的執行狀況

•emitted: emitted tuple數

•transferred: transferred tuple數, 說下與emitted的區別:如果一個task,emitted一個tuple到2個task中,則transferred tuple數是emitted tuple數的兩倍

•complete latency: spout emitting 一個tuple到spout ack這個tuple的平均時間

•acked: ack tuple數

•failed: 失敗的tuple數

 

【spouts】

•id: spout id

•parallelism: 任務數

•last error: 最近的錯誤數,只顯示最近的前200個錯誤

•emitted、transferred、complete latency、acked和failed上面已解釋

 

【bolts】

•process latency: bolt收到一個tuple到bolt ack這個tuple的平均時間

其他引數都解釋過了

還有componentpage和taskpage,引數的解釋同上。

taskpage中的Component指的是spoutid或者boltid,time指的是錯誤發生的時間,error是指錯誤的具體內容。

 

====Storm常用命令

【提交Topologies】

命令格式:storm jar 【jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】

樣例:storm jar /storm-starter.jar storm.starter.WordCountTopology wordcountTop

#提交storm-starter.jar到遠端叢集,並啟動wordcountTop拓撲。

 

【停止Topologies】

命令格式:storm kill 【拓撲名稱】

樣例:storm kill wordcountTop

#殺掉wordcountTop拓撲。

 

【啟動nimbus後臺程式】

命令格式:storm nimbus

 

【啟動supervisor後臺程式】

命令格式:storm supervisor

 

【啟動drpc服務】

命令格式:storm drpc

 

【啟動ui服務】

命令格式:storm ui

 

【啟動REPL】

REPL — read-evaluate-print-loop。

雖然clojure可以作為一種指令碼語言內嵌在java裡面,但是它的首選程式設計方式是使用REPL,這是一個簡單的命令列介面,

使用它你可以輸入你的命令,執行,然後檢視結果, 你可以以下面這個命令來啟動REPL:

命令格式:storm repl

 

【列印本地配置】

命令格式:storm localconfvalue [配置引數關鍵字]

舉例:storm localconfvalue storm.zookeeper.servers

#根據指定引數列印本地配置的值。

 

【列印遠端配置】

命令格式:storm remoteconfvalue [配置引數關鍵字]

舉例:storm remoteconfvalue storm.zookeeper.servers

#根據指定引數列印遠端配置的值。

 

【執行Shell指令碼】

命令格式:storm shell resourcesdir command args

 

【列印CLASSPATH】

命令格式:storm classpath

 

====Storm調優:

--調優物件

當一個topology在storm cluster中執行時,它的併發主要跟3個邏輯物件相關:worker => executor =>task。(=>代表1對N)

(1)Worker

Worker是執行在工作節點上面,被Supervisor守護程序建立的用來幹活的JVM程序。

每個Worker對應於一個給定topology的全部執行任務的一個子集。

反過來說,一個Worker裡面不會執行屬於不同的topology的執行任務。

它可以通過[storm rebalance]命令任意調整。


(2)Executor

可以理解成一個Worker程序中的工作執行緒。

一個Executor中只能執行隸屬於同一個component(spout/bolt)的task。

一個Worker程序中可以有一個或多個Executor執行緒。在預設情況下,一個Executor執行一個task。

 

每個component(spout/bolt)的併發度就是指executor數量。

 

它可以通過[storm rebalance]命令任意調整。


(3)Task

Task則是spout和bolt中具體要乾的活了。一個Executor可以負責1個或多個task。

同時,task也是各個節點之間進行grouping(partition)的單位。無法在執行時調整。

 

--設定方法:

conf.setNumWorkers(workers);                                        //設定worker數量

uilder.setBolt("2", new WordSpliter(),4)                             //設定Executor併發數量

builder.setBolt("2", new WordSpliter(),4).setNumTasks(1); //設定每個執行緒處理的Task數量


--任務分配:

任務分配是有下面兩種情況:

①、task數目比worker多:

例如task是[1 2 3 4],可用的slot(所謂slot就是可用的worker)只有[host1:port1,host2:port1],那麼最終是這樣分配
1:[host1:port1]

2:[host2:port1]

3:[host1:port1]

4:[host2:port1]


②、task數目比worker少:

例如task是[1 2],而worker有[host1:port1,host1:port2,host2:port1,host2:port2],

那麼首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個機器上,也就是將worker排列成

[host1:port1,host2:port1,host1:port2,host2:port2]

然後分配任務為:

1:[host1:port1]

2:[host2:port1]

 

--簡單舉例:

通過Config.setNumWorkers(int))來指定一個storm叢集中執行topolgy的程序數量,所有的執行緒將在這些指定的worker程序中執行。

比如說一個topology中要啟動300個執行緒來執行spout/bolt,而指定的worker程序數量是60個。

那麼storm將會給每個worker分配5個執行緒來跑spout/bolt。

如果要對一個topology進行調優,可以調整worker數量和spout/bolt的parallelism(併發度,即executor)數量。

(調整引數之後要記得重新部署topology,後續會為該操作提供一個swapping的功能來減小重新部署的時間)。

 

例如:
builder.setBolt("cpp", new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name); 
會建立3個執行緒,但有記憶體中會5個CppBolt物件,3個執行緒排程5個物件。

 

--網上搜羅的一些經驗:
①、對於worker和task之間的比例,網上也給出了參考,。即1個worker包含10~15個左右。當然這個參考,實際情況還是要根據配置和測試情況。

②、executor數最大不能超過該bolt的task數。

 

--Strom叢集命令

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

[root@h2master bin]# storm 

Commands: 

        activate 

        classpath 

        deactivate 

        dev-zookeeper 

        drpc 

        help 命令幫助 

        jar   執行上傳的jar包 

        kill   殺死正在執行的topology 後面跟 topology的名稱 

        list   檢視執行的所有topology執行情況 

        localconfvalue 

        logviewer   啟動topology日誌 

        nimbus      啟動nimbus 

        rebalance   shell方式下修改topology執行引數比如worker個數 task個數等 

        remoteconfvalue 

        repl 

        shell 

        supervisor  啟動supervisor 

        ui              啟動topology ui介面 

        version    

   

Help:  

        help  

        help <command>

 

  1. [[email protected] bin]# storm  
  2. Commands:  
  3.         activate  
  4.         classpath  
  5.         deactivate  
  6.         dev-zookeeper  
  7.         drpc  
  8.         help 命令幫助  
  9.         jar   執行上傳的jar包  
  10.         kill   殺死正在執行的topology 後面跟 topology的名稱  
  11.         list   檢視執行的所有topology執行情況  
  12.         localconfvalue  
  13.         logviewer   啟動topology日誌  
  14.         nimbus      啟動nimbus  
  15.         rebalance   shell方式下修改topology執行引數比如worker個數 task個數等  
  16.         remoteconfvalue  
  17.         repl  
  18.         shell  
  19.         supervisor  啟動supervisor  
  20.         ui              啟動topology ui介面  
  21.         version     
  22.   
  23. Help:   
  24.         help   
  25.         help <command>

 

----Storm高可用HA

關於Storm的高可用,有以下幾個方面:

(1)資料利用階段可以通過ACK機制保證資料被處理

(2)在程序級別,worker失效,supervisor會自動重啟worker執行緒;

(3)在元件級別,supervisor節點失效,會在其他節點重啟該supervisor任務

但是,nimbus節點失效怎麼辦?

Supervisor程序和Nimbus程序,需要用Daemon程式如monit來啟動,失效時自動重新啟動。

如果Nimbus程序所在的機器都直接倒了,需要在其他機器上重新啟動,Storm目前沒有自建支援,需要自己寫指令碼實現。

即使Nimbus程序不在了,也只是不能部署新任務,有節點失效時不能重新分配而已,不影響已有的執行緒。

同樣,如果Supervisor程序失效,不影響已存在的Worker程序。

 

--END--

好文要頂 關注我 收藏該文