Flume和Kafka完成實時數據的采集
Flume和Kafka完成實時數據的采集
寫在前面
Flume和Kafka在生產環境中,一般都是結合起來使用的。可以使用它們兩者結合起來收集實時產生日誌信息,這一點是很重要的。如果,你不了解flume和kafka,你可以先查看我寫的關於那兩部分的知識。再來學習,這部分的操作,也是可以的。
實時數據的采集,就面臨一個問題。我們的實時數據源,怎麽產生呢?因為我們可能想直接獲取實時的數據流不是那麽的方便。我前面寫過一篇文章,關於實時數據流的python產生器,文章地址:http://blog.csdn.net/liuge36/article/details/78596876
你可以先看一下,如何生成一個實時的數據...
思路??如何開始呢??
分析:我們可以從數據的流向著手,數據一開始是在webserver的,我們的訪問日誌是被nginx服務器實時收集到了指定的文件,我們就是從這個文件中把日誌數據收集起來,即:webserver=>flume=>kafka
webserver日誌存放文件位置
這個文件的位置,一般是我們自己設置的
我們的web日誌存放的目錄是在:
/home/hadoop/data/project/logs/access.log下面
[hadoop@hadoop000 logs]$ pwd /home/hadoop/data/project/logs [hadoop@hadoop000 logs]$ ls access.log [hadoop@hadoop000 logs]$
Flume
做flume,其實就是寫conf文件,就面臨選型的問題
source選型?channel選型?sink選型?
這裏我們選擇 exec source memory channel kafka sink
怎麽寫呢?
按照之前說的那樣1234步驟
從官網中,我們可以找到我們的選型應該如何書寫:
1) 配置Source
exec source
# Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log a1.sources.r1.shell = /bin/sh -c
2) 配置Channel
memory channel
a1.channels.c1.type = memory
3) 配置Sink
kafka sink
flume1.6版本可以參照http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html#kafka-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
4) 把以上三個組件串起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
我們new一個文件叫做test3.conf
把我們自己分析的代碼貼進去:
[hadoop@hadoop000 conf]$ vim test3.conf
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c
a1.channels.c1.type = memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
這裏我們先不啟動,因為其中涉及到kafka的東西,必須先把kafka部署起來,,
Kafka的部署
kafka如何部署呢??
參照官網的說法,我們首先啟動一個zookeeper進程,接著,才能夠啟動kafka的server
Step 1: Start the zookeeper
[hadoop@hadoop000 ~]$
[hadoop@hadoop000 ~]$ jps
29147 Jps
[hadoop@hadoop000 ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop000 ~]$ jps
29172 QuorumPeerMain
29189 Jps
[hadoop@hadoop000 ~]$
Step 2: Start the server
[hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外開一個窗口,查看jps
[hadoop@hadoop000 ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[hadoop@hadoop000 ~]$
如果,這部分不是很熟悉,可以參考http://blog.csdn.net/liuge36/article/details/78592169
Step 3: Create a topic
[hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period (‘.‘) or underscore (‘_‘) could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[hadoop@hadoop000 ~]$
Step 4: 開啟之前的agent
[hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console
Step 5: Start a consumer
kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka
上面的第五步執行之後,就會收到刷屏的結果,哈哈哈!!
上面的消費者會一直一直的刷屏,還是很有意思的!!!
這裏的消費者是把接收到的數據數據到屏幕上
後面,我們會介紹,使用SparkStreaming作為消費者實時接收數據,並且接收到的數據做簡單數據清洗的開發,從隨機產生的日誌中篩選出我們需要的數據.....
Flume和Kafka完成實時數據的采集