Storm命令詳解-Storm+kafka的HelloWorld初體驗
Storm命令詳解
2015年04月26日 15:08:08 lavimer 閱讀數:9171 標籤: Storm Storm命令詳解 Storm命令 更多
個人分類: Storm
所屬專欄: Storm實戰
版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/lzm1340458776/article/details/45288503
在Linux終端直接輸入storm,不帶任何引數資訊,或者輸入storm help,可以檢視storm命令列客戶端(Command line client)提供的幫助資訊。Storm 0.9.0.1版本在Linux終端直接輸入storm後的輸出內容如下:
-
Commands:
-
activate
-
classpath
-
deactivate
-
dev-zookeeper
-
drpc
-
help
-
jar
-
kill
-
list
-
localconfvalue
-
logviewer
-
nimbus
-
rebalance
-
remoteconfvalue
-
repl
-
shell
-
supervisor
-
ui
-
version
-
Help:
-
help
-
help <command>
-
Documentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client
-
Configs can be overridden using one or more -c flags, e.g. "storm list -c nimbus.host=nimbus.mycompany.com"
注:由此可知,新版Storm的命令列客戶端提供了19個命令。
1.activate
啟用指定的拓撲。語法如下:
storm activate topology-name
2.classpath
打印出Storm客戶端執行命令時使用的類路徑(classpath)。語法如下:
storm classpath
3.deactivate
禁用指定的拓撲Spout。語法如下:
storm deactivate topology-name
4.dev-zookeeper
以dev.zookeeper.path配置的值作為本地目錄,以storm.zookeeper.port配置的值作為埠,啟動一個新的Zookeeper服務,僅用來開發/測試。語法如下:
storm dev-zookeeper
5.drpc (常用!)
啟動一個DRPC守護程序。語法如下:
storm drpc
注:該命令應該使用daemontools或者monit工具監控執行。
6.help (常用!)
列印一條幫助訊息或者可用命令的列表。語法如下:
-
storm help
-
storm help <command>
注:直接輸入不帶引數的storm,也可以啟動storm help命令。
7.jar (很常用!)
執行類的指定引數的main方法。語法如下:
storm jar topology-jar-path class ...
注:把Storm的jar檔案和"~/.storm"的配置放到類路徑(classpath)中,以便當拓撲提交時,StormSUbmitter會上傳topology-jar-path的jar檔案。
8.kill (常用!)
殺死名為topology-name的拓撲。語法如下:
storm kill topology-name [-w wait-time-secs]
注:storm首先會在拓撲的訊息超時時間期間禁用spout,以允許所有正在處理的訊息完成。然後,Storm將會關閉Worker並清理他們的狀態。可以使用-w標記覆蓋Storm在禁用與關閉期間等待的時間長度。
9.list (常用!)
列出正在執行的拓撲及其狀態。語法如下:
storm list
10.localconfvalue
打印出本地Storm配置的conf-name的值。語法如下:
storm localconfvalue conf-name
注:本地Storm配置是~/.storm/storm.yaml與defaults.yaml合併的結果。
11.logviewer (常用!)
啟動Logviewer守護程序。語法如下:
storm logviewer
注:Logviewer提供一個Web介面檢視Storm日誌檔案。該命令應該使用daemontools或者monit工具監控執行。
12.nimbus (常用!)
啟動Nimbus守護程序。語法如下:
storm nimbus
注:該命令應該使用daemontools或者monit工具監控執行。
13.rebalance (常用!)
再平衡即動態設定拓撲的程序數量和執行緒數量等。詳細內容見:這裡
14.remoteconfvalue
打印出遠端叢集Storm配置的conf-name的值。語法如下:
storm remoteconfvalue conf-name
注:叢集Storm配置是$STORM-PATH/conf/storm.yaml與defaults.yaml合併的結果。該命令必須在叢集節點上執行。
15.repl
開啟一個包含路徑(classpath)中的jar檔案和配置的Clojure REPL,以便除錯時使用。語法如下:
storm repl
注:Clojure可以作為一種指令碼語言內嵌到java中,但是Clojure的首選程式設計方式是使用REPL,REPL是一個簡單的命令列介面。使用REPL,可以輸入命令並執行,然後檢視結果。
16.shell
執行Shell指令碼。語法如下:
storm shell resourcesdir command args
17.supervisor (常用!)
啟動Supervisor守護程序。語法如下:
storm supervisor
注:該命令應該使用daemontools或者monit工具監控執行。
18.ui (常用!)
啟動UI守護程序。語法如下:
storm ui
注:UI為Storm叢集提供了一個Web介面並顯示執行拓撲的詳細統計資訊。該命令應該使用daemontools或者monit工具監控執行。
19.version
列印Storm釋出的版本號。語法如下:
storm version
附:文章引用自《從零開始學Storm》
Storm+kafka的HelloWorld初體驗
從16年4月5號開始學習kafka,後來由於專案需要又涉及到了storm。
經過幾天的掃盲,到今天16年4月13日,磕磕碰碰的總算是寫了一個kafka+storm的HelloWorld的例子。
為了達到前人栽樹後人乘涼的知識共享的目的,我嘗試著梳理一下過程。
====例項需求
由kafka訊息佇列源源不斷生產資料,然後由storm進行實時消費。
大家可以設想這些資料來源是不同商品的使用者行為操作行為,我們是不是就可以實時觀測到使用者關注商品的熱點呢?
====環境準備
(1)Linux:
公司暫時沒有多餘的Linux主機,所以我只能在自己的電腦上建立的3臺Linux虛擬機器。
虛擬機器的建立方法我做了一個小白級別的手冊,按照這個手冊就可以建立起虛擬機器了。
百度雲連線地址:http://pan.baidu.com/s/1hr3lVqG
(2)JDK:
我這裡使用的是:jdk-7u80-linux-x64.tar.gz。
在官方網站上下載,然後配置環境變數即可。
(3)zookeeper叢集:
搭建方法省略。可以參照我的部落格:http://www.cnblogs.com/quchunhui/p/5356511.html
(4)kafka:
搭建方法省略。可以參照我的部落格:http://www.cnblogs.com/quchunhui/p/5356511.html
(5)storm:
我這裡使用的版本是相對穩定的:apache-storm-0.9.5.tar.gz
搭建方法省略,可以參照我的部落格:http://www.cnblogs.com/quchunhui/p/5370191.html
(6)Maven:
開發環境的構建使用Maven。我這裡使用的版本是:apache-maven-3.3.3.zip
Maven的入門可以參考我的部落格:http://www.cnblogs.com/quchunhui/p/5359293.html
補充一下環境變數配置之後的圖,以供小白參考。
====程式執行方式
(1)kafka:
需要手動編寫kafka的生產者程式,然後通過eclipse等工具在Windows端啟動,以達到生產訊息的目的。
(2)storm:
可以進行兩種方式的啟動。一種是通過eclipse等工具在Windows端啟動(俗稱本地模式)
另一種是將storm的消費者程式打成jar包釋出到Linux環境上,通過Linux啟動程式進行消費(俗稱叢集模式)。
====Storm框架前期理解
從某位大神的QQ群組裡下載了一篇關於storm的基本框架以及安裝的文章
我這裡共享到了我的百度雲盤上了,請大家在開始程式設計之前一定要看看。非常值得一看。
百度雲地址:http://pan.baidu.com/s/1boRcCeb
那麼後面我們就可以開始編寫我們的程式了。首先需要編寫的是kafka的生產者程式。
====kafka程式相關:
我已經寫好的程式碼共享到了Github上了:https://github.com/quchunhui/kafkaSample/
這裡只對目錄結構以及重要部分進行說明:
(1)src/main路徑結構如下:
+---common
| Constants.java //這裡統一定義了所有的常量,修改配置的時候只修改這裡就可以。
|
+---consumer
| +---group
| | GroupConsumer.java //kafka消費者程式。消費模型:分組消費
| |
| \---partition
| PartitionConsumer.java //kafka消費者程式。消費模型:分割槽消費
|
+---producer
| +---async
| | AsyncProduce.java //kafka生產者程式。生產模型:非同步生產(本次例項相關)
| |
| +---partiton
| | SimplePartitioner.java //message的序列化類
| |
| \---sync
| SyncProduce.java //kafka生產者程式。生產模型:同步生產
|
\---utilities
CommonUtil.java //共通方法類。
(2)例項所用的程式碼:
本次例項中,僅僅使用了kafka進行訊息的生產,同事考慮到非同步生產效能更高一些,
本次例項中使用了非同步生產的程式碼,就是上面紅色字標記的java程式(AsyncProduce.java)。
程式碼本身比較簡單,其中下面紅色框的部分為【非同步】的配置項,需要注意。
各個配置項的說明請參考我的另一篇部落格:http://www.cnblogs.com/quchunhui/p/5357040.html
====Storm程式相關:
(1)拓撲設計
【訊息源(RandomSentenceSpout)】
接入到從上面的kafka訊息佇列中,將kafka作為storm的訊息源。
【資料標準化(WordNormalizerBolt)】
然後使用一個Bolt進行歸一化(語句切分),句子切分成單詞發射出去。(程式碼更新中。。。)
【詞頻統計(WordCountBolt)】
使用一個Bolt接受訂閱切分的單詞Tuple,進行單詞統計,並且選擇使用按欄位分組的策略,詞頻實時排序,把TopN實時發射出去。(程式碼更新中。。。)
【工具類(PrintBolt)】
最後使用一哥Bolt將結果列印到Log中。(程式碼更新中。。。)
====例項程式碼
我自己進行驗證用的程式碼已經上傳到Github上了,可以直接下載下來使用。
這裡只對程式碼的目錄結構以及需要格外關注的點進行一些補充。
Git地址:https://github.com/quchunhui/storm-kafka-plus-qch
(1)目錄結構
src\main\java\com\dscn\helloworld
|
| WordCountTopology.java // Topology程式碼,程式入口,使用eclipse是需要執行該程式。
|
+---bolt
| PrintBolt.java // 上面講到的工具類(PrintBolt)類
| WordCountBolt.java // 上面講到的詞頻統計(WordCountBolt)類
| WordNormalizerBolt.java // 上面講到的資料標準化(WordNormalizerBolt)類
|
\---spout
RandomSentenceSpout.java // 未使用
(2)重要程式碼說明
由於原始碼已經共享給大家了,Storm的介面的用法在下面的篇幅中單獨羅列了一下,我這裡不進行過多的闡述。
在這裡只將我碰到過的問題羅列出來、以問題&解決方法的形式分享。
【問題1】
storm是如何實現與kafka的對接的
【回答】
Spout作為storm的訊息源的接入點,在這裡可以同構設定Storm官方提供【storm.kafka.SpoutConfig】類來指定訊息源。
----------------
//配置zookeeper叢集地址,畢竟storm是需要叢集支援的。
BrokerHosts brokerHosts = new ZkHosts("192.168.93.128:2181,192.168.93.129:2181,192.168.93.130:2181");
//配置Kafka訂閱的Topic,以及zookeeper中資料節點目錄和名字
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "qchlocaltest", "", "topo");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//如果該Topology因故障停止處理,下次正常執行時是否從Spout對應資料來源Kafka中的該訂閱Topic的起始位置開始讀取
spoutConfig.forceFromStart = true;
//zookeeper叢集host
spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.93.128", "192.168.93.129", "192.168.93.130"});
//zookeeper叢集port
spoutConfig.zkPort = 2181;
----------------
【問題2】
我嘗試著自己重新寫程式碼配置開發環境(不是直接使用Github上的程式碼),
編譯時可以正常通過的,但是本地模式通過eclipse啟動Topology的時候,出現了log4j和slf4j的衝突問題。
【解決方法】
問題原因是由於log4j和slf4j之間的重複呼叫,導致死迴圈而致使記憶體溢位。
解決辦法就是log4j和slf4j保留一個, 普遍上都是保留slf4j的。
需要在Maven的pom.xml上將log4j的相關依賴移除。
移除方法:
可以通過【mvn dependency:tree】命令來檢視修改之後的依賴關係。
如果發現需要移除的包的時候,使用Maven的【exclusion】標籤來移除依賴關係。
填寫exclusion標籤的時候,下圖中紅色的部分是groupId,藍色的部分是artifactId。
【問題3】
使用mvn install命令將程式打jar包上傳到Linux的storm目錄下,然後使用命令
[storm jar test-0.1-jar-with-dependencies.jar com.dscn.helloworld.WordCountTopology 192.168.93.128]
啟動Topology的時候,出現了下面的提示錯誤。
【解決方法】
是Maven的pom.xml的配置出現了問題。詳細請參考部落格:http://blog.csdn.net/luyee2010/article/details/18455237
修改方法就是強storm的scope修改為provided。如下圖所示:
【問題4】
將程式碼放到實際的叢集執行環境(kafka+storm+hbase)中,發現storm接受不到訊息。
【原因】
一直以來都是使用kafka的非同步生產來生產訊息,以為都正常的生產訊息了。由於非同步生產的時候,並沒有訊息確認機制,
所以不能確保訊息是否正確的進入到了訊息佇列之中,改用同步生產的程式碼嘗試了一下,果然發生了一下的錯誤。
【解決辦法】
通過網上搜索[kafka Failed to send messages]關鍵字,發現有可能是需要設定advertised.host.name這個屬性。
抱著嘗試一下的心態試了一下,果然好使了。至於這個屬性的真正意義還有待探索。(TODO)
【問題5】
程式碼在本地的時候好好的,通過storm jar命令釋出到叢集環境的時候,發生了Jar包衝突的問題。
【解決方法】
本來是認為自己的Maven環境的依賴有問題,也通過mvn dependency:tree查看了依賴關係,毫無問題。根本就誒有log4j-over-slf4j.jar這個包。
頭疼了很久,通過QQ群諮詢了一些朋友,他們建議我確認叢集環境中storm/lib下是否存在log4j-over-slf4j.jar,如果存在就把它刪掉。
嘗試了一下之後,果然好使了。原來是我的程式的jar包和叢集環境中會有衝突。詳細請參考我的另一篇部落格:
http://www.cnblogs.com/quchunhui/p/5404168.html
====Storm介面詳解:
【IComponent介面】
Spout和Bolt都是其Component。所以,Storm定義了一個名叫IComponent的總介面。
IComponent的繼承關係如下圖所示:
綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關。
BaseComponent 是Storm提供的“偷懶”的類。為什麼這麼說呢,它及其子類,都或多或少實現了其介面定義的部分方法。
這樣我們在用的時候,可以直接繼承該類,而不是自己每次都寫所有的方法。
但值得一提的是,BaseXXX這種定義的類,它所實現的方法,都是空的,直接返回null。
【Spout】
類圖如下圖所示:
介面如下圖所示:
各個介面說明:
①、open方法:
是初始化動作。允許你在該spout初始化時做一些動作,傳入了上下文,方便取上下文的一些資料。
②、close方法
在該spout關閉前執行,但是並不能得到保證其一定被執行。
spout是作為task執行在worker內,在cluster模式下,supervisor會直接kill -9 woker的程序,這樣它就無法執行了。
而在本地模式下,只要不是kill -9, 如果是傳送停止命令,是可以保證close的執行的。
③、activate和deactivate方法 :
一個spout可以被暫時啟用和關閉,這兩個方法分別在對應的時刻被呼叫。
④、nextTuple方法:
負責訊息的接入,執行資料發射。是Spout中的最重要方法。
⑤、ack(Object)方法:
傳入的Object其實是一個id,唯一表示一個tuple。該方法是這個id所對應的tuple被成功處理後執行。
⑥、fail(Object)方法:
同ack,只不過是tuple處理失敗時執行。
我們的RandomSpout由於繼承了BaseRichSpout,
所以不用實現close、activate、deactivate、ack、fail和getComponentConfiguration方法,只關心最基本核心的部分。
結論:
通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現介面IRichSpout,如果不想寫多餘的程式碼,可以直接繼承BaseRichSpout。
【Bolt】
類圖如下圖所示:
這裡可以看到一個奇怪的問題: 為什麼IBasicBolt並沒有繼承IBolt? 我們帶著問題往下看。
IBolt定義了三個方法:
①、prepare方法:
IBolt繼承了java.io.Serializable,我們在nimbus上提交了topology以後,創建出來的bolt會序列化後傳送到具體執行的worker上去。
worker在執行該Bolt時,會先呼叫prepare方法傳入當前執行的上下文。
②、execute方法:
接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。
③、cleanup方法:
同ISpout的close方法,在關閉前呼叫。同樣不保證其一定執行。
紅色部分(execute方法)是Bolt實現時一定要注意的地方。
而Storm提供了IBasicBolt介面,其目的就是實現該介面的Bolt不用在程式碼中提供反饋結果了,Storm內部會自動反饋成功。
如果你確實要反饋失敗,可以丟擲FailedException。
我們來再寫一個Bolt繼承BaseRichBolt替代ExclaimBasicBolt。程式碼如下:
修改topology
執行下,結果一致。
結論:
通常情況下,實現一個Bolt,可以實現IRichBolt介面或繼承BaseRichBolt,
如果不想自己處理結果反饋,可以實現IBasicBolt介面或繼承BaseBasicBolt,它實際上相當於自動做掉了prepare方法和collector.emit.ack(inputTuple);
====推薦部落格:
【整合實戰類】:
http://shiyanjun.cn/archives/934.html
http://www.tuicool.com/articles/NzyqAn
http://itindex.net/detail/51477-storm-筆記-kafka
【問題解決類】:
http://www.aboutyun.com/thread-12590-1-1.html
【Storm調優類】:
http://blog.csdn.net/derekjiang/article/details/9040243
http://www.51studyit.com/html/notes/20140329/45.html
--END--