
Watching File Changes with Flume + Kafka on Windows 7


Installing Flume

Download the Flume distribution package

Extract the archive

A JDK is required, and JAVA_HOME must be set.
Set FLUME_HOME in the environment variables,
then add %FLUME_HOME%\conf;%FLUME_HOME%\bin to Path.
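
The environment setup above can be sanity-checked before Flume is started. The following is a minimal sketch, not part of the original post: `check_flume_env` is a hypothetical helper name, and it assumes Windows conventions (Path entries separated by `;`, case-insensitive paths).

```python
import os


def _norm(path):
    # Windows paths: treat / and \ alike, ignore trailing separators and case.
    return path.strip().replace("/", "\\").rstrip("\\").lower()


def check_flume_env(env):
    """Return a list of problems found in an environment mapping (hypothetical helper)."""
    problems = []
    for name in ("JAVA_HOME", "FLUME_HOME"):
        if not env.get(name):
            problems.append(f"{name} is not set")

    flume_home = env.get("FLUME_HOME")
    if flume_home:
        raw_path = env.get("Path") or env.get("PATH") or ""
        entries = {_norm(e) for e in raw_path.split(";") if e.strip()}
        for sub in ("conf", "bin"):
            if _norm(flume_home) + "\\" + sub not in entries:
                problems.append(f"Path is missing %FLUME_HOME%\\{sub}")
    return problems


if __name__ == "__main__":
    for line in check_flume_env(os.environ) or ["environment looks consistent"]:
        print(line)
```

An empty result only means the variables are set consistently with each other; it does not verify that the directories actually contain a JDK or a Flume installation.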

Edit the configuration files

Remove the .template suffix from flume-conf.properties.template, flume-env.ps1.template, and flume-env.sh.template.
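
The suffix removal can also be scripted. This is a small sketch under one stated assumption: it copies each template instead of renaming it, so the original templates stay available as a reference. `activate_templates` is a hypothetical name, and the conf directory is passed in by the caller.

```python
import shutil
from pathlib import Path

# The three template files named in the step above.
TEMPLATES = (
    "flume-conf.properties.template",
    "flume-env.ps1.template",
    "flume-env.sh.template",
)


def activate_templates(conf_dir):
    """Copy each template to the same name without '.template'; return the new paths."""
    created = []
    for name in TEMPLATES:
        src = Path(conf_dir) / name
        dst = src.with_suffix("")  # drops only the trailing ".template"
        # Never overwrite a file that has already been edited.
        if src.is_file() and not dst.exists():
            shutil.copyfile(src, dst)
            created.append(dst)
    return created
```

For the installation used in this post the call would look like `activate_templates("D:/softs/apache-flume-1.9.0-bin/conf")`. Running it a second time changes nothing, because existing target files are skipped.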
Then edit their contents as follows.
flume-env.sh

export JAVA_HOME=D:/softs/Java/jdk1.8.0_271

flume-env.ps1

$FLUME_CLASSPATH="D:/softs/apache-flume-1.9.0-bin/lib"

flume-conf.properties

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# a1.sources.r1.type = netcat
# a1.sources.r1.bind = localhost
# a1.sources.r1.port = 44444
a1.sources.r1.type = exec
a1.sources.r1.command = D:/softs/tail/tail.exe -f D:/home/logs/TestWeb.log
a1.sources.r1.fileHeader = true
a1.sources.r1.deserializer.outputCharset=UTF-8

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume-data
a1.sinks.k1.kafka.bootstrap.servers = 192.168.88.62:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
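
A common mistake in a configuration like this is a source or sink bound to a channel that was never declared. The sketch below checks for that. It is an illustration only: `parse_properties` and `check_bindings` are hypothetical helpers, and the parser handles only plain `key = value` lines with `#` comments, not the full Java properties syntax.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_bindings(props, agent="a1"):
    """Return problems with the source/sink channel bindings of one agent."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())

    # A source may feed several channels (property name: 'channels').
    for source in props.get(f"{agent}.sources", "").split():
        bound = set(props.get(f"{agent}.sources.{source}.channels", "").split())
        if not bound:
            problems.append(f"source {source} has no channels")
        elif not bound <= channels:
            problems.append(
                f"source {source} uses undeclared channels {sorted(bound - channels)}"
            )

    # A sink reads from exactly one channel (property name: 'channel').
    for sink in props.get(f"{agent}.sinks", "").split():
        bound = props.get(f"{agent}.sinks.{sink}.channel", "")
        if bound not in channels:
            problems.append(f"sink {sink} uses undeclared channel {bound!r}")
    return problems
```

Reading flume-conf.properties as text and passing it through `check_bindings(parse_properties(text))` should return an empty list for the configuration shown here, since r1 and k1 are both bound to c1.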

A Windows build of the tail utility is required here (tail tool download).
Change to the bin directory

Run Flume

flume-ng agent -c ../conf -f ../conf/flume-conf.properties -n a1 -property flume.root.logger=INFO,console

Once the agent has started successfully,
the messages can be read from the Kafka topic flume-data:

 bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.62:9092 --topic flume-data

You can write a message to the log file by hand:
Type Test Message, press Enter, and save the file.
Kafka consumer output

[[email protected] kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.62:9092 --topic flume-data
Test Message
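
The manual test can also be scripted, which makes it easy to repeat. The following is a sketch, assuming that the tail utility picks up appended lines the same way it picks up an editor save; `append_line` is a hypothetical helper name.

```python
from pathlib import Path


def append_line(log_path, message):
    """Append one newline-terminated line to the log file, creating it if needed."""
    path = Path(log_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="\n" keeps the line ending fixed regardless of platform.
    with path.open("a", encoding="utf-8", newline="\n") as fh:
        fh.write(message + "\n")
```

Calling `append_line("D:/home/logs/TestWeb.log", "Test Message")` while the agent and the console consumer are running should make the message appear in the consumer output, as in the manual test.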