Integrating Flume and Kafka
Technology choices:
Agent a1: exec source + memory channel + avro sink
Agent a2: avro source + memory channel + kafka sink
name:exec-memory-avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/zoujc/data
a1.sources.r1.shell = /bin/sh -c

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = zoujc01
a1.sinks.k1.port = 44444

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
name:avro-memory-kafka.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = zoujc01
a2.sources.r1.port = 44444

a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.brokerList = zoujc01:9092
a2.sinks.k1.topic = hello_topic1
# batchSize: number of events sent to Kafka per batch; this affects how much data the consumer receives at a time
a2.sinks.k1.batchSize = 5
a2.sinks.k1.requiredAcks = 1

a2.channels.c1.type = memory

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
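The two agents only connect if the avro sink of a1 points at the same host and port that the avro source of a2 binds to (zoujc01:44444 above). A minimal sketch of a sanity check for this, assuming the two files use the property names shown above; get_prop and check_avro_link are hypothetical helper names, not part of Flume:

```shell
# Hypothetical helper: print the value of one key from a Flume properties file.
# $1 = file, $2 = key. Splits on the first '=' and trims surrounding blanks.
get_prop() {
  awk -F'=' -v key="$2" '
    { k = $1; gsub(/^[ \t]+|[ \t]+$/, "", k) }
    k == key {
      v = substr($0, index($0, "=") + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", v)
      print v
      exit
    }
  ' "$1"
}

# Hypothetical helper: compare the avro sink of a1 with the avro source of a2.
# $1 = exec-memory-avro.conf, $2 = avro-memory-kafka.conf
check_avro_link() {
  sink="$(get_prop "$1" a1.sinks.k1.hostname):$(get_prop "$1" a1.sinks.k1.port)"
  src="$(get_prop "$2" a2.sources.r1.bind):$(get_prop "$2" a2.sources.r1.port)"
  if [ "$sink" = "$src" ]; then
    echo "OK $sink"
  else
    echo "MISMATCH sink=$sink source=$src"
    return 1
  fi
}
```

It could be called as check_avro_link $FLUME_HOME/conf/exec-memory-avro.conf $FLUME_HOME/conf/avro-memory-kafka.conf before starting either agent.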
Startup
Start ZooKeeper
First start the agent defined in avro-memory-kafka.conf (a2), so that its avro source is already listening when a1 connects
flume-ng agent --name a2 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf -Dflume.root.logger=INFO,console
Then start the agent defined in exec-memory-avro.conf (a1)
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
Start the Kafka console producer and consumer
kafka-console-producer.sh --broker-list zoujc01:9092 --topic hello_topic1
kafka-console-consumer.sh --zookeeper zoujc01:2181 --topic hello_topic1
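One way to exercise the pipeline end to end is to append lines to the file that a1 tails and watch for them in the console consumer. A minimal sketch, assuming a hypothetical helper named append_test_lines (it is not part of Flume or Kafka, and the line text is made up for illustration):

```shell
# Hypothetical helper: append numbered test lines to a file.
# $1 = target file, $2 = number of lines to append
append_test_lines() {
  file="$1"
  count="$2"
  i=1
  while [ "$i" -le "$count" ]; do
    # Each line is appended, so "tail -F" in the exec source picks it up
    echo "flume-kafka test line $i" >> "$file"
    i=$((i + 1))
  done
}
```

For example, append_test_lines /root/zoujc/data 10 writes ten lines to the file named in exec-memory-avro.conf. If both agents are running, these lines should then show up in the consumer for hello_topic1.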