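Kafka configuration: configuring Kafka with Flume
阿新 • Published: 2018-11-14
Reference files: https://files.cnblogs.com/files/han-guang-xue/kafka.zip
The detailed steps to achieve the effect shown in the figure are as follows: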
#han01.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /logs
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=han01
a1.sinks.k1.port=22222
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
#han02-1.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command = tail -F /logs/a.log
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=han01
a1.sinks.k1.port=22222
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
#han02-2.conf
b1.sources=r1
b1.channels=c1
b1.sinks=k1
b1.sources.r1.type=spooldir
b1.sources.r1.spoolDir = /logs
b1.sinks.k1.type=avro
b1.sinks.k1.hostname=han01
b1.sinks.k1.port=22222
b1.channels.c1.type=file
b1.channels.c1.checkpointDir=/home/uplooking/data/flume/checkpoint
b1.channels.c1.dataDirs = /home/uplooking/data/flume/data
b1.sources.r1.channels=c1
b1.sinks.k1.channel=c1
# han03.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Source configuration: listen for avro (the Flume source type)
a1.sources.r1.type = avro
a1.sources.r1.bind = han01
a1.sources.r1.port = 22222
# Sink into Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka topic
a1.sinks.k1.kafka.topic = haha1
# Kafka broker addresses and ports
a1.sinks.k1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092
# Number of events submitted per batch
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Channel configuration: a file channel buffers data on disk, which is safer
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
# Connect source r1 and sink k1 through channel c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
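Property keys embed the component name, so a one-character slip such as `ki` for `k1` means the setting is not applied to sink `k1`. The following is a minimal sketch for spotting that kind of mistake. `undeclared_sinks` is a hypothetical helper, not part of Flume, and it assumes the config file has one property per line.

```shell
#!/bin/sh
# Hypothetical helper: list sink names that appear in property keys
# ("<agent>.sinks.<name>....") but are missing from the "<agent>.sinks = ..." line.
# Assumes one property per line.
undeclared_sinks() {
    conf_file=$1
    agent=$2
    # Names declared on the "<agent>.sinks = k1 k2 ..." line.
    declared=$(sed -n "s/^$agent\.sinks[[:space:]]*=[[:space:]]*//p" "$conf_file")
    # Names actually used as the third key segment.
    sed -n "s/^$agent\.sinks\.\([^.]*\)\..*/\1/p" "$conf_file" | sort -u |
    while read -r name; do
        case " $declared " in
            *" $name "*) ;;          # declared, nothing to report
            *) echo "$name" ;;       # used in a key but never declared
        esac
    done
}

# Usage (the path is an assumption):
#   undeclared_sinks conf/han03.conf a1
```

An empty result means every sink key refers to a declared sink. The same pattern could be adapted for sources and channels.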
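Start Flume on the han03 machine first, then start the others, so that the avro source is already listening on port 22222 when the upstream avro sinks try to connect.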
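The start order can be enforced with a small retry loop. This is only a sketch under assumptions: `wait_until` is a hypothetical helper that reruns any probe command until it succeeds, and the usage comment assumes an `nc` build that supports the `-z` flag.

```shell
#!/bin/sh
# Hypothetical helper: run a probe command up to <tries> times,
# sleeping <delay> seconds between attempts.
# Returns 0 as soon as the probe succeeds, 1 if it never does.
wait_until() {
    tries=$1
    delay=$2
    shift 2
    while [ "$tries" -gt 0 ]; do
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        tries=$((tries - 1))
        sleep "$delay"
    done
    return 1
}

# Usage on an upstream machine (assumes nc with -z is installed):
#   wait_until 30 2 nc -z han01 22222 && echo "avro source is up, start the local agent"
```

Keeping the probe as an argument means the same loop works with whatever port-check tool is available on the machine.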
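Command to start Flume (point --conf-file at the config for the machine, and make --name match the agent prefix used in that file, for example b1 for han02-2.conf):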
bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console
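Because `--name` has to match the prefix used in the config file, it can help to read the name from the file instead of typing it. The sketch below is an assumption layered on the command above: `agent_name_of` and `flume_cmd` are hypothetical helpers, and `flume_cmd` only prints the command so it can be reviewed before running.

```shell
#!/bin/sh
# Hypothetical helper: print the agent name a Flume config declares,
# taken from its "<agent>.sources = ..." line.
agent_name_of() {
    sed -n 's/^\([A-Za-z0-9_-]*\)\.sources[[:space:]]*=.*/\1/p' "$1" | head -n 1
}

# Hypothetical helper: print (not run) the flume-ng command for a config file.
flume_cmd() {
    printf 'bin/flume-ng agent --conf conf --conf-file %s --name %s -Dflume.root.logger=INFO,console\n' \
        "$1" "$(agent_name_of "$1")"
}

# Usage (the path is an assumption):
#   flume_cmd conf/han02-2.conf
```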
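Then start a consumer (the command below uses brokers zhiyou01 to zhiyou03 and topic test3; to read what the han03.conf sink writes, substitute the han01 to han03 brokers and topic haha1):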
./bin/kafka-console-consumer.sh --bootstrap-server zhiyou01:9092,zhiyou02:9092,zhiyou03:9092 --from-beginning --topic test3
Command to create the topic:
./bin/kafka-topics.sh --create --zookeeper zhiyou01:2181,zhiyou02:2181,zhiyou03:2181 --replication-factor 2 --partitions 3 --topic test3
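To check the pipeline end to end, some test data has to reach the sources. A spooldir source expects files that are complete and no longer changing once they appear in the spool directory, so the sketch below writes the file elsewhere first and then moves it in. `spool_put` and the staging directory are assumptions for illustration; the staging directory should be on the same filesystem as the spool directory so the move is a rename.

```shell
#!/bin/sh
# Hypothetical helper: write stdin to a staging directory, then move the
# finished file into the spool directory so the source never sees a partial file.
spool_put() {
    staging_dir=$1
    spool_dir=$2
    file_name=$3
    cat > "$staging_dir/$file_name" &&
        mv "$staging_dir/$file_name" "$spool_dir/$file_name"
}

# Usage for the spooldir configs (the staging path is an assumption):
#   printf 'hello from spooldir\n' | spool_put /home/uplooking/staging /logs test-1.log
# Usage for the exec config, which tails /logs/a.log:
#   echo 'hello from exec' >> /logs/a.log
```

If the agents and Kafka are wired up as configured, the test lines should show up in a consumer subscribed to the sink's topic.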