Big Data Project: Telecom Customer Service (Part 3)
阿新 • Published 2018-12-16
1. Start the Kafka cluster
Setting up the Kafka cluster itself is not covered again here; if you are unsure how, see my earlier posts.
First start the ZooKeeper cluster, then start the Kafka cluster:
bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties
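If you prefer to keep the shell free, the stock Kafka start script accepts a -daemon flag that detaches the broker process; jps then gives a quick check that both daemons are up on each node (QuorumPeerMain for ZooKeeper, Kafka for the broker). A minimal variant:
# run on each node; -daemon detaches the broker process
bin/kafka-server-start.sh -daemon config/server.properties
# confirm both daemons are running
jps | grep -E 'QuorumPeerMain|Kafka'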
2. Create a Kafka topic
bin/kafka-topics.sh --zookeeper cdh0:2181 --create --replication-factor 3 --partitions 3 --topic ctlog
3. Check whether the Kafka topic was created successfully
bin/kafka-topics.sh --zookeeper cdh0:2181 --list
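--list only shows topic names; --describe additionally prints the leader, replicas, and ISR for each of the three partitions, which is a quick way to confirm replication is healthy:
bin/kafka-topics.sh --zookeeper cdh0:2181 --describe --topic ctlog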
4. Start a Kafka consumer to wait for the data coming in from Flume
bin/kafka-console-consumer.sh --bootstrap-server cdh0:9092 --topic ctlog --from-beginning
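Before wiring up Flume, you can sanity-check the topic end to end: open a console producer in another terminal, type a few lines, and confirm they appear in the consumer above.
bin/kafka-console-producer.sh --broker-list cdh0:9092 --topic ctlog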
5. Configure Flume
Create ct_log.conf:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/package/log.csv
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = cdh0:9092,cdh1:9092,cdh2:9092
a1.sinks.k1.topic = ctlog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
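The exec source simply runs the configured command and turns each line it prints into an event. tail -F -c +0 dumps the file from the beginning and then follows new appends (re-opening the file if it is rotated), so you can test the source's behavior on its own before handing it to Flume:
tail -F -c +0 /opt/package/log.csv
Note that because the whole file is replayed, restarting the agent will resend everything already in log.csv, and the memory channel means buffered events are lost if the agent dies.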
6. Run Flume
bin/flume-ng agent --conf conf/ --name a1 --conf-file testjob/ct_log.conf
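If the agent starts but nothing reaches Kafka, a standard Flume debugging step is to run it in the foreground with console logging:
bin/flume-ng agent --conf conf/ --name a1 --conf-file testjob/ct_log.conf -Dflume.root.logger=INFO,console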
At this point the pipeline is essentially complete: the data-generation code from the earlier posts keeps producing records, Flume tails the file they are written to and forwards each line to Kafka, and the topic is consumed from there.
You should now see the records printed in the Kafka console consumer.
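As a final end-to-end check that is independent of the generator, append a test line to the monitored file yourself; the field format does not matter for this check, and the line should show up in the consumer almost immediately:
echo "test-record" >> /opt/package/log.csv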