1. 程式人生 > >Flume和Kafka完成實時數據的采集

Flume和Kafka完成實時數據的采集

ces arc either sco describe rep net 技術分享 efault

Flume和Kafka完成實時數據的采集

寫在前面
Flume和Kafka在生產環境中,一般都是結合起來使用的。可以使用它們兩者結合起來收集實時產生日誌信息,這一點是很重要的。如果,你不了解flume和kafka,你可以先查看我寫的關於那兩部分的知識。再來學習,這部分的操作,也是可以的。

實時數據的采集,就面臨一個問題。我們的實時數據源,怎麽產生呢?因為我們可能想直接獲取實時的數據流不是那麽的方便。我前面寫過一篇文章,關於實時數據流的python產生器,文章地址:http://blog.csdn.net/liuge36/article/details/78596876
你可以先看一下,如何生成一個實時的數據...

思路??如何開始呢??

分析:我們可以從數據的流向著手,數據一開始是在webserver的,我們的訪問日誌是被nginx服務器實時收集到了指定的文件,我們就是從這個文件中把日誌數據收集起來,即:webserver=>flume=>kafka

webserver日誌存放文件位置
這個文件的位置,一般是我們自己設置的

我們的web日誌存放的目錄是在:
/home/hadoop/data/project/logs/access.log下面

[hadoop@hadoop000 logs]$ pwd
/home/hadoop/data/project/logs
[hadoop@hadoop000 logs]$ ls
access.log
[hadoop@hadoop000 logs]$ 

Flume

做flume,其實就是寫conf文件,就面臨選型的問題
source選型?channel選型?sink選型?

這裏我們選擇 exec source memory channel kafka sink

怎麽寫呢?
按照之前說的那樣1234步驟

從官網中,我們可以找到我們的選型應該如何書寫:
1) 配置Source
exec source

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c

2) 配置Channel
memory channel

a1.channels.c1.type = memory

3) 配置Sink
kafka sink
flume1.6版本可以參照http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html#kafka-sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

4) 把以上三個組件串起來

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

我們new一個文件叫做test3.conf
把我們自己分析的代碼貼進去:

[hadoop@hadoop000 conf]$ vim test3.conf 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c


a1.channels.c1.type = memory


a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

這裏我們先不啟動,因為其中涉及到kafka的東西,必須先把kafka部署起來,,

Kafka的部署

kafka如何部署呢??
參照官網的說法,我們首先啟動一個zookeeper進程,接著,才能夠啟動kafka的server

Step 1: Start the zookeeper

[hadoop@hadoop000 ~]$ 
[hadoop@hadoop000 ~]$ jps
29147 Jps
[hadoop@hadoop000 ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hadoop@hadoop000 ~]$ jps
29172 QuorumPeerMain
29189 Jps
[hadoop@hadoop000 ~]$ 

Step 2: Start the server

[hadoop@hadoop000 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外開一個窗口,查看jps
[hadoop@hadoop000 ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[hadoop@hadoop000 ~]$ 

如果,這部分不是很熟悉,可以參考http://blog.csdn.net/liuge36/article/details/78592169

Step 3: Create a topic

[hadoop@hadoop000 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period (‘.‘) or underscore (‘_‘) could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[hadoop@hadoop000 ~]$ 

Step 4: 開啟之前的agent

  [hadoop@hadoop000 conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console

Step 5: Start a consumer

kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka

上面的第五步執行之後,就會收到刷屏的結果,哈哈哈!!
技術分享圖片

上面的消費者會一直一直的刷屏,還是很有意思的!!!
這裏的消費者是把接收到的數據數據到屏幕上

後面,我們會介紹,使用SparkStreaming作為消費者實時接收數據,並且接收到的數據做簡單數據清洗的開發,從隨機產生的日誌中篩選出我們需要的數據.....

Flume和Kafka完成實時數據的采集