用flume將Kafka資料同步到另一個kafka叢集
阿新 • • 發佈:2022-05-26
Flume
安裝地址
- Flume官網地址:http://flume.apache.org/
- 文件檢視地址:http://flume.apache.org/FlumeUserGuide.html
- 下載地址:http://archive.apache.org/dist/flume/
安裝部署
將下載的軟體包上傳解壓
# tar -zxvf /opt/soft/apache-flume-1.9.0-bin.tar.gz -C /opt/
# ln -s /opt/apache-flume-1.9.0-bin /opt/flume
檢視安裝的版本
# cd /opt/flume/bin/ # flume-ng version Flume1.9.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: d4fcab4f501d41597bc616921329a4339f73585e Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018 From source with checksum 35db629a3bda49d23e9b3690c80737f9
修改 conf 目錄下的 log4j.properties
配置檔案,配置日誌檔案路徑
# vim conf/log4j.properties flume.log.dir=/data/flume/log
修改conf目錄下的flume-env.sh,配置JAVA_HOME
# vim flume-env.sh
export JAVA_HOME=/opt/jdk
建立配置檔案flume-conf-kafka.properties
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers= 源端KAFKA:9092 a1.sources.r1.kafka.topics=kitchen a1.sources.r1.kafka.groupId = flume a1.sources.r1.kafka.consumer.timeout.ms = 100 # Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers=目端KAFKA:9092 a1.sinks.k1.kafka.topic=kitchen #a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.kafka.producer.acks=1 a1.sinks.k1.custom.encoding=UTF-8 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 1000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
啟動agent
/opt/flume/bin/flume-ng agent -n a1 -c /opt/flume/conf/ -f /opt/flume/conf/flume-conf-kafka.properties -Dflume.root.logger=INFO,console
看到Component type: SINK, name: k1 started 即為啟動成功
2022-05-26 19:33:56,247 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109)] Kafka version : 2.0.1 2022-05-26 19:33:56,247 (lifecycleSupervisor-1-1) [INFO - org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110)] Kafka commitId : fa14705e51bd2ce5 2022-05-26 19:33:56,248 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean. 2022-05-26 19:33:56,248 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
測試
#消費訊息
# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic kitchen --from-beginning
#生產訊息
/opt/kafka/bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic init-test
檢視flume日誌,有新的Update資訊
2022-05-26 19:35:09,295 (kafka-producer-network-thread | producer-1) [INFO - org.apache.kafka.clients.Metadata.update(Metadata.java:285)] Cluster ID: 5zfnXAaMT4iGG48TrmsiLQ
參考
https://blog.csdn.net/huxili2020/article/details/120391578?spm=1001.2101.3001.6661.1&utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-120391578-blog-115274144.pc_relevant_antiscanv2&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-120391578-blog-115274144.pc_relevant_antiscanv2&utm_relevant_index=1
https://blog.csdn.net/qq_47183158/article/details/112179052