1. 程式人生 > >2017-08-14 flume+kafka+storm+hdfs整合

2017-08-14 flume+kafka+storm+hdfs整合

基礎環境:

Redhat 5.5 64位(我這裡是三臺虛擬機器h40,h41,h42)

myeclipse 8.5

jdk1.7.0_25

zookeeper-3.4.5叢集
apache-storm-0.9.5叢集

kafka_2.10-0.8.2.0叢集

apache-flume-1.6.0-bin(h40主節點裝就行)

整合flume+kafka
flume-1.6.0已經有自帶的kafkasink,我這裡用的是它自帶的外掛,你也可以用其他開源的flumeng-kafka-plugin.jar外掛,並且將你引用的這個jar包匯入到flume的lib目錄下。
[

[email protected] conf]$ cat kafka.conf 

  1. a1.sources = r1  
  2. a1.sinks = k1  
  3. a1.channels = c1  
  4. # Describe/configure the source  
  5. a1.sources.r1.type = exec  
  6. a1.sources.r1.command = tail -F /home/hadoop/data.txt  
  7. a1.sources.r1.port = 44444  
  8. a1.sources.r1.host = 192.168.8.40  
  9. a1.sources.r1.channels = c1  
  10. # Describe the sink  
  11. #引用開源的flumeng-kafka-plugin.jar的sink配置  
  12. #a1.sinks.k1.type = org.apache.flume.plugins.KafkaSink  
  13. #a1.sinks.k1.metadata.broker.list=h40:9092,h41:9092,h42:9092  
  14. #a1.sinks.k1.partition.key=0  
  15. #a1.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition  
  16. #a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder  
  17. #a1.sinks.k1.request.required.acks=0  
  18. #a1.sinks.k1.max.message.size=1000000  
  19. #a1.sinks.k1.producer.type=sync  
  20. #a1.sinks.k1.custom.encoding=UTF-8  
  21. #a1.sinks.k1.custom.topic.name=test  
  22. #kafka自帶的kafkasink的sink配置  
  23. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
  24. a1.sinks.k1.topic = test  
  25. a1.sinks.k1.brokerList = h40:9092,h41:9092,h42:9092  
  26. a1.sinks.k1.requiredAcks = 1  
  27. a1.sinks.k1.batchSize = 20  
  28. a1.sinks.k1.channel = c1  
  29. # Use a channel which buffers events in memory  
  30. a1.channels.c1.type = memory  
  31. a1.channels.c1.capacity = 1000  
  32. a1.channels.c1.transactionCapacity = 100  
  33. # Bind the source and sink to the channel  
  34. a1.sources.r1.channels = c1  
  35. a1.sinks.k1.channel = c1  
啟動Kafka監控工具:
KafkaOffsetMonitor執行比較簡單,因為所有執行檔案,資原始檔,jar檔案都打包到KafkaOffsetMonitor-assembly-0.2.0.jar了,直接執行就可以,這種方式太棒了。既不用編譯也不用配置,呵呵,也不是絕對不配置。
a.新建一個目錄kafka-offset-console,然後把jar拷貝到該目錄下

參考:
http://blog.csdn.NET/lizhitao/article/details/27199863
http://www.cnblogs.com/smartloli/p/4615908.html

[[email protected] ~]$ mkdir kafka-offset-console
[[email protected] ~]$ ls kafka-offset-console/
KafkaOffsetMonitor-assembly-0.2.0.jar

在kafka-offset-console目錄下執行該命令執行在後臺:

  1. java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \  
  2.  com.quantifind.kafka.offsetapp.OffsetGetterWeb \  
  3.  --zk h40:2181,h41:2181,h42:2181/kafka \  
  4.  --port 8089 \  
  5.  --refresh 10.seconds \  
  6.  --retain 1.days 1>/dev/null 2>&1 &  

[[email protected] apache-flume-1.6.0-bin]$ bin/flume-ng agent -c . -f conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
[[email protected] ~]$ echo "hello world" >> data.txt

檢驗:
在h40節點的kafka消費者視窗可見“hello world”,說明整合成功!
[[email protected] kafka_2.10-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper h40:2181,h41:2181,h42:2181/kafka --topic  test  --from-beginning
hello world

預覽:(在瀏覽器上輸入http://192.168.8.40:8089)
我們通過Kafka的監控工具,來預覽我們上傳的日誌記錄,有沒有在Kafka中產生訊息資料
(如果對英文不是很熟悉的話,還可以用谷歌瀏覽器將頁面翻譯成中文,這樣更方便讀取資訊)
(後來我又在網上看到了另一個kafka的監控工具https://github.com/smartloli/kafka-eagle,但感覺這個比上一個要複雜一些,這個我還沒有親自測試安裝,不知道效果如何)

Storm安裝配置:

Storm叢集也依賴Zookeeper叢集,要保證Zookeeper叢集正常執行。Storm的安裝配置比較簡單,我們仍然使用下面3臺機器搭建:
192.168.4.142   h40
192.168.4.143   h41
192.168.4.144   h42

[[email protected] ~]$ tar -zxvf apache-storm-0.9.5.tar.gz

然後,修改配置檔案conf/storm.yaml,新增如下內容:

  1. storm.zookeeper.servers:  
  2.     - "h40"  
  3.     - "h41"  
  4.     - "h42"  
  5. storm.zookeeper.port: 2181  
  6. nimbus.host: "h40"  
  7. supervisor.slots.ports:  
  8.     - 6700  
  9.     - 6701  
  10.     - 6702  
  11.     - 6703  
  12. 這個配置非常的膈應人,在某些地方必須加空格,否則啟動會報錯)  

將配置好的安裝檔案,分發到其他節點上:
[[email protected] ~]$ scp -r apache-storm-0.9.5/ h41:/home/hadoop/
[[email protected] ~]$ scp -r apache-storm-0.9.5/ h42:/home/hadoop/

Storm叢集的主節點為Nimbus,從節點為Supervisor,我們需要在h40上啟動Nimbus服務,在從節點h41、h42上啟動Supervisor服務:
[[email protected] apache-storm-0.9.5]$ bin/storm nimbus
卻報這個錯:

  1. File "bin/storm", line 61  
  2.   normclasspath = cygpath if sys.platform == 'cygwin' else identity  
  3.                            ^  
  4. ntaxError: invalid syntax  

百度了一下說是Python版本過低造成的,我用python -V看來一下,果然是很古老的Python 2.4.3版本,否則Python的好多新功能它都沒有,於是我打算重新安裝Python(如果你的Python版本夠高可忽略此步驟,2.7以上就可以了),去官網下了個2.7.12的,下載地址:https://www.python.org/downloads/

Python2.7.12安裝:三臺機器都重複一下步驟:

[[email protected] usr]# tar -zxvf python-2.7.12.tgz
[[email protected] usr]# cd Python-2.7.12/

編譯前,請先確認gcc、make、patch等編譯工具是否已安裝,並可正常使用。
[[email protected] ~]# yum -y install gcc*

[[email protected] Python-2.7.12]# ./configure

[[email protected] Python-2.7.12]# make && make install

[[email protected] Python-2.7.12]# rm -rf /usr/bin/python

錯誤做法:
[[email protected] Python-2.7.12]# ln -s python /usr/bin/python
[[email protected] Python-2.7.12]# ll /usr/bin/python
lrwxrwxrwx 1 root root 6 May 10 10:33 /usr/bin/python -> python
否則在執行bin/storm nimbus的時候會報這個錯:
[[email protected] apache-storm-0.9.5]$ bin/storm supervisor
-bash: bin/storm: /usr/bin/python: bad interpreter: Too many levels of symbolic links
正確做法:
[[email protected] ~]# ln -s /usr/Python-2.7.12/python /usr/bin/python
[[email protected] ~]# ll /usr/bin/python
lrwxrwxrwx 1 root root 25 May 10 10:37 /usr/bin/python -> /usr/Python-2.7.12/python

[[email protected] Python-2.7.12]# python -V
Python 2.7.12

可是再執行bin/storm nimbus的時候還是報錯:
[[email protected] apache-storm-0.9.5]$ bin/storm nimbus
-bash: bin/storm: /usr/bin/python: bad interpreter: Permission denied
解決方案:在/home/hadoop/apache-storm-0.9.5/bin/storm中,用你的實際Python安裝路徑#!/usr/Python-2.7.12/python替換第一行#!/usr/bin/python
(這裡我不太懂的是前面的#不是註釋的意思嗎,那修不修改又有什麼意義呢,可是結果它還好使。後來我在第二次試驗的時候並沒有報這個錯,我也不知道啥原因,如果你沒有出現該報錯可忽略)

[[email protected] apache-storm-0.9.5]$ bin/storm supervisor(不知道為什麼這個主節點的supervisor也得開,我當時不開的話再後面的試驗中無法將kafka中的資料實時傳到storm做分析,三個節點都開supervisor的時候就正常,除了主節點只開兩個節點的supervisor的話就產生空檔案無資料產生。後來開主節點和一個從節點的supervisor並且關另一個從節點的supervisor卻也好使,並且還會在有supervisor程序的主節點h40中再建立一個新的檔案寫入,我已將被玩壞了。。。。。
但是storm的容錯性不是很好嗎,只缺一個supervisor咋麼會出錯呢,再說我看人家部落格中也沒說要必須主節點也得開supervisor程序也成功了啊,不知道大家有沒有遇到這個問題,很是困惑我。後來我在本地模式下把主節點即使把其他兩節點的supervisor關掉卻都能正常往裡寫檔案裡寫資料,就是稍微等一下而已,並且發現開啟的supervisor程序越多所等待的時間越少,我也真是奇了怪了!順便這裡提一句正常的情況下提交完Topology後會產生空檔案,但是得等好長一會兒才能將kafka中的資料寫入,時間長到你都懷疑試驗失敗了。。)
[[email protected] apache-storm-0.9.5]$ bin/storm supervisor
[[email protected] apache-storm-0.9.5]$ bin/storm supervisor
(還有就是在提交Topology後,在啟動supervisor程序的控制檯總是打印出如kill 24361: No such process之類的,其中數字不斷的變化,但卻不影響正常使用,我不太明白是什麼原因)

為了方便監控,可以啟動Storm UI,可以從Web頁面上監控Storm Topology的執行狀態,例如在h40上啟動:
[[email protected] apache-storm-0.9.5]$ bin/storm ui &
這樣可以通過訪問http://192.168.8.40:8080(我用http://h40:8080沒有訪問成功)來檢視Topology的執行狀況。

整合Kafka+Storm

訊息通過各種方式進入到Kafka訊息中介軟體,比如可以通過使用Flume來收集日誌資料,然後在Kafka中路由暫存,然後再由實時計算程式Storm做實時分析,這時我們就需要將在Storm的Spout中讀取Kafka中的訊息,然後交由具體的Spot元件去分析處理。
下面,我們開發了一個簡單WordCount示例程式,從Kafka讀取訂閱的訊息行,通過空格拆分出單個單詞,然後再做詞頻統計計算,實現的Topology的程式碼,如下所示:

  1. package org.shirdrn.storm.examples;  
  2. import java.io.FileWriter;  
  3. import java.io.IOException;  
  4. import java.util.HashMap;  
  5. import java.util.Map;  
  6. import java.util.StringTokenizer;  
  7. import java.util.UUID;  
  8. import storm.kafka.BrokerHosts;  
  9. import storm.kafka.KafkaSpout;  
  10. import storm.kafka.SpoutConfig;  
  11. import storm.kafka.StringScheme;  
  12. import storm.kafka.ZkHosts;  
  13. import backtype.storm.Config;  
  14. import backtype.storm.LocalCluster;  
  15. import backtype.storm.StormSubmitter;  
  16. import backtype.storm.spout.SchemeAsMultiScheme;  
  17. import backtype.storm.task.TopologyContext;  
  18. import backtype.storm.topology.BasicOutputCollector;  
  19. import backtype.storm.topology.OutputFieldsDeclarer;  
  20. import backtype.storm.topology.TopologyBuilder;  
  21. import backtype.storm.topology.base.BaseBasicBolt;  
  22. import backtype.storm.tuple.Fields;  
  23. import backtype.storm.tuple.Tuple;  
  24. import backtype.storm.tuple.Values;  
  25. publicclass MyKafkaTopology {  
  26.     publicstaticclass WordSpliter extends BaseBasicBolt{    
  27.         @Override
  28.         publicvoid execute(Tuple tuple, BasicOutputCollector collector){    
  29.             // 接收到一個句子  
  30.             String sentence = tuple.getString(0);  
  31.             // 把句子切割為單詞  
  32.             StringTokenizer iter =