An example of integrating Kafka with Flume
I. Overall steps:
1. Install Kafka and configure Flume. Create the Kafka topic (topics are managed through ZooKeeper, so ZooKeeper has to be installed first).
2. Place the files in Flume's source (spooling) directory and start Flume, which reads the files into the specified Kafka topic.
3. Start the Kafka consumer.
II. Detailed integration process:
1. This assumes Kafka and Flume are already installed; here we focus only on the integration itself.
2. Create the Kafka topic (with the ZooKeeper-based tooling of older Kafka releases, --zookeeper, --partitions and --replication-factor are also required; the values below are just examples):
[root@hadoop ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
Check that the topic was created:
[root@hadoop ~]# kafka-topics.sh --list --zookeeper localhost:2181
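To also verify the partition and replication settings, the standard describe command can be used (a sketch; the output layout depends on the Kafka version):
[root@hadoop ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic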
3. Configure Flume to read files into Kafka. In Flume's conf directory, create flume-dirToKafka.properties with content along the following lines:
[root@hadoop conf]# cat flume-dirToKafka.properties
#agent1 name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1

# Mind the permissions of the directories you create: chmod 777 -R (flumePath) and (dir)

#set sink1
# This decides where the collected data is delivered; here it is Kafka. If the target were HDFS, an HDFS sink would be configured instead.

#set channel1
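The property bodies of the source, sink and channel are not reproduced above, so the following is a minimal sketch of what such an agent typically looks like. The paths match the ones referenced later in this article (spoolDir=/flumePath/dir/logdfs, checkpointDir=/flumePath/dir/logdfstmp/point); the broker list is an assumption based on the hadoop12/hadoop13 hosts that appear in the logs, and the sink properties use the Flume 1.6-style names (brokerList, topic). On Flume 1.7+ the Kafka sink expects kafka.bootstrap.servers and kafka.topic instead.

#set source1: spooling directory source watching the input directory
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/flumePath/dir/logdfs
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader=false

#set sink1: Kafka sink writing to the topic created above
agent1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.topic=mytopic
agent1.sinks.sink1.brokerList=hadoop12:9092,hadoop13:9092
agent1.sinks.sink1.requiredAcks=1
agent1.sinks.sink1.batchSize=20
agent1.sinks.sink1.channel=channel1

#set channel1: file channel; these are the checkpoint/data dirs mentioned below
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/flumePath/dir/logdfstmp/point
agent1.channels.channel1.dataDirs=/flumePath/dir/logdfstmp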
4. Start Flume:
Note: agent1 must match the agent name set in the configuration file; otherwise startup just hangs, with a warning roughly like: No configuration found for this host: <wrong agent name>
[root@hadoop bin]# ./flume-ng agent --conf ../conf --conf-file ../conf/flume-dirToKafka.properties --name agent1 -Dflume.root.logger=INFO,console
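For a long-running setup it is common to run the agent in the background and capture its output in a file instead of the console. A simple variant of the same command (an assumption, not taken from the original article):
[root@hadoop bin]# nohup ./flume-ng agent --conf ../conf --conf-file ../conf/flume-dirToKafka.properties --name agent1 > flume-dirToKafka.log 2>&1 &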
Note: at this point you may hit an error like the following:
18/11/07 00:39:29 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
18/11/07 00:39:29 ERROR kafka.KafkaSink: Failed to publish events
java.lang.IllegalStateException: Empty value [channel=[channel=channel1]]
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.file.FlumeEventQueue.removeHead(FlumeEventQueue.java:160)
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doTake(FileChannel.java:512)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:97)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
18/11/07 00:39:29 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
Workaround:
Delete all files under the checkpointDir (=/flumePath/dir/logdfstmp/point) directory and start Flume again.
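For example (a sketch only; stop the agent first, and double-check that the path really is your channel's checkpointDir before deleting anything):
[root@hadoop ~]# rm -rf /flumePath/dir/logdfstmp/point/*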
After Flume starts successfully it prints output like the following: the files already sitting under spoolDir=/flumePath/dir/logdfs have been read into the topic, and the agent then stays there monitoring the directory. When new data is dropped into the monitored directory, additional log lines appear below this point:
18/11/07 00:47:20 INFO producer.SyncProducer: Connected to hadoop13:9092 for producing
18/11/07 00:47:20 INFO producer.SyncProducer: Disconnecting from hadoop13:9092
18/11/07 00:47:20 INFO producer.SyncProducer: Disconnecting from hadoop12:9092
18/11/07 00:47:20 INFO producer.SyncProducer: Connected to hadoop12:9092 for producing
18/11/07 00:47:43 INFO file.EventQueueBackingStoreFile: Start checkpoint for /yangxiaohai/flumePath/dir/logdfstmp/point/checkpoint, elements to sync = 26806
18/11/07 00:47:43 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1541522865614, queueSize: 0, queueHead: 43508
18/11/07 00:47:43 INFO file.Log: Updated checkpoint for file:/yangxiaohai/flumePath/dir/logdfstmp/log-12 position: 1108730 logWriteOrderID: 1541522865614
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-8
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-9
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-10
5. At this point Flume has read the files under the source directory into the Kafka topic mytopic. Now start a Kafka consumer, and the records are read out continuously, with results like the following:
Start the consumer on the topic created earlier (mytopic):
[root@hadoop ~]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
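The --zookeeper flag belongs to the old console consumer and has been removed in recent Kafka releases; on Kafka 0.10+/2.x the bootstrap-server form is used instead. A rough equivalent, assuming a broker listening on hadoop12:9092:
[root@hadoop ~]# kafka-console-consumer.sh --bootstrap-server hadoop12:9092 --topic mytopic --from-beginning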
A sample of the records read from the topic:
{"timestamp":"2017-02-11T10:49:43.043Z","url":"/original/index_6.html","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword":"","search_engine":"","country":"中國","area":"華北","province":"北京市","city":"北京市","use_duration_cat":"null","domain":"www.donews.com","to_domain":0,"use_interval_cat":"null","is_exit":0,"event":"startup","os":"Windows XP","os_type":"pc","browser":"Firefox","browser_version":"Firefox 9.0.1","suuid":"47ab648cb5c15bc8e1952efc16a037cb","short_cookie":"null","ip":"118.192.66.41","use_duration":0,"use_interval":0,"pv_cat":"null","event_name":[],"refer":"","hour":"10","gender":"null","age":0,"account_level":0,"payment_method":"","consumption_point":"","money":0.0,"account":"","zone_id":"","app_version":"","network":"null","nettype":"null","lang":"","app_upgrade_from":"","display":"null","device_type":"null","register_days":0,"refer_domain":"null","appkey":"donews_website_nginx_log","day":"2017-02-11"} {"timestamp":"2017-02-11T10:22:01.001Z","url":"/column/get_adv_bottom","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword":"","search_engine":"","country":"中國","area":"華東","province":"福建省","city":"福州市","use_duration_cat":"null","domain":"www.donews.com","to_domain":0,"use_interval_cat":"null","is_exit":0,"event":"jump","os":"Windows 7","os_type":"pc","browser":"Internet Explorer","browser_version":"Internet Explorer 7.0","suuid":"4f41eff515e7be6774749383270794e7","short_cookie":"null","ip":"112.5.236.153","use_duration":0,"use_interval":0,"pv_cat":"null","event_name":[],"refer":"http://www.donews.com/ent/index","hour":"10","gender":"null","age":0,"account_level":0,"payment_method":"","consumption_point":"","money":0.0,"account":"","zone_id":"","app_version":"","network":"null","nettype":"null","lang":"","app_upgrade_from":"","display":"null","device_type":"null","register_days":0,"refer_domain":"null","appkey":"donews_website_nginx_log","day":"2017-02-11"} {"timestamp":"2017-02-11T10:22:14.014Z","url":"/editor/person/34","is_entrance":0,"data_type":"null","channel":"null","to_target":0,"keyword^C [[email protected] ~]# ^C |
6. Now, when we put another file into Flume's spoolDir=/flumePath/dir/logdfs, Flume notices it and logs the activity. Here we copy in a file test.txt (the spooling directory source renames files it has finished ingesting with a .COMPLETED suffix):
[root@hadoop dir]# scp -r test.txt ./logdfs
[root@hadoop dir]# cd logdfs
[root@hadoop logdfs]# ll
-rw-r--r--. 1 root root 14506039 Aug 19  2017 sdkJson.log.COMPLETED
-rw-r--r--. 1 root root       34 Nov  7 00:50 test.txt.COMPLETED
What Flume logs on its side:
18/11/07 00:47:43 INFO file.LogFile: Closing RandomReader /yangxiaohai/flumePath/dir/logdfstmp/log-8
(the line above is where the output had stopped at startup; the lines that follow it are appended once the new file arrives)