2017-08-14 flume+kafka+storm+hdfs整合
基礎環境:
Redhat 5.5 64位(我這裡是三臺虛擬機器h40,h41,h42)myeclipse 8.5
jdk1.7.0_25
zookeeper-3.4.5叢集apache-storm-0.9.5叢集
kafka_2.10-0.8.2.0叢集
apache-flume-1.6.0-bin(h40主節點裝就行)
整合flume+kafka
flume-1.6.0已經有自帶的kafkasink,我這裡用的是它自帶的外掛,你也可以用其他開源的flumeng-kafka-plugin.jar外掛,並且將你引用的這個jar包匯入到flume的lib目錄下。
[
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = exec
- a1.sources.r1.command = tail -F /home/hadoop/data.txt
- a1.sources.r1.port = 44444
- a1.sources.r1.host = 192.168.8.40
-
a1.sources.r1.channels = c1
- # Describe the sink
- #引用開源的flumeng-kafka-plugin.jar的sink配置
- #a1.sinks.k1.type = org.apache.flume.plugins.KafkaSink
- #a1.sinks.k1.metadata.broker.list=h40:9092,h41:9092,h42:9092
- #a1.sinks.k1.partition.key=0
- #a1.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition
-
#a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
- #a1.sinks.k1.request.required.acks=0
- #a1.sinks.k1.max.message.size=1000000
- #a1.sinks.k1.producer.type=sync
- #a1.sinks.k1.custom.encoding=UTF-8
- #a1.sinks.k1.custom.topic.name=test
- #kafka自帶的kafkasink的sink配置
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.topic = test
- a1.sinks.k1.brokerList = h40:9092,h41:9092,h42:9092
- a1.sinks.k1.requiredAcks = 1
- a1.sinks.k1.batchSize = 20
- a1.sinks.k1.channel = c1
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
KafkaOffsetMonitor執行比較簡單,因為所有執行檔案,資原始檔,jar檔案都打包到KafkaOffsetMonitor-assembly-0.2.0.jar了,直接執行就可以,這種方式太棒了。既不用編譯也不用配置,呵呵,也不是絕對不配置。
a.新建一個目錄kafka-offset-console,然後把jar拷貝到該目錄下
參考:
http://blog.csdn.NET/lizhitao/article/details/27199863
http://www.cnblogs.com/smartloli/p/4615908.html
[[email protected] ~]$ mkdir kafka-offset-console
[[email protected] ~]$ ls kafka-offset-console/
KafkaOffsetMonitor-assembly-0.2.0.jar
在kafka-offset-console目錄下執行該命令執行在後臺:
- java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
- com.quantifind.kafka.offsetapp.OffsetGetterWeb \
- --zk h40:2181,h41:2181,h42:2181/kafka \
- --port 8089 \
- --refresh 10.seconds \
- --retain 1.days 1>/dev/null 2>&1 &
[[email protected] apache-flume-1.6.0-bin]$ bin/flume-ng agent -c . -f conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
[[email protected] ~]$ echo "hello world" >> data.txt
檢驗:
在h40節點的kafka消費者視窗可見“hello world”,說明整合成功!
[[email protected] kafka_2.10-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper h40:2181,h41:2181,h42:2181/kafka --topic test --from-beginning
hello world
預覽:(在瀏覽器上輸入http://192.168.8.40:8089)
我們通過Kafka的監控工具,來預覽我們上傳的日誌記錄,有沒有在Kafka中產生訊息資料
(如果對英文不是很熟悉的話,還可以用谷歌瀏覽器將頁面翻譯成中文,這樣更方便讀取資訊)
(後來我又在網上看到了另一個kafka的監控工具https://github.com/smartloli/kafka-eagle,但感覺這個比上一個要複雜一些,這個我還沒有親自測試安裝,不知道效果如何)
Storm安裝配置:
Storm叢集也依賴Zookeeper叢集,要保證Zookeeper叢集正常執行。Storm的安裝配置比較簡單,我們仍然使用下面3臺機器搭建:
192.168.4.142 h40
192.168.4.143 h41
192.168.4.144 h42
[[email protected] ~]$ tar -zxvf apache-storm-0.9.5.tar.gz
然後,修改配置檔案conf/storm.yaml,新增如下內容:
- storm.zookeeper.servers:
- - "h40"
- - "h41"
- - "h42"
- storm.zookeeper.port: 2181
- nimbus.host: "h40"
- supervisor.slots.ports:
- - 6700
- - 6701
- - 6702
- - 6703
- 這個配置非常的膈應人,在某些地方必須加空格,否則啟動會報錯)
將配置好的安裝檔案,分發到其他節點上:
[[email protected] ~]$ scp -r apache-storm-0.9.5/ h41:/home/hadoop/
[[email protected] ~]$ scp -r apache-storm-0.9.5/ h42:/home/hadoop/
Storm叢集的主節點為Nimbus,從節點為Supervisor,我們需要在h40上啟動Nimbus服務,在從節點h41、h42上啟動Supervisor服務:
[[email protected] apache-storm-0.9.5]$ bin/storm nimbus
卻報這個錯:
- File "bin/storm", line 61
- normclasspath = cygpath if sys.platform == 'cygwin' else identity
- ^
- ntaxError: invalid syntax
百度了一下說是Python版本過低造成的,我用python -V看來一下,果然是很古老的Python 2.4.3版本,否則Python的好多新功能它都沒有,於是我打算重新安裝Python(如果你的Python版本夠高可忽略此步驟,2.7以上就可以了),去官網下了個2.7.12的,下載地址:https://www.python.org/downloads/
Python2.7.12安裝:三臺機器都重複一下步驟:
[[email protected] usr]# tar -zxvf python-2.7.12.tgz
[[email protected] usr]# cd Python-2.7.12/
編譯前,請先確認gcc、make、patch等編譯工具是否已安裝,並可正常使用。
[[email protected] ~]# yum -y install gcc*
[[email protected] Python-2.7.12]# ./configure
[[email protected] Python-2.7.12]# make && make install
[[email protected] Python-2.7.12]# rm -rf /usr/bin/python
錯誤做法:
[[email protected] Python-2.7.12]# ln -s python /usr/bin/python
[[email protected] Python-2.7.12]# ll /usr/bin/python
lrwxrwxrwx 1 root root 6 May 10 10:33 /usr/bin/python -> python
否則在執行bin/storm nimbus的時候會報這個錯:
[[email protected] apache-storm-0.9.5]$ bin/storm supervisor
-bash: bin/storm: /usr/bin/python: bad interpreter: Too many levels of symbolic links
正確做法:
[[email protected] ~]# ln -s /usr/Python-2.7.12/python /usr/bin/python
[[email protected] ~]# ll /usr/bin/python
lrwxrwxrwx 1 root root 25 May 10 10:37 /usr/bin/python -> /usr/Python-2.7.12/python
[[email protected] Python-2.7.12]# python -V
Python 2.7.12
可是再執行bin/storm nimbus的時候還是報錯:
[[email protected] apache-storm-0.9.5]$ bin/storm nimbus
-bash: bin/storm: /usr/bin/python: bad interpreter: Permission denied
解決方案:在/home/hadoop/apache-storm-0.9.5/bin/storm中,用你的實際Python安裝路徑#!/usr/Python-2.7.12/python替換第一行#!/usr/bin/python
(這裡我不太懂的是前面的#不是註釋的意思嗎,那修不修改又有什麼意義呢,可是結果它還好使。後來我在第二次試驗的時候並沒有報這個錯,我也不知道啥原因,如果你沒有出現該報錯可忽略)
[[email protected] apache-storm-0.9.5]$ bin/storm supervisor(不知道為什麼這個主節點的supervisor也得開,我當時不開的話再後面的試驗中無法將kafka中的資料實時傳到storm做分析,三個節點都開supervisor的時候就正常,除了主節點只開兩個節點的supervisor的話就產生空檔案無資料產生。後來開主節點和一個從節點的supervisor並且關另一個從節點的supervisor卻也好使,並且還會在有supervisor程序的主節點h40中再建立一個新的檔案寫入,我已將被玩壞了。。。。。
但是storm的容錯性不是很好嗎,只缺一個supervisor咋麼會出錯呢,再說我看人家部落格中也沒說要必須主節點也得開supervisor程序也成功了啊,不知道大家有沒有遇到這個問題,很是困惑我。後來我在本地模式下把主節點即使把其他兩節點的supervisor關掉卻都能正常往裡寫檔案裡寫資料,就是稍微等一下而已,並且發現開啟的supervisor程序越多所等待的時間越少,我也真是奇了怪了!順便這裡提一句正常的情況下提交完Topology後會產生空檔案,但是得等好長一會兒才能將kafka中的資料寫入,時間長到你都懷疑試驗失敗了。。)
[[email protected] apache-storm-0.9.5]$ bin/storm supervisor
[[email protected] apache-storm-0.9.5]$ bin/storm supervisor
(還有就是在提交Topology後,在啟動supervisor程序的控制檯總是打印出如kill 24361: No such process之類的,其中數字不斷的變化,但卻不影響正常使用,我不太明白是什麼原因)
為了方便監控,可以啟動Storm UI,可以從Web頁面上監控Storm Topology的執行狀態,例如在h40上啟動:
[[email protected] apache-storm-0.9.5]$ bin/storm ui &
這樣可以通過訪問http://192.168.8.40:8080(我用http://h40:8080沒有訪問成功)來檢視Topology的執行狀況。
整合Kafka+Storm
訊息通過各種方式進入到Kafka訊息中介軟體,比如可以通過使用Flume來收集日誌資料,然後在Kafka中路由暫存,然後再由實時計算程式Storm做實時分析,這時我們就需要將在Storm的Spout中讀取Kafka中的訊息,然後交由具體的Spot元件去分析處理。
下面,我們開發了一個簡單WordCount示例程式,從Kafka讀取訂閱的訊息行,通過空格拆分出單個單詞,然後再做詞頻統計計算,實現的Topology的程式碼,如下所示:
- package org.shirdrn.storm.examples;
- import java.io.FileWriter;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.StringTokenizer;
- import java.util.UUID;
- import storm.kafka.BrokerHosts;
- import storm.kafka.KafkaSpout;
- import storm.kafka.SpoutConfig;
- import storm.kafka.StringScheme;
- import storm.kafka.ZkHosts;
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.StormSubmitter;
- import backtype.storm.spout.SchemeAsMultiScheme;
- import backtype.storm.task.TopologyContext;
- import backtype.storm.topology.BasicOutputCollector;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.topology.base.BaseBasicBolt;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
- publicclass MyKafkaTopology {
- publicstaticclass WordSpliter extends BaseBasicBolt{
- @Override
- publicvoid execute(Tuple tuple, BasicOutputCollector collector){
- // 接收到一個句子
- String sentence = tuple.getString(0);
- // 把句子切割為單詞
- StringTokenizer iter =