A Log Collection System Project Integrating Kafka with Flume and HDFS
By 阿新 • Published 2021-01-03
Tags: kafka, flume, hadoop, big data, zookeeper
Preface
Flume is a bridge that connects all kinds of components and systems. The $FLUME_HOME/lib directory ships with jar files for integrating Flume with HBase, HDFS, and more, so connecting to HBase and HDFS is straightforward. In real-world workloads we usually use Flume to collect application data in real time and write it to Kafka, while importing data into HDFS via Flume for offline analysis. Of course, Flume can also move data from Kafka into HBase and HDFS.
Project Requirements
Use Flume to collect log data in real time and write it to a Kafka topic, where it is persisted to local disk; then use a second Flume agent to consume the topic's messages and write them to HDFS in real time, so that developers can run offline analysis over them.
I. Collecting Logs with Flume and Writing to Kafka
Create a file named flume-kafka.properties and add the configuration below.
First, name the source, sink, and channel:
# name the source, sink, and channel
agent.sources = sc
agent.sinks = sk
agent.channels = chl
1. Source configuration
# source type: run a Linux command
agent.sources.sc.type = exec
# bind the channel the source delivers events to (multiple channels may be listed)
agent.sources.sc.channels = chl
# open an output stream on the file with the tail command
agent.sources.sc.command = tail -f /opt/flume/test.log
# do not add the file name to the event headers
agent.sources.sc.fileHeader = false
2. Sink configuration
# sink type
agent.sinks.sk.type = org.apache.flume.sink.kafka.KafkaSink
# bind the channel the sink reads events from
agent.sinks.sk.channel = chl
agent.sinks.sk.kafka.bootstrap.servers = 172.20.10.3:9092,172.20.10.4:9092,172.20.10.5:9092
# topic to write to in Kafka
agent.sinks.sk.kafka.topic = flume-kafka
# serializer class
agent.sinks.sk.serializer.class = kafka.serializer.StringEncoder
# producer acks mode, written in the kafka.producer.* form that the sink passes through to the producer
agent.sinks.sk.kafka.producer.acks = 1
# character encoding
agent.sinks.sk.custom.encoding = UTF-8
3. Channel configuration
# channel type: in-memory
agent.channels.chl.type = memory
# maximum number of events held in the channel
agent.channels.chl.capacity = 1000
# maximum events per transaction pulled from the source or handed to the sink
agent.channels.chl.transactionCapacity = 1000
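For reference, the three fragments above assemble into a single flume-kafka.properties like the following. Two adjustments on my part: comments are kept on their own lines, since Flume's properties parser does not recognize trailing # comments, and the acks key is written in the kafka.producer.* pass-through form documented for the Flume 1.7+ KafkaSink.

```properties
agent.sources = sc
agent.sinks = sk
agent.channels = chl

agent.sources.sc.type = exec
agent.sources.sc.channels = chl
agent.sources.sc.command = tail -f /opt/flume/test.log
agent.sources.sc.fileHeader = false

agent.sinks.sk.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sk.channel = chl
agent.sinks.sk.kafka.bootstrap.servers = 172.20.10.3:9092,172.20.10.4:9092,172.20.10.5:9092
agent.sinks.sk.kafka.topic = flume-kafka
agent.sinks.sk.serializer.class = kafka.serializer.StringEncoder
agent.sinks.sk.kafka.producer.acks = 1
agent.sinks.sk.custom.encoding = UTF-8

agent.channels.chl.type = memory
agent.channels.chl.capacity = 1000
agent.channels.chl.transactionCapacity = 1000
```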
For Flume installation and further details, see my earlier post: Collecting Logs with Flume and Writing to Kafka.
II. Consuming Kafka Messages with Flume and Writing to HDFS
First create a file named kafka2hdfs.properties and define the names of the source, channel, and sink:
# source alias
agent.sources = source_from_kafka
# channels alias
agent.channels = mem_channel
# sink alias
agent.sinks = hdfs_sink
1. KafkaSource configuration
The configuration is as follows:
# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source_from_kafka.channels = mem_channel
agent.sources.source_from_kafka.batchSize = 5000
# set kafka broker address
agent.sources.source_from_kafka.kafka.bootstrap.servers = hadoop2:9092,hadoop3:9092
# set kafka topic
agent.sources.source_from_kafka.kafka.topics = flume-kafka
# set kafka groupid
agent.sources.source_from_kafka.kafka.consumer.group.id = flume_test_id
2. HDFS sink configuration
The configuration is as follows:
# define hdfs sink
agent.sinks.hdfs_sink.type = hdfs
# specify the channel the sink should use
agent.sinks.hdfs_sink.channel = mem_channel
# set store hdfs path
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y-%m-%d/%H
agent.sinks.hdfs_sink.hdfs.fileSuffix = .txt
# roll files on a time interval only: disable size- and count-based rolling
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
# roll a new file every hour (3600 s)
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
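One caveat: the %Y-%m-%d/%H escapes in hdfs.path are resolved from each event's timestamp header. The KafkaSource populates that header itself, but if the sink ever complains about a missing timestamp, you can fall back to the agent host's clock with a standard HDFS sink option (not part of the original configuration):

```properties
# resolve %Y-%m-%d/%H from the Flume host's local clock instead of the event header
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
```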
3. Channel configuration
The configuration is as follows:
# define channel from kafka source to hdfs sink
agent.channels.mem_channel.type = memory
# channel store size
agent.channels.mem_channel.capacity = 100000
# transaction size
agent.channels.mem_channel.transactionCapacity = 10000
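Likewise, the fragments above assemble into a complete kafka2hdfs.properties (values exactly as listed in this section):

```properties
agent.sources = source_from_kafka
agent.channels = mem_channel
agent.sinks = hdfs_sink

agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source_from_kafka.channels = mem_channel
agent.sources.source_from_kafka.batchSize = 5000
agent.sources.source_from_kafka.kafka.bootstrap.servers = hadoop2:9092,hadoop3:9092
agent.sources.source_from_kafka.kafka.topics = flume-kafka
agent.sources.source_from_kafka.kafka.consumer.group.id = flume_test_id

agent.sinks.hdfs_sink.type = hdfs
agent.sinks.hdfs_sink.channel = mem_channel
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y-%m-%d/%H
agent.sinks.hdfs_sink.hdfs.fileSuffix = .txt
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text

agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
```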
III. Starting the Flume NG Agents and Verifying
1. Start the agent that collects logs and writes them to Kafka
flume-ng agent -n agent -f $FLUME_HOME/conf/flume-kafka.properties &>flume-kafka.log &
2. Start the agent that consumes Kafka messages and writes them to HDFS
flume-ng agent -n agent -f $FLUME_HOME/conf/kafka2hdfs.properties &>kafka2hdfs.log &
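With both agents running, you can drive data through the pipeline by appending records to the file the exec source tails. A minimal sketch; the CSV record layout follows this article's example, and the path defaults to /tmp/test.log so it runs anywhere (on the Flume host, point LOG at /opt/flume/test.log as configured above):

```shell
#!/bin/sh
# Append a few timestamped sample records (greeting,date,time,phone)
# to the file tailed by the exec source.
LOG=${LOG:-/tmp/test.log}   # set LOG=/opt/flume/test.log on the Flume host
for i in 1 2 3; do
  echo "hello,$(date +%F),$(date +%T),15223012324" >> "$LOG"
  sleep 1
done
```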
3. Results
Console consumer output:
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 172.18.221.221:9092,172.29.155.250:9092 --topic flume-kafka
hello,2020-12-25,16:38:41,15223012324
hello,2020-12-25,16:38:51,15223012324
hello,2020-12-25,16:39:01,15223012324
hello,2020-12-25,16:39:11,15223012324
hello,2020-12-25,16:39:21,15223012324
hello,2020-12-25,16:39:31,15223012324
hello,2020-12-25,16:39:41,15223012324
HDFS:
[[email protected] /]# hdfs dfs -cat /data/flume/kafka/2020-12-25/16/FlumeData.1608885511599.txt.tmp
hello,2020-12-25,16:38:11,15223012324
hello,2020-12-25,16:38:01,15223012324
hello,2020-12-25,16:38:21,15223012324
hello,2020-12-25,16:38:31,15223012324
hello,2020-12-25,16:38:41,15223012324
hello,2020-12-25,16:38:51,15223012324
hello,2020-12-25,16:39:01,15223012324
hello,2020-12-25,16:39:11,15223012324
hello,2020-12-25,16:39:21,15223012324
hello,2020-12-25,16:39:31,15223012324
hello,2020-12-25,16:39:41,15223012324
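To illustrate the kind of offline analysis these files feed, here is a small sketch that counts records per minute; the sample data and CSV layout (greeting, date, time, phone) are copied from the output above, and awk merely stands in for whatever analysis engine you actually use:

```shell
#!/bin/sh
# Sample records in the collected format: greeting,date,HH:MM:SS,phone
cat > /tmp/sample.txt <<'EOF'
hello,2020-12-25,16:38:41,15223012324
hello,2020-12-25,16:38:51,15223012324
hello,2020-12-25,16:39:01,15223012324
EOF
# Group by hour:minute (field 3 split on ":") and count records per minute
awk -F, '{ split($3, t, ":"); n[t[1] ":" t[2]]++ }
         END { for (m in n) print m, n[m] }' /tmp/sample.txt |
  sort > /tmp/per_minute.txt
cat /tmp/per_minute.txt
```

For the sample above this prints `16:38 2` and `16:39 1`.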
Summary
The HDFS sink here creates a new directory every hour; adjust this to your actual needs. A file that is still receiving writes carries the default .tmp suffix; once writing completes, the suffix is removed automatically.