1. 程式人生 > >Storm命令詳解-Storm+kafka的HelloWorld初體驗

Storm命令詳解-Storm+kafka的HelloWorld初體驗

Storm命令詳解

2015年04月26日 15:08:08 lavimer 閱讀數:9171 標籤: Storm Storm命令詳解 Storm命令 更多

個人分類: Storm

所屬專欄: Storm實戰

版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/lzm1340458776/article/details/45288503

在Linux終端直接輸入storm,不帶任何引數資訊,或者輸入storm help,可以檢視storm命令列客戶端(Command line client)提供的幫助資訊。Storm 0.9.0.1版本在Linux終端直接輸入storm後的輸出內容如下:

 

 
  1. Commands:

  2. activate

  3. classpath

  4. deactivate

  5. dev-zookeeper

  6. drpc

  7. help

  8. jar

  9. kill

  10. list

  11. localconfvalue

  12. logviewer

  13. nimbus

  14. rebalance

  15. remoteconfvalue

  16. repl

  17. shell

  18. supervisor

  19. ui

  20. version

  21.  
  22. Help:

  23. help

  24. help <command>

  25.  
  26. Documentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client

  27.  
  28. Configs can be overridden using one or more -c flags, e.g. "storm list -c nimbus.host=nimbus.mycompany.com"

注:由此可知,新版Storm的命令列客戶端提供了19個命令。

 

1.activate

啟用指定的拓撲。語法如下:

 

storm activate topology-name

 

2.classpath

打印出Storm客戶端執行命令時使用的類路徑(classpath)。語法如下:

 

storm classpath

 

3.deactivate

禁用指定的拓撲Spout。語法如下:

 

storm deactivate topology-name

 

4.dev-zookeeper

以dev.zookeeper.path配置的值作為本地目錄,以storm.zookeeper.port配置的值作為埠,啟動一個新的Zookeeper服務,僅用來開發/測試。語法如下:

 

storm dev-zookeeper


5.drpc (常用!)

啟動一個DRPC守護程序。語法如下:

 

storm drpc

注:該命令應該使用daemontools或者monit工具監控執行。

 

6.help (常用!)

列印一條幫助訊息或者可用命令的列表。語法如下:

 

 
  1. storm help

  2. storm help <command>

注:直接輸入不帶引數的storm,也可以啟動storm help命令。

 

7.jar (很常用!)

執行類的指定引數的main方法。語法如下:

 

storm jar topology-jar-path class ...

注:把Storm的jar檔案和"~/.storm"的配置放到類路徑(classpath)中,以便當拓撲提交時,StormSUbmitter會上傳topology-jar-path的jar檔案。

 

8.kill (常用!)

殺死名為topology-name的拓撲。語法如下:

 

storm kill topology-name [-w wait-time-secs]

注:storm首先會在拓撲的訊息超時時間期間禁用spout,以允許所有正在處理的訊息完成。然後,Storm將會關閉Worker並清理他們的狀態。可以使用-w標記覆蓋Storm在禁用與關閉期間等待的時間長度。

 

9.list (常用!)

列出正在執行的拓撲及其狀態。語法如下:

 

storm list



10.localconfvalue

打印出本地Storm配置的conf-name的值。語法如下:

 

storm localconfvalue conf-name

注:本地Storm配置是~/.storm/storm.yaml與defaults.yaml合併的結果。

 

11.logviewer (常用!)

啟動Logviewer守護程序。語法如下:

 

storm logviewer

注:Logviewer提供一個Web介面檢視Storm日誌檔案。該命令應該使用daemontools或者monit工具監控執行。

 

12.nimbus (常用!)

啟動Nimbus守護程序。語法如下:

 

storm nimbus

注:該命令應該使用daemontools或者monit工具監控執行。

 

13.rebalance (常用!)

再平衡即動態設定拓撲的程序數量和執行緒數量等。詳細內容見:這裡

 

14.remoteconfvalue

打印出遠端叢集Storm配置的conf-name的值。語法如下:

 

storm remoteconfvalue conf-name

注:叢集Storm配置是$STORM-PATH/conf/storm.yaml與defaults.yaml合併的結果。該命令必須在叢集節點上執行。

 

15.repl

開啟一個包含路徑(classpath)中的jar檔案和配置的Clojure REPL,以便除錯時使用。語法如下:

 

storm repl

注:Clojure可以作為一種指令碼語言內嵌到java中,但是Clojure的首選程式設計方式是使用REPL,REPL是一個簡單的命令列介面。使用REPL,可以輸入命令並執行,然後檢視結果。

 

16.shell

執行Shell指令碼。語法如下:

 

storm shell resourcesdir command args


17.supervisor (常用!)

啟動Supervisor守護程序。語法如下:

 

storm supervisor

注:該命令應該使用daemontools或者monit工具監控執行。

 

18.ui (常用!)

啟動UI守護程序。語法如下:

 

storm ui

注:UI為Storm叢集提供了一個Web介面並顯示執行拓撲的詳細統計資訊。該命令應該使用daemontools或者monit工具監控執行。

 

19.version

列印Storm釋出的版本號。語法如下:

 

storm version


 

附:文章引用自《從零開始學Storm》

 

Storm+kafka的HelloWorld初體驗

 

從16年4月5號開始學習kafka,後來由於專案需要又涉及到了storm。

經過幾天的掃盲,到今天16年4月13日,磕磕碰碰的總算是寫了一個kafka+storm的HelloWorld的例子。

為了達到前人栽樹後人乘涼的知識共享的目的,我嘗試著梳理一下過程。

 

====例項需求

由kafka訊息佇列源源不斷生產資料,然後由storm進行實時消費。

大家可以設想這些資料來源是不同商品的使用者行為操作行為,我們是不是就可以實時觀測到使用者關注商品的熱點呢?

 

====環境準備

(1)Linux:

 

公司暫時沒有多餘的Linux主機,所以我只能在自己的電腦上建立的3臺Linux虛擬機器。

虛擬機器的建立方法我做了一個小白級別的手冊,按照這個手冊就可以建立起虛擬機器了。

 

百度雲連線地址:http://pan.baidu.com/s/1hr3lVqG

 

(2)JDK:

 

我這裡使用的是:jdk-7u80-linux-x64.tar.gz。

在官方網站上下載,然後配置環境變數即可。

 

 

(3)zookeeper叢集:

搭建方法省略。可以參照我的部落格:http://www.cnblogs.com/quchunhui/p/5356511.html

 

(4)kafka:

搭建方法省略。可以參照我的部落格:http://www.cnblogs.com/quchunhui/p/5356511.html

 

 (5)storm:

我這裡使用的版本是相對穩定的:apache-storm-0.9.5.tar.gz

搭建方法省略,可以參照我的部落格:http://www.cnblogs.com/quchunhui/p/5370191.html

 

(6)Maven:

開發環境的構建使用Maven。我這裡使用的版本是:apache-maven-3.3.3.zip

Maven的入門可以參考我的部落格:http://www.cnblogs.com/quchunhui/p/5359293.html

 

補充一下環境變數配置之後的圖,以供小白參考。

 

====程式執行方式

(1)kafka:

需要手動編寫kafka的生產者程式,然後通過eclipse等工具在Windows端啟動,以達到生產訊息的目的。

 

(2)storm:

可以進行兩種方式的啟動。一種是通過eclipse等工具在Windows端啟動(俗稱本地模式)

另一種是將storm的消費者程式打成jar包釋出到Linux環境上,通過Linux啟動程式進行消費(俗稱叢集模式)。

 

====Storm框架前期理解

從某位大神的QQ群組裡下載了一篇關於storm的基本框架以及安裝的文章

我這裡共享到了我的百度雲盤上了,請大家在開始程式設計之前一定要看看。非常值得一看。

百度雲地址:http://pan.baidu.com/s/1boRcCeb

 

那麼後面我們就可以開始編寫我們的程式了。首先需要編寫的是kafka的生產者程式。

 

====kafka程式相關:

我已經寫好的程式碼共享到了Github上了:https://github.com/quchunhui/kafkaSample/

這裡只對目錄結構以及重要部分進行說明:

(1)src/main路徑結構如下:

+---common

| Constants.java                   //這裡統一定義了所有的常量,修改配置的時候只修改這裡就可以。
|
+---consumer
| +---group
| | GroupConsumer.java        //kafka消費者程式。消費模型:分組消費
| |
| \---partition
| PartitionConsumer.java       //kafka消費者程式。消費模型:分割槽消費
|
+---producer
| +---async
| | AsyncProduce.java           //kafka生產者程式。生產模型:非同步生產(本次例項相關)
| |
| +---partiton
| | SimplePartitioner.java       //message的序列化類
| |
| \---sync
| SyncProduce.java              //kafka生產者程式。生產模型:同步生產
|
\---utilities
CommonUtil.java                 //共通方法類。

 

(2)例項所用的程式碼:

本次例項中,僅僅使用了kafka進行訊息的生產,同事考慮到非同步生產效能更高一些,

本次例項中使用了非同步生產的程式碼,就是上面紅色字標記的java程式(AsyncProduce.java)。

程式碼本身比較簡單,其中下面紅色框的部分為【非同步】的配置項,需要注意。

各個配置項的說明請參考我的另一篇部落格:http://www.cnblogs.com/quchunhui/p/5357040.html

 

 

====Storm程式相關:

(1)拓撲設計

【訊息源(RandomSentenceSpout)】

接入到從上面的kafka訊息佇列中,將kafka作為storm的訊息源。

【資料標準化(WordNormalizerBolt)】

然後使用一個Bolt進行歸一化(語句切分),句子切分成單詞發射出去。(程式碼更新中。。。)

【詞頻統計(WordCountBolt)】

使用一個Bolt接受訂閱切分的單詞Tuple,進行單詞統計,並且選擇使用按欄位分組的策略,詞頻實時排序,把TopN實時發射出去。(程式碼更新中。。。)

【工具類(PrintBolt)】

最後使用一哥Bolt將結果列印到Log中。(程式碼更新中。。。)

 

====例項程式碼

我自己進行驗證用的程式碼已經上傳到Github上了,可以直接下載下來使用。

這裡只對程式碼的目錄結構以及需要格外關注的點進行一些補充。

Git地址:https://github.com/quchunhui/storm-kafka-plus-qch

 

(1)目錄結構

src\main\java\com\dscn\helloworld


| WordCountTopology.java         // Topology程式碼,程式入口,使用eclipse是需要執行該程式。
|
+---bolt
| PrintBolt.java                          // 上面講到的工具類(PrintBolt)類
| WordCountBolt.java                // 上面講到的詞頻統計(WordCountBolt)類
| WordNormalizerBolt.java          // 上面講到的資料標準化(WordNormalizerBolt)類
|
\---spout
RandomSentenceSpout.java       // 未使用

 

(2)重要程式碼說明

由於原始碼已經共享給大家了,Storm的介面的用法在下面的篇幅中單獨羅列了一下,我這裡不進行過多的闡述。

在這裡只將我碰到過的問題羅列出來、以問題&解決方法的形式分享。 

 

【問題1】

storm是如何實現與kafka的對接的

【回答】

Spout作為storm的訊息源的接入點,在這裡可以同構設定Storm官方提供【storm.kafka.SpoutConfig】類來指定訊息源。

 

----------------

//配置zookeeper叢集地址,畢竟storm是需要叢集支援的。

BrokerHosts brokerHosts = new ZkHosts("192.168.93.128:2181,192.168.93.129:2181,192.168.93.130:2181");

//配置Kafka訂閱的Topic,以及zookeeper中資料節點目錄和名字

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "qchlocaltest", "", "topo");

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//如果該Topology因故障停止處理,下次正常執行時是否從Spout對應資料來源Kafka中的該訂閱Topic的起始位置開始讀取
spoutConfig.forceFromStart = true;

//zookeeper叢集host

spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.93.128", "192.168.93.129", "192.168.93.130"});

//zookeeper叢集port

spoutConfig.zkPort = 2181;

----------------

 

【問題2】

我嘗試著自己重新寫程式碼配置開發環境(不是直接使用Github上的程式碼),

編譯時可以正常通過的,但是本地模式通過eclipse啟動Topology的時候,出現了log4j和slf4j的衝突問題。

【解決方法】

問題原因是由於log4j和slf4j之間的重複呼叫,導致死迴圈而致使記憶體溢位。

解決辦法就是log4j和slf4j保留一個, 普遍上都是保留slf4j的。

需要在Maven的pom.xml上將log4j的相關依賴移除。

移除方法:

 

可以通過【mvn dependency:tree】命令來檢視修改之後的依賴關係。

如果發現需要移除的包的時候,使用Maven的【exclusion】標籤來移除依賴關係。

填寫exclusion標籤的時候,下圖中紅色的部分是groupId,藍色的部分是artifactId。

 

 

【問題3】

使用mvn install命令將程式打jar包上傳到Linux的storm目錄下,然後使用命令

[storm jar test-0.1-jar-with-dependencies.jar com.dscn.helloworld.WordCountTopology 192.168.93.128]

啟動Topology的時候,出現了下面的提示錯誤。

【解決方法】

是Maven的pom.xml的配置出現了問題。詳細請參考部落格:http://blog.csdn.net/luyee2010/article/details/18455237

修改方法就是強storm的scope修改為provided。如下圖所示:

 

 

【問題4】

將程式碼放到實際的叢集執行環境(kafka+storm+hbase)中,發現storm接受不到訊息。

【原因】

一直以來都是使用kafka的非同步生產來生產訊息,以為都正常的生產訊息了。由於非同步生產的時候,並沒有訊息確認機制,

所以不能確保訊息是否正確的進入到了訊息佇列之中,改用同步生產的程式碼嘗試了一下,果然發生了一下的錯誤。

【解決辦法】

通過網上搜索[kafka Failed to send messages]關鍵字,發現有可能是需要設定advertised.host.name這個屬性。

抱著嘗試一下的心態試了一下,果然好使了。至於這個屬性的真正意義還有待探索。(TODO)

 

【問題5】

程式碼在本地的時候好好的,通過storm jar命令釋出到叢集環境的時候,發生了Jar包衝突的問題。

【解決方法】

本來是認為自己的Maven環境的依賴有問題,也通過mvn dependency:tree查看了依賴關係,毫無問題。根本就誒有log4j-over-slf4j.jar這個包。

頭疼了很久,通過QQ群諮詢了一些朋友,他們建議我確認叢集環境中storm/lib下是否存在log4j-over-slf4j.jar,如果存在就把它刪掉。

嘗試了一下之後,果然好使了。原來是我的程式的jar包和叢集環境中會有衝突。詳細請參考我的另一篇部落格:

http://www.cnblogs.com/quchunhui/p/5404168.html

 

====Storm介面詳解:

【IComponent介面】

Spout和Bolt都是其Component。所以,Storm定義了一個名叫IComponent的總介面。

 

IComponent的繼承關係如下圖所示:

綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關。

BaseComponent 是Storm提供的“偷懶”的類。為什麼這麼說呢,它及其子類,都或多或少實現了其介面定義的部分方法。

這樣我們在用的時候,可以直接繼承該類,而不是自己每次都寫所有的方法。

但值得一提的是,BaseXXX這種定義的類,它所實現的方法,都是空的,直接返回null。

 

【Spout】

類圖如下圖所示:

 

介面如下圖所示:

 

各個介面說明:

①、open方法:

是初始化動作。允許你在該spout初始化時做一些動作,傳入了上下文,方便取上下文的一些資料。 

②、close方法

在該spout關閉前執行,但是並不能得到保證其一定被執行。

spout是作為task執行在worker內,在cluster模式下,supervisor會直接kill -9 woker的程序,這樣它就無法執行了。

而在本地模式下,只要不是kill -9, 如果是傳送停止命令,是可以保證close的執行的。 

③、activate和deactivate方法 :

一個spout可以被暫時啟用和關閉,這兩個方法分別在對應的時刻被呼叫。 

④、nextTuple方法:

負責訊息的接入,執行資料發射。是Spout中的最重要方法。

⑤、ack(Object)方法:

傳入的Object其實是一個id,唯一表示一個tuple。該方法是這個id所對應的tuple被成功處理後執行。 

⑥、fail(Object)方法:

同ack,只不過是tuple處理失敗時執行。 

 

我們的RandomSpout由於繼承了BaseRichSpout,

所以不用實現close、activate、deactivate、ack、fail和getComponentConfiguration方法,只關心最基本核心的部分。 

 

結論:

通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現介面IRichSpout,如果不想寫多餘的程式碼,可以直接繼承BaseRichSpout。 

 

【Bolt】

類圖如下圖所示:

 

這裡可以看到一個奇怪的問題: 為什麼IBasicBolt並沒有繼承IBolt? 我們帶著問題往下看。 

IBolt定義了三個方法: 

①、prepare方法:

IBolt繼承了java.io.Serializable,我們在nimbus上提交了topology以後,創建出來的bolt會序列化後傳送到具體執行的worker上去。

worker在執行該Bolt時,會先呼叫prepare方法傳入當前執行的上下文。

②、execute方法:

接受一個tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。

③、cleanup方法:

同ISpout的close方法,在關閉前呼叫。同樣不保證其一定執行。

 

紅色部分(execute方法)是Bolt實現時一定要注意的地方。

而Storm提供了IBasicBolt介面,其目的就是實現該介面的Bolt不用在程式碼中提供反饋結果了,Storm內部會自動反饋成功。

如果你確實要反饋失敗,可以丟擲FailedException。

 

我們來再寫一個Bolt繼承BaseRichBolt替代ExclaimBasicBolt。程式碼如下:

修改topology

執行下,結果一致。

 

結論:

通常情況下,實現一個Bolt,可以實現IRichBolt介面或繼承BaseRichBolt,

如果不想自己處理結果反饋,可以實現IBasicBolt介面或繼承BaseBasicBolt,它實際上相當於自動做掉了prepare方法和collector.emit.ack(inputTuple);

 

====推薦部落格:

【整合實戰類】:

http://shiyanjun.cn/archives/934.html

http://www.tuicool.com/articles/NzyqAn

http://itindex.net/detail/51477-storm-筆記-kafka

【問題解決類】:

http://www.aboutyun.com/thread-12590-1-1.html

【Storm調優類】:

http://blog.csdn.net/derekjiang/article/details/9040243

http://www.51studyit.com/html/notes/20140329/45.html

 

--END--