Zookeeper +Kafka +Flume的整合
阿新 • • 發佈:2018-12-22
1.JDK的安裝
2.安裝Zookeeper
3.安裝Kafka
4.安裝Flume
5.配置Flume
在/root/apps/apache-flume-1.6.0-bin/conf 目錄下建立flume-kafka.conf檔案
vi flume-kafka.conf
##主要作用是監聽目錄中的新增資料,採集到資料之後,輸出到kafka
##主要作用是監聽目錄中的新增資料,採集到資料之後,輸出到kafka ## 注意:Flume agent的執行,主要就是配置source channel sink ## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #具體定義source a1.sources.r1.type = spooldir #先建立此目錄,保證裡面空的 a1.sources.r1.spoolDir = /root/flumeKafkalog #sink到kafka裡面 a1.sinks.k1.channel = c1 a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink #設定Kafka的Topic a1.sinks.k1.topic = test3 #設定Kafka的broker地址和埠號 a1.sinks.k1.brokerList = root1:9092,min2:9092 #配置批量提交的數量 a1.sinks.k1.flumeBatchSize = 20 a1.sinks.k1.producer.acks = 1 a1.sinks.k1.producer.linger.ms = 1 a1.sinks.ki.producer.compression.type= snappy #對於channel的配置描述 使用檔案做資料的臨時快取 這種的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #通過channel c1將source r1和sink k1關聯起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
6.啟動flume
./bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console
注意:Flume是日誌採集,相當於生產者。
6.在kafka檢視消費者位置資訊
./bin/kafka-console-consumer.sh -zookeeper root1:2181,min2:2181 --from-beginning --topic test3
如果 下有檔案,檢視消費者訊息資訊會顯現目錄下所有資料。如果沒有,資料,將檔案轉移到/root/flumeKafkalog,在時間不超時的情況下,會顯示資料。 下有檔案,檢視消費者訊息資訊會顯現目錄下所有資料。如果沒有,資料,將檔案轉移到/根/ flumeKafkalog,在時間不超時的情況下,會顯示資料。
注意:Flume是日誌採集,相當於生產者。