
Integrating Kafka with Flume: a worked example

Part 1. Overall steps:

1. Install Kafka and configure Flume, then create a Kafka topic. (Kafka is managed through ZooKeeper, so ZooKeeper must be installed and running first.)

2. Place files in the directory watched by Flume's source and start Flume, which reads the files into the specified Kafka topic.

3. Start a Kafka consumer to read from the topic.

 

Part 2. Step-by-step integration:

1. This walkthrough assumes Kafka and Flume are already installed; we focus only on the integration itself.

2. Create the Kafka topic:

# kafka-topics.sh --create --topic mytopic --replication-factor 1 --partitions 10 --zookeeper localhost:2181

List topics to verify it was created:

# kafka-topics.sh --list --zookeeper localhost:2181
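For scripting, the same commands can be parameterized, and a `--describe` afterwards is a quick sanity check on partition count and replication. A sketch (the commands are echoed rather than executed, since running them needs a live ZooKeeper; host and topic names follow this article):

```shell
#!/bin/sh
# ZooKeeper address and topic name as used in this article.
ZK="localhost:2181"
TOPIC="mytopic"

# Standard Kafka distributions ship kafka-topics.sh (plural) and use
# --partitions (plural) for the partition count.
CREATE_CMD="kafka-topics.sh --create --topic ${TOPIC} --replication-factor 1 --partitions 10 --zookeeper ${ZK}"
DESCRIBE_CMD="kafka-topics.sh --describe --topic ${TOPIC} --zookeeper ${ZK}"

echo "$CREATE_CMD"
echo "$DESCRIBE_CMD"
```

`--describe` prints one line per partition with its leader and in-sync replicas, which is worth checking before pointing Flume at the topic.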

3. Configure Flume to read files into Kafka. In Flume's conf directory, create flume-dirToKafka.properties with the following contents:

# cat flume-dirToKafka.properties


#agent1 name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1


#set source1
agent1.sources.source1.type=spooldir

#mind the permissions on these directories: chmod 777 -R (flumePath) and (dir)
agent1.sources.source1.spoolDir=/yangxiaohai/flumePath/dir/logdfs
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp

 

#set sink1

#where the data is delivered; here it is Kafka. For HDFS, use the corresponding HDFS sink instead.
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#the Kafka topic created in step 2
agent1.sinks.sink1.topic = mytopic
agent1.sinks.sink1.brokerList = hadoop11:9092,hadoop12:9092,hadoop13:9092
agent1.sinks.sink1.requiredAcks = 1
agent1.sinks.sink1.batchSize = 100
agent1.sinks.sink1.channel = channel1

 

#set channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/yangxiaohai/flumePath/dir/logdfstmp/point 
agent1.channels.channel1.dataDirs=/yangxiaohai/flumePath/dir/logdfstmp
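The directories referenced above must exist before Flume starts, with the permissions noted in the source comment. A minimal setup sketch (the base path is taken from the config; override BASE if your layout differs):

```shell
#!/bin/sh
set -e
# Base path from the Flume config above; override BASE to use another location.
BASE="${BASE:-/yangxiaohai/flumePath/dir}"

# Spooling directory watched by source1, plus the file channel's
# checkpoint and data directories.
mkdir -p "${BASE}/logdfs" "${BASE}/logdfstmp/point"

# The article uses chmod 777 -R to rule out permission problems;
# in production, grant access to the flume user only.
chmod -R 777 "${BASE}"
```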

4. Start Flume (from its bin directory):

Note: the value passed to --name must match the agent name defined in the configuration file (agent1 here). If they differ, startup hangs without progressing, with a warning along the lines of: No configuration found for this host:<wrong name>

# ./flume-ng agent --conf conf --conf-file ../conf/flume-dirToKafka.properties --name agent1 -Dflume.root.logger=INFO,console

Note: you may hit the following error at this point:

18/11/07 00:39:29 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
18/11/07 00:39:29 ERROR kafka.KafkaSink: Failed to publish events
java.lang.IllegalStateException: Empty value [channel=[channel=channel1]]
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.file.FlumeEventQueue.removeHead(FlumeEventQueue.java:160)
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:512)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
18/11/07 00:39:29 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events

Solution:

Delete all files under the checkpoint directory (checkpointDir=/yangxiaohai/flumePath/dir/logdfstmp/point) and start Flume again.
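Clearing the checkpoint works because Flume rebuilds it by replaying the channel's data files on the next start, but it should only be done while the agent is stopped. A sketch using the checkpointDir from the config (override CKPT if your path differs):

```shell
#!/bin/sh
set -e
# checkpointDir from the Flume config above; override CKPT if yours differs.
CKPT="${CKPT:-/yangxiaohai/flumePath/dir/logdfstmp/point}"

# Stop the Flume agent first, then remove the checkpoint files.
mkdir -p "$CKPT"
# ${CKPT:?} aborts if the variable is empty, so this can never expand to rm -rf /*
rm -rf "${CKPT:?}"/*
```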

Once Flume starts successfully, it prints output like the following: the files already under spoolDir=/yangxiaohai/flumePath/dir/logdfs have been read into the topic, and Flume then sits in monitoring mode. When new files arrive in the monitored directory, additional lines appear below this point:

18/11/07 00:47:20 INFO producer.SyncProducer: Connected to hadoop13:9092 for producing
18/11/07 00:47:20 INFO producer.SyncProducer: Disconnecting from hadoop13:9092
18/11/07 00:47:20 INFO producer.SyncProducer: Disconnecting from hadoop12:9092
18/11/07 00:47:20 INFO producer.SyncProducer: Connected to hadoop12:9092 for producing
18/11/07 00:47:43 INFO file.EventQueueBackingStoreFile: Start checkpoint for /yangxiaohai/flumePath/dir/logdfstmp/point/checkpoint, elements to sync = 26806
18/11/07 00:47:43 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1541522865614, queueSize: 0, queueHead: 43508
18/11/07 00:47:43 INFO file.Log: Updated checkpoint for file:/yangxiaohai/flumePath/dir/logdfstmp/log-12 position: 1108730 logWriteOrderID: 1541522865614
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-8
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-9
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-10

5. At this point Flume has read the files from its source directory into Kafka's mytopic. Now start the Kafka consumer, and records are read out continuously. The result looks like this:

Start the consumer:

# kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning

The records read from the topic:

{"timestamp":"2017-02-11T10:49:43.043Z","url":"/original/index_6.html","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword":"","search_engine":"","country":"中國","area":"華北","province":"北京市","city":"北京市","use_duration_cat":"null","domain":"www.donews.com","to_domain":0,"use_interval_cat":"null","is_exit":0,"event":"startup","os":"Windows XP","os_type":"pc","browser":"Firefox","browser_version":"Firefox 9.0.1","suuid":"47ab648cb5c15bc8e1952efc16a037cb","short_cookie":"null","ip":"118.192.66.41","use_duration":0,"use_interval":0,"pv_cat":"null","event_name":[],"refer":"","hour":"10","gender":"null","age":0,"account_level":0,"payment_method":"","consumption_point":"","money":0.0,"account":"","zone_id":"","app_version":"","network":"null","nettype":"null","lang":"","app_upgrade_from":"","display":"null","device_type":"null","register_days":0,"refer_domain":"null","appkey":"donews_website_nginx_log","day":"2017-02-11"}
{"timestamp":"2017-02-11T10:22:01.001Z","url":"/column/get_adv_bottom","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword":"","search_engine":"","country":"中國","area":"華東","province":"福建省","city":"福州市","use_duration_cat":"null","domain":"www.donews.com","to_domain":0,"use_interval_cat":"null","is_exit":0,"event":"jump","os":"Windows 7","os_type":"pc","browser":"Internet Explorer","browser_version":"Internet Explorer 7.0","suuid":"4f41eff515e7be6774749383270794e7","short_cookie":"null","ip":"112.5.236.153","use_duration":0,"use_interval":0,"pv_cat":"null","event_name":[],"refer":"http://www.donews.com/ent/index","hour":"10","gender":"null","age":0,"account_level":0,"payment_method":"","consumption_point":"","money":0.0,"account":"","zone_id":"","app_version":"","network":"null","nettype":"null","lang":"","app_upgrade_from":"","display":"null","device_type":"null","register_days":0,"refer_domain":"null","appkey":"donews_website_nginx_log","day":"2017-02-11"}
{"timestamp":"2017-02-11T10:22:14.014Z","url":"/editor/person/34","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword^C
# ^C
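A side note: the `--zookeeper` flag above belongs to the old consumer. On newer Kafka releases the console consumer connects to a broker directly via `--bootstrap-server` (the `--zookeeper` option was removed from this tool in Kafka 2.2), so on a recent cluster the equivalent command looks like the one built below. The broker address is an assumption; substitute one of your own brokers, e.g. hadoop11:9092:

```shell
#!/bin/sh
# Broker address is an assumption; point it at one of your brokers.
BROKER="${BROKER:-localhost:9092}"
TOPIC="mytopic"

# New-style consumer: talks to the broker, not ZooKeeper. Echoed rather
# than run, since executing it needs a live broker.
CONSUME_CMD="kafka-console-consumer.sh --bootstrap-server ${BROKER} --topic ${TOPIC} --from-beginning"
echo "$CONSUME_CMD"
```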

6. Now, when we drop another file into Flume's spoolDir (/yangxiaohai/flumePath/dir/logdfs), Flume detects it and shows it in its log. Let's copy in a file, test.txt:

# cp test.txt ./logdfs
# cd logdfs
# ll
-rw-r--r--. 1 root root 14506039 8月  19 2017 sdkJson.log.COMPLETED
-rw-r--r--. 1 root root       34 11月  7 00:50 test.txt.COMPLETED

What Flume logs on its side:

18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-8
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-9
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-10

(The lines above are where startup left off; the lines below appeared after the new file was added.)


18/11/07 00:50:58 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
18/11/07 00:50:58 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /yangxiaohai/flumePath/dir/logdfs/test.txt to /yangxiaohai/flumePath/dir/logdfs/test.txt.COMPLETED
18/11/07 00:51:13 INFO file.EventQueueBackingStoreFile: Start checkpoint for /yangxiaohai/flumePath/dir/logdfstmp/point/checkpoint, elements to sync = 2
18/11/07 00:51:13 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1541522865621, queueSize: 0, queueHead: 43508
18/11/07 00:51:13 INFO file.Log: Updated checkpoint for file:/yangxiaohai/flumePath/dir/logdfstmp/log-12 position: 1109060 logWriteOrderID: 1541522865621

18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-8
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-8.meta
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-9
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-9.meta
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-10
18/11/07 00:51:13 INFO file.Log: Removing old file: /yangxiaohai/flumePath/dir/logdfstmp/log-10.meta
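A final caveat about feeding the spooling directory: the spooldir source expects files to be complete and immutable once they appear, and writing a file in place inside the directory can make the source fail. The safe pattern is to write the file elsewhere first and then mv it in, since a rename within one filesystem is atomic. A sketch (spool path from the config above; override SPOOL to try it elsewhere):

```shell
#!/bin/sh
set -e
# Spool directory from the Flume config; override SPOOL for testing.
SPOOL="${SPOOL:-/yangxiaohai/flumePath/dir/logdfs}"
STAGE="$(mktemp -d)"

# Write the file fully in a staging directory first...
printf 'hello flume\n' > "${STAGE}/test.txt"

# ...then move it into the spool directory in one step, so Flume never
# sees a half-written file. (mv is only atomic within one filesystem;
# across filesystems it degrades to copy+delete, so keep STAGE nearby.)
mkdir -p "$SPOOL"
mv "${STAGE}/test.txt" "${SPOOL}/test.txt"
```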