1. 程式人生 > >Zookeeper +Kafka +Flume的整合

Zookeeper +Kafka +Flume的整合

1.JDK的安裝

2.安裝Zookeeper

3.安裝Kafka

4.安裝Flume

5.配置Flume

在/root/apps/apache-flume-1.6.0-bin/conf 目錄下建立flume-kafka.conf檔案

vi flume-kafka.conf

##主要作用是監聽目錄中的新增資料,採集到資料之後,輸出到kafka

##主要作用是監聽目錄中的新增資料,採集到資料之後,輸出到kafka
##    注意:Flume agent的執行,主要就是配置source channel sink
##  下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#具體定義source
a1.sources.r1.type = spooldir
#先建立此目錄,保證裡面空的
a1.sources.r1.spoolDir = /root/flumeKafkalog
#sink到kafka裡面
a1.sinks.k1.channel = c1
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
#設定Kafka的Topic
a1.sinks.k1.topic = test3
#設定Kafka的broker地址和埠號
a1.sinks.k1.brokerList = root1:9092,min2:9092
#配置批量提交的數量
a1.sinks.k1.flumeBatchSize = 20
a1.sinks.k1.producer.acks = 1
a1.sinks.k1.producer.linger.ms = 1
a1.sinks.ki.producer.compression.type= snappy

#對於channel的配置描述 使用檔案做資料的臨時快取 這種的安全性要高
a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通過channel c1將source r1和sink k1關聯起來
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6.啟動flume

 

./bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console

注意:Flume是日誌採集,相當於生產者。

6.在kafka檢視消費者位置資訊

./bin/kafka-console-consumer.sh -zookeeper root1:2181,min2:2181 --from-beginning --topic test3

 

如果   
  下有檔案,檢視消費者訊息資訊會顯現目錄下所有資料。如果沒有,資料,將檔案轉移到/root/flumeKafkalog,在時間不超時的情況下,會顯示資料。  下有檔案,檢視消費者訊息資訊會顯現目錄下所有資料。如果沒有,資料,將檔案轉移到/根/ flumeKafkalog,在時間不超時的情況下,會顯示資料。

注意:Flume是日誌採集,相當於生產者。