Flume-Kafka integration: real-time log collection
阿新 • Published: 2019-02-02
Flume architecture
exec-source + memory-channel + avro-sink
avro-source + memory-channel + kafka-sink
Kafka architecture
Start ZooKeeper
zkServer.sh start
Start Kafka (start one "basket", i.e. one broker)
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties &
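By default the broker creates topics automatically (auto.create.topics.enable=true), which is why the steps here never create firstTopic explicitly. If that setting is turned off, the topic would have to exist before the consumer and the Flume sink use it. A minimal sketch, assuming a Kafka release whose kafka-topics.sh still accepts --zookeeper (the flag was removed in Kafka 3.0); the create_topic function, the KAFKA_TOPICS override variable, and the partition and replication values are illustrative assumptions, not part of the original setup:

```shell
# Assumption: kafka-topics.sh is on PATH; override KAFKA_TOPICS to point elsewhere.
KAFKA_TOPICS=${KAFKA_TOPICS:-kafka-topics.sh}

# Hypothetical helper: create a single-partition, unreplicated topic.
# $1 = ZooKeeper address (host:port), $2 = topic name
create_topic() {
    "$KAFKA_TOPICS" --create \
        --zookeeper "$1" \
        --replication-factor 1 \
        --partitions 1 \
        --topic "$2"
}

# Example call for the setup in this post (commented out so that sourcing
# the snippet has no side effects):
# create_topic 192.168.145.128:2181 firstTopic
```

One partition and a replication factor of 1 are only reasonable for a single-broker test like this one.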
Simulation
Start a Kafka consumer that listens for the log data on the topic
kafka-console-consumer.sh --zookeeper 192.168.145.128:2181 --from-beginning --topic firstTopic
Append data to the log file
echo hi,flume-kafka framework >> flume-kafka.txt
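A single echo only produces one event. To watch several events flow through both agents, the same append can be repeated in a loop. A minimal sketch; the append_test_lines function and its arguments are hypothetical names introduced here for illustration:

```shell
# Hypothetical helper: append COUNT numbered lines to FILE so that the
# `tail -F` command in the exec source has new data to pick up.
# $1 = file, $2 = number of lines, $3 = optional pause in seconds (default 0)
append_test_lines() {
    file=$1
    count=$2
    delay=${3:-0}
    i=1
    while [ "$i" -le "$count" ]; do
        echo "hi,flume-kafka framework $i" >> "$file"
        if [ "$delay" -gt 0 ]; then
            sleep "$delay"
        fi
        i=$((i + 1))
    done
}

# Example for the file tailed by the exec source (commented out so that
# sourcing the snippet has no side effects):
# append_test_lines /root/data/flume-kafka.txt 5 1
```

Because the Kafka sink below is configured with batchSize = 3, appending several lines makes it easier to see events arrive at the consumer.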
exec-memory-avro.conf (start command first, then the file contents)
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
# example exec-memory-avro
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /root/data/flume-kafka.txt
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = 192.168.145.128
exec-memory-avro.sinks.avro-sink.port = 44444
# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
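A stray space inside a property key (for example in the middle of a component name) is easy to miss in a file like this, and the property it belongs to then never reaches the intended component. A minimal sketch of a sanity check that could be run before starting an agent; check_flume_conf is a hypothetical helper written for this post, not a Flume tool, and it only looks at simple key = value lines:

```shell
# Hypothetical lint helper: report every property line whose key contains
# whitespace or does not start with "<agent>.". Exit status is non-zero if
# at least one such line is found.
# $1 = agent name, $2 = path to the .conf file
check_flume_conf() {
    awk -v agent="$1" '
        /^[ \t]*#/ { next }                 # skip comment lines
        /^[ \t]*$/ { next }                 # skip blank lines
        {
            eq = index($0, "=")
            if (eq == 0) {
                print FILENAME ":" NR ": no equals sign: " $0
                bad = 1
                next
            }
            key = substr($0, 1, eq - 1)
            gsub(/^[ \t]+|[ \t]+$/, "", key)  # trim spaces around the key
            if (key ~ /[ \t]/ || index(key, agent ".") != 1) {
                print FILENAME ":" NR ": suspicious key: " key
                bad = 1
            }
        }
        END { exit (bad + 0) }
    ' "$2"
}

# Example (commented out so that sourcing the snippet has no side effects):
# check_flume_conf exec-memory-avro "$FLUME_HOME/conf/exec-memory-avro.conf"
```

The check is deliberately narrow: it does not validate property names or values against what Flume actually accepts.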
avro-memory-kafka.conf (start command first, then the file contents)
flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
# example avro-memory-kafka
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 192.168.145.128
avro-memory-kafka.sources.avro-source.port = 44444
# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = 192.168.145.128:9092
avro-memory-kafka.sinks.kafka-sink.topic = firstTopic
avro-memory-kafka.sinks.kafka-sink.batchSize = 3
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
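The two start commands differ only in the agent name, which is also the base name of the config file. A minimal sketch of a wrapper that captures this; start_agent and the FLUME_NG override variable are hypothetical names, and FLUME_HOME is assumed to be set as in the commands above:

```shell
# Assumption: flume-ng is on PATH; override FLUME_NG to point elsewhere.
FLUME_NG=${FLUME_NG:-flume-ng}

# Hypothetical helper: start the agent named $1 using $FLUME_HOME/conf/$1.conf.
start_agent() {
    name=$1
    "$FLUME_NG" agent \
        --name "$name" \
        --conf "$FLUME_HOME/conf" \
        --conf-file "$FLUME_HOME/conf/$name.conf" \
        -Dflume.root.logger=INFO,console
}

# Suggested order, each in its own terminal (commented out so that sourcing
# the snippet has no side effects):
# start_agent avro-memory-kafka   # downstream agent: Avro source on port 44444
# start_agent exec-memory-avro    # upstream agent: Avro sink connects to 44444
```

Starting avro-memory-kafka first means its Avro source is already listening when the Avro sink of exec-memory-avro tries to connect; in the opposite order the upstream agent would likely log connection failures until the downstream agent is up.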