Monitoring file changes with Flume + Kafka on Windows 7
阿新 · Posted 2020-12-10
Flume installation
Unzip the archive.
A JDK is required, with JAVA_HOME configured.
Add a FLUME_HOME environment variable,
and append %FLUME_HOME%\conf;%FLUME_HOME%\bin to Path.
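The variables above can also be set from an administrator command prompt (a sketch; the install paths are assumptions matching the ones used later in this post, and note that setx truncates values longer than 1024 characters, so long Path values are safer to edit in the System Properties dialog):

```bat
setx JAVA_HOME "D:\softs\Java\jdk1.8.0_271"
setx FLUME_HOME "D:\softs\apache-flume-1.9.0-bin"
:: setx does not update the current console, so %FLUME_HOME% would not expand
:: here yet; the directories are appended literally. Open a new console afterwards.
setx Path "%Path%;D:\softs\apache-flume-1.9.0-bin\conf;D:\softs\apache-flume-1.9.0-bin\bin"
```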
Modify the configuration files
Remove the .template suffix from flume-conf.properties.template, flume-env.ps1.template and flume-env.sh.template.
Contents to change, flume-env.sh:
export JAVA_HOME=D:/softs/Java/jdk1.8.0_271
flume-env.ps1
$FLUME_CLASSPATH="D:/softs/apache-flume-1.9.0-bin/lib"
flume-conf.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# a1.sources.r1.type = netcat
# a1.sources.r1.bind = localhost
# a1.sources.r1.port = 44444
a1.sources.r1.type = exec
a1.sources.r1.command = D:/softs/tail/tail.exe -f D:/home/logs/TestWeb.log
a1.sources.r1.fileHeader = true
a1.sources.r1.deserializer.outputCharset=UTF-8
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume-data
a1.sinks.k1.kafka.bootstrap.servers = 192.168.88.62:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
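With the sink above pointed at the topic flume-data, it can help to create that topic up front (a sketch, run on the Kafka host; assumes a single-broker setup and that automatic topic creation may be disabled):

```shell
# Assumed single-broker setup at 192.168.88.62, matching the sink config above.
bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.88.62:9092 \
  --topic flume-data \
  --partitions 1 --replication-factor 1
```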
This requires a Windows build of the tail utility (the original post links a tail tool download).
Go into the bin directory
Run Flume:
flume-ng agent -c ../conf -f ../conf/flume-conf.properties -n a1 -property flume.root.logger=INFO,console
After it starts successfully,
the consumed messages can be viewed on the Kafka topic flume-data:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.62:9092 --topic flume-data
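By default the console consumer only shows messages produced after it starts; to also replay earlier ones, add --from-beginning:

```shell
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.62:9092 \
  --topic flume-data --from-beginning
```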
Now write a message to the log file by hand:
Test Message (press Enter and save)
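Appending to the file from a script works the same way (a minimal sketch; LOG is a hypothetical stand-in for the D:/home/logs/TestWeb.log path in the exec-source config, and plain tail plays the role tail.exe -f plays on the Windows box):

```shell
# Hypothetical stand-in path; on the setup above it would be D:/home/logs/TestWeb.log.
LOG="./TestWeb.log"
echo "Test Message" >> "$LOG"
# tail -f would stream this line as it arrives; -n 1 just shows it once.
tail -n 1 "$LOG"   # → Test Message
```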
Kafka consumer output:
[[email protected] kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.62:9092 --topic flume-data
Test Message