Flume: tailing a file and consuming its contents in Kafka
Posted by 阿新 on 2018-12-24
1 ZooKeeper installation and configuration
1.1 ZooKeeper configuration (installation steps omitted)
1.1.1 After installation, change into the conf directory under the ZooKeeper install directory
[hadoop@mini01 ~]$ cd /home/hadoop/install/zookeeper/conf
[hadoop@mini01 conf]$
1.1.2 Rename zoo_sample.cfg to zoo.cfg
[hadoop@mini01 conf]$ mv zoo_sample.cfg zoo.cfg
1.1.3 Edit the zoo.cfg configuration file
[hadoop@mini01 conf]$ vi zoo.cfg
# example sakes.
# Set the ZooKeeper data directory
dataDir=/home/hadoop/install/zookeeper/data
..............
# Ensemble member list. The first port (2888) is used by followers to
# connect to the leader; the second (3888) is used for leader election
server.1=192.168.13.128:2888:3888
server.2=192.168.13.129:2888:3888
server.3=192.168.13.131:2888:3888
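The stock zoo_sample.cfg also carries the client port that the Kafka brokers and the console consumer connect to later (2181, matching the mini01:2181 addresses below); it is worth confirming it is still present after editing:

```
# the port at which the clients will connect
clientPort=2181
```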
1.1.4 Create the ZooKeeper data directory and write this server's id into the myid file (the myid file must be created manually)
[hadoop@mini01 conf]$ cd ../
[hadoop@mini01 zookeeper]$ mkdir data
[hadoop@mini01 zookeeper]$ echo 1 >> data/myid
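Each server must hold the id that matches its own server.N line in zoo.cfg, so repeating `echo 1` verbatim on every host would break the quorum. A minimal sketch of the per-host step (the hostname-to-id mapping follows the server.1..3 lines above; the default path is a demo assumption so the sketch can run anywhere):

```shell
# Write the myid matching this host's server.N entry in zoo.cfg.
# ZK_DATA defaults to a demo path here; on the cluster it would be
# /home/hadoop/install/zookeeper/data.
ZK_DATA=${ZK_DATA:-/tmp/zk-demo/data}
mkdir -p "$ZK_DATA"
case "$(hostname)" in
  mini01) id=1 ;;
  mini02) id=2 ;;
  mini03) id=3 ;;
  *)      id=1 ;;   # fallback so the sketch also runs off-cluster
esac
echo "$id" > "$ZK_DATA/myid"
cat "$ZK_DATA/myid"
```

After xsync in the next step copies the whole directory (including data/myid), this is exactly the file that must be re-edited on mini02 and mini03.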
1.2 Sync the zookeeper directory to the other cluster nodes (all files are synced)
[hadoop@mini01 zookeeper]$ cd ../
[hadoop@mini01 install]$ xsync zookeeper/
xsync is a distribution script; its contents are described at
https://blog.csdn.net/huoliangwu/article/details/84591893
1.3 Start ZooKeeper on every node and check its status
[hadoop@mini01 zookeeper]$ ./bin/zkServer.sh start
[hadoop@mini01 zookeeper]$ ./bin/zkServer.sh status
Next time I will put together a script that starts the whole ZooKeeper cluster.
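Until that script exists, a wrapper could look like the sketch below (assumptions: passwordless ssh from mini01 to every node as user hadoop, and the same install path everywhere; `SSH=echo` previews the commands without connecting):

```shell
# zk-cluster.sh -- run zkServer.sh <start|stop|status> on every node.
zk_cluster() {
  local cmd=${1:-status}
  local ssh_cmd=${SSH:-ssh}   # set SSH=echo for a dry run
  local host
  for host in mini01 mini02 mini03; do
    echo "---- $host ----"
    $ssh_cmd "$host" "/home/hadoop/install/zookeeper/bin/zkServer.sh $cmd"
  done
}

# Dry run: print the command that would be executed on each node
SSH=echo zk_cluster start
```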
2 Kafka installation and configuration
2.1 Kafka configuration (installation steps omitted)
2.1.1 Create a logs directory under the Kafka install directory
[hadoop@mini01 kafka]$ mkdir logs
2.1.2 Edit the configuration file
[hadoop@mini01 kafka]$ cd config/
[hadoop@mini01 config]$ vi server.properties
#Globally unique id of the broker; must not be duplicated
broker.id=0
#Whether deleting topics is allowed
delete.topic.enable=true
#Number of threads handling network requests
num.network.threads=3
#Number of threads handling disk I/O
num.io.threads=8
#Send buffer size of the socket
socket.send.buffer.bytes=102400
#Receive buffer size of the socket
socket.receive.buffer.bytes=102400
#Maximum size of a socket request
socket.request.max.bytes=104857600
#Directories where Kafka stores its log segments (message data, not the process logs)
log.dirs=/home/hadoop/install/kafka/logs
#Default number of partitions per topic on this broker
num.partitions=1
#Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1
#Maximum time a log segment is retained before it is deleted
log.retention.hours=168
#ZooKeeper ensemble connection string
zookeeper.connect=mini01:2181,mini02:2181,mini03:2181
2.2 Distribute the Kafka directory
[hadoop@mini01 config]$ cd ../../
[hadoop@mini01 install]$ xsync kafka/
2.3 On each of the other machines, edit server.properties and change broker.id; ids must be unique
mini02 broker.id=1
mini03 broker.id=2
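Instead of editing each copy by hand, the id can be derived from the hostname. A sketch (assumes GNU sed and the mini01..mini03 → 0..2 mapping above; it runs here against a throwaway copy of the file, not the real server.properties):

```shell
# Derive broker.id from the hostname: mini01 -> 0, mini02 -> 1, mini03 -> 2.
set_broker_id() {
  local host=$1 file=$2
  local id=$(( 10#${host#mini} - 1 ))   # strip "mini", force base 10
  sed -i "s/^broker\.id=.*/broker.id=$id/" "$file"
}

# Demo on a throwaway file rather than the real server.properties
PROPS=/tmp/server.properties.demo
printf 'broker.id=0\ndelete.topic.enable=true\n' > "$PROPS"
set_broker_id mini02 "$PROPS"
grep '^broker.id=' "$PROPS"   # broker.id=1
```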
2.4 Start the cluster (one broker per machine)
[hadoop@mini01 kafka]$ bin/kafka-server-start.sh config/server.properties &
[hadoop@mini02 kafka]$ bin/kafka-server-start.sh config/server.properties &
[hadoop@mini03 kafka]$ bin/kafka-server-start.sh config/server.properties &
3 Flume installation and configuration
3.1 Flume configuration (installation steps omitted)
Edit the flume-env.sh configuration file; the main change is setting JAVA_HOME
# during Flume startup.
# Environment variables can be set here.
export JAVA_HOME=/home/hadoop/install/jdk1.8.0_111.jdk/
3.2 Verify the installation
[hadoop@mini01 flume]$ bin/flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523
If this output appears, the installation succeeded.
4 Pulling file data with Flume and consuming it in Kafka
Create a new Flume configuration file, flume2kafka.conf
[hadoop@mini01 flume]$ vi conf/flume2kafka.conf
# Name the components of this agent, a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/logs.tsv
a1.sources.r1.shell=/bin/sh -c
# Describe the sink (defined as a Kafka sink below)
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Configure the Kafka sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = mini01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
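Before wiring the sink to topic `test`, the topic has to exist on the brokers (unless topic auto-creation is enabled). A sketch using the ZooKeeper-based topic CLI that ships with this Kafka generation; the replication factor and partition count are assumptions, and the helper defaults to a dry run (set RUN= empty on the cluster to actually execute):

```shell
# Create the sink's target topic; RUN is empty on the cluster,
# "echo" (the default) just prints the command.
create_topic() {
  local run=${RUN-echo}
  $run bin/kafka-topics.sh --create \
    --zookeeper mini01:2181,mini02:2181,mini03:2181 \
    --replication-factor 2 --partitions 3 --topic "$1"
}
create_topic test
```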
Generate data continuously
[hadoop@mini01 ~]$ test/getlog.sh
16991082028 餘建堂 18401456522 楊佔昊 46-08-04 15:07:46 1182
19641660102 劉洋 13059125383 郭振君 75-10-14 04:42:31 3926
14361606522 劉優 14692570569 陳猛 33-07-01 13:57:10 1700
17755364600 霍風浩 13059125383 郭振君 90-04-12 14:32:53 5587
15093813308 賈明燦 15060932038 閔強 90-02-11 04:42:22 1416
19641660102 劉洋 18506948961 冀纓菲 25-06-24 06:19:43 2622
15060932038 閔強 13305040991 高永斌 05-05-13 21:10:10 5015
13113007783 孫良明 14692570569 陳猛 94-08-12 03:35:48 3909
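test/getlog.sh itself is not shown in the post; a hypothetical stand-in that emits records of the same shape (caller number/name, callee number/name, timestamp, duration; all values invented) could look like:

```shell
# Emit random call records: caller, callee, yy-mm-dd hh:mm:ss, duration.
phones=(13658626467 19920594188 17533432302 15142556083)
names=(UserA UserB UserC UserD)
gen_record() {
  local i=$((RANDOM % 4)) j=$((RANDOM % 4))
  printf '%s %s %s %s %02d-%02d-%02d %02d:%02d:%02d %d\n' \
    "${phones[$i]}" "${names[$i]}" "${phones[$j]}" "${names[$j]}" \
    $((RANDOM % 99)) $((RANDOM % 12 + 1)) $((RANDOM % 28 + 1)) \
    $((RANDOM % 24)) $((RANDOM % 60)) $((RANDOM % 60)) \
    $((RANDOM % 6000 + 1))
}
# Three sample records (the real script would append to the tailed file)
for _ in 1 2 3; do gen_record; done
```

On the cluster this would run as `gen_record >> /home/hadoop/logs.tsv` inside a loop with a short sleep, so the `tail -F` exec source picks the lines up as they arrive.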
Start Flume
[hadoop@mini01 flume]$ bin/flume-ng agent -c conf -f conf/flume2kafka.conf --name a1 -Dflume.root.logger=INFO,console
Start a Kafka console consumer
[hadoop@mini01 kafka]$ bin/kafka-console-consumer.sh --zookeeper mini01:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
13658626467 劉海濤 13288940364 賈鑫瑜 61-12-23 17:35:42 2615
19920594188 段雪鵬 17755364600 霍風浩 74-11-18 17:42:22 6740
17533432302 張文舉 14865818526 常天罡 05-05-12 06:43:29 2569
15142556083 趙曉露 18491428393 張苗 51-02-03 20:39:20 2719
13305040991 高永斌 14692570569 陳猛 75-08-08 18:50:56 5506
19641660102 劉洋 14385342683 陳凱 49-09-22 04:50:07 3719