
Configuring Kafka, and Kafka with Flume

Reference files:  https://files.cnblogs.com/files/han-guang-xue/kafka.zip

The detailed steps to implement the setup shown in the figure are as follows:

# han01.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /logs

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = han01
a1.sinks.k1.port = 22222

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# han02-1.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /logs/a.log

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = han01
a1.sinks.k1.port = 22222

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# han02-2.conf

b1.sources = r1
b1.channels = c1
b1.sinks = k1

b1.sources.r1.type = spooldir
b1.sources.r1.spoolDir = /logs

b1.sinks.k1.type = avro
b1.sinks.k1.hostname = han01
b1.sinks.k1.port = 22222

b1.channels.c1.type = file
b1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
b1.channels.c1.dataDirs = /home/uplooking/data/flume/data

b1.sources.r1.channels = c1
b1.sinks.k1.channel = c1
# han03.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Source configuration: listen with an avro source (Flume's avro type)

a1.sources.r1.type = avro

a1.sources.r1.bind = han01

a1.sources.r1.port = 22222

 

# Sink the events into Kafka

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# Kafka topic to write to

a1.sinks.k1.kafka.topic = haha1

# Kafka broker addresses and ports

a1.sinks.k1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092

# Number of events to batch per producer request

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

 

# Channel configuration: use a file channel to buffer the data; this is safer than a memory channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint

a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

 

# Wire source r1 and sink k1 together through channel c1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

 

Start Flume on the han03 machine first, then start the others.

Command to start Flume:

bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf  --name a1  -Dflume.root.logger=INFO,console
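Applied to each agent, the startup sequence looks like the sketch below. The config filenames follow the headings above, and per the text the collector agent (the one with the Kafka sink) starts first; adjust paths and hostnames to your cluster. Note that han02-2.conf names its agent b1, so --name must match.

```shell
# On the collector machine (Kafka sink agent) -- start this first
bin/flume-ng agent --conf conf --conf-file conf/han03.conf --name a1 -Dflume.root.logger=INFO,console

# Then on han01 (spooldir source)
bin/flume-ng agent --conf conf --conf-file conf/han01.conf --name a1 -Dflume.root.logger=INFO,console

# Then on han02 -- two agents; han02-2.conf uses agent name b1, not a1
bin/flume-ng agent --conf conf --conf-file conf/han02-1.conf --name a1 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/han02-2.conf --name b1 -Dflume.root.logger=INFO,console
```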

 

Then start a console consumer:

./bin/kafka-console-consumer.sh --bootstrap-server han01:9092,han02:9092,han03:9092 --from-beginning --topic haha1
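A quick end-to-end check (assuming the /logs spooling directory from the configs above): drop a finished file into /logs on a machine running a spooldir source, and the line should appear in the console consumer.

```shell
# Write the file elsewhere first, then move it into /logs,
# so the spooldir source only sees a completed file
echo "hello kafka via flume" > /tmp/test.log
mv /tmp/test.log /logs/test.log
```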

 

Command to create the topic:

./bin/kafka-topics.sh --create --zookeeper han01:2181,han02:2181,han03:2181 --replication-factor 2 --partitions 3 --topic haha1
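After creating the topic, you can confirm its partition and replica assignment with --describe (same ZooKeeper quorum as above; the topic name follows the Kafka sink configuration):

```shell
./bin/kafka-topics.sh --describe --zookeeper han01:2181,han02:2181,han03:2181 --topic haha1
```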