flume + kafka log collection
Collect the log messages produced by the system with flume and push them to kafka for consumption and processing.
Architecture

| Service | IP | Port | Remarks |
| --- | --- | --- | --- |
| flume collector | 10.200.132.181 | 6333 | flume collector |
| flume agent | 10.200.132.168 | | flume collection agent (currently one agent) |
| kafka | 10.200.132.181 | 9092, 2181 | kafka and zookeeper |
One flume agent is deployed per machine; to collect the logs of multiple services, you can configure multiple sources in the agent's configuration file, as in the sketch below.
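A minimal sketch of one agent tailing two service logs at once; the source names and log paths here are hypothetical, and the channel is assumed to be the memoryChannel defined later in this article:

```
# hypothetical: one agent collecting the logs of two services
agent.sources = svc1Source svc2Source
agent.sources.svc1Source.type = exec
agent.sources.svc1Source.command = tail -F /opt/svc1/logs/svc1.log
agent.sources.svc1Source.channels = memoryChannel
agent.sources.svc2Source.type = exec
agent.sources.svc2Source.command = tail -F /opt/svc2/logs/svc2.log
agent.sources.svc2Source.channels = memoryChannel
```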
This article mainly covers installing flume and implementing this log collection pipeline.
I. Installation and deployment
1. Download and install flume
```
cd /opt
tar -zxvf apache-flume-1.8.0-bin.tar.gz
```
2. Configuration on 10.200.132.181
Create flume-collecters.properties, which receives the logs collected by the flume agent and pushes them to kafka:
```
cd /opt/apache-flume-1.8.0-bin/conf
vim flume-collecters.properties
```

```
# flume collector
agent.sources = s1Flume
agent.channels = c1
agent.sinks = sinkKafka

# For each one of the sources, the type is defined
agent.sources.s1Flume.type = avro
# flume collector ip
agent.sources.s1Flume.bind = 10.200.132.181
# flume collector port
agent.sources.s1Flume.port = 6333
# The channel can be defined as follows.
agent.sources.s1Flume.channels = c1

# Each sink's type must be defined
agent.sinks.sinkKafka.type = org.apache.flume.sink.kafka.KafkaSink
# kafka topic name
agent.sinks.sinkKafka.topic = topic-pear
# kafka ip:port
agent.sinks.sinkKafka.brokerList = 10.200.132.181:9092
agent.sinks.sinkKafka.requiredAcks = 1
agent.sinks.sinkKafka.batchSize = 20
# Specify the channel the sink should use
agent.sinks.sinkKafka.channel = c1

# Each channel's type is defined.
agent.channels.c1.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.c1.capacity = 100
```
Start flume:
```
cd /opt/apache-flume-1.8.0-bin
bin/flume-ng agent -c conf -f conf/flume-collecters.properties -n agent -Dflume.root.logger=INFO,console,LOGFILE &
```
Check that port 6333 is now listening.
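For example, with net-tools installed:

```
netstat -ntlp | grep 6333
```

A LISTEN entry for the flume java process confirms the collector is up.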
3. flume configuration on 10.200.132.168
```
cd /opt/apache-flume-1.8.0-bin/conf
vim flume-test-collect.properties
```

```
agent.sources = fileSource
agent.channels = memoryChannel
agent.sinks = collecter1

agent.sinkgroups = gCollecters
agent.sinkgroups.gCollecters.sinks = collecter1
# sink processor type: load_balance or failover
agent.sinkgroups.gCollecters.processor.type = failover
# load-balancing selector: random or round_robin
# (note: the selector/backoff settings below take effect with the
# load_balance processor type, not failover)
agent.sinkgroups.gCollecters.processor.selector = round_robin
# back off failed sinks
agent.sinkgroups.gCollecters.processor.backoff = true
# backoff timeout: 30 seconds
agent.sinkgroups.gCollecters.processor.maxTimeOut = 30000

agent.sources.fileSource.type = exec
# the log file to watch
agent.sources.fileSource.command = tail -F /opt/test/logs/test.log
#agent.sources.fileSource.charset = utf-8
agent.sources.fileSource.channels = memoryChannel
agent.sources.fileSource.restartThrottle = 10000
agent.sources.fileSource.restart = true
agent.sources.fileSource.logStdErr = true

# Each sink's type must be defined
agent.sinks.collecter1.channel = memoryChannel
agent.sinks.collecter1.type = avro
# flume collector ip
agent.sinks.collecter1.hostname = 10.200.132.181
# flume collector port
agent.sinks.collecter1.port = 6333
agent.sinks.collecter1.batch-size = 10

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# The max number of events stored in the channel
agent.channels.memoryChannel.capacity = 100
# The max number of events stored in the channel per transaction
agent.channels.memoryChannel.transactionCapacity = 100
# Timeout in seconds for adding or removing an event
agent.channels.memoryChannel.keep-alive = 30
```
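Note that the sink group above contains a single sink, so the failover processor has nothing to fail over to. If a second collector existed, it could be added roughly like this (the 10.200.132.182 address and the priority values are hypothetical):

```
# hypothetical second collector for real failover
agent.sinks = collecter1 collecter2
agent.sinkgroups.gCollecters.sinks = collecter1 collecter2
# the sink with the higher priority is tried first
agent.sinkgroups.gCollecters.processor.priority.collecter1 = 10
agent.sinkgroups.gCollecters.processor.priority.collecter2 = 5
agent.sinks.collecter2.channel = memoryChannel
agent.sinks.collecter2.type = avro
agent.sinks.collecter2.hostname = 10.200.132.182
agent.sinks.collecter2.port = 6333
```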
Create the log directory (if it does not already exist):
```
mkdir -p /opt/test/logs/
```
Start the service:
```
cd /opt/apache-flume-1.8.0-bin
bin/flume-ng agent -c conf -f conf/flume-test-collect.properties -n agent -Dflume.root.logger=INFO,console,LOGFILE
```
Deployment and configuration are now essentially complete.
II. Verification
Log in to 10.200.132.181 and run a kafka console consumer:
```
/opt/kafka_2.12-2.0.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-pear --from-beginning
```
Log in to 10.200.132.168 and write a line to the log file:
```
echo "hello world" >> /opt/test/logs/test.log
```
A few seconds after the write, the message shows up in the kafka consumer's output.
You can also write a Spring Boot application that produces logs and point the flume agent's watched log file at its output; application logs are then pushed to kafka through flume in near real time. A minimal sketch of such a producer follows.
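This is a plain-Java stand-in for such a log producer (a real Spring Boot app would normally write through its logging framework instead); it appends timestamped lines to the file the agent tails:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;

public class LogProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // the file watched by the flume agent's exec source
        Path logFile = Paths.get("/opt/test/logs/test.log");
        for (int i = 0; i < 10; i++) {
            String line = LocalDateTime.now() + " INFO demo log line " + i + System.lineSeparator();
            // append to the log file, creating it if it does not exist
            Files.write(logFile, line.getBytes(),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            Thread.sleep(1000); // one line per second
        }
    }
}
```

Each appended line should appear in the kafka consumer a moment later, just like the echo test above.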