1. 程式人生 > 其它 >Kafka與Flume和HDFS整合應用之日誌採集系統專案

Kafka與Flume和HDFS整合應用之日誌採集系統專案

技術標籤:kafkaflumehadoophadoopflume大資料kafkazookeeper

文章目錄


前言

Flume 是一個連線各種元件和系統的橋樑,在$FLUME_HOME/lib 目錄下有Flume與HBase、HDFS等整合的 jar檔案,可以很方便地與HBase和HDFS連線。在實際業務中,我們一般通過Flume 從應用程式實時採集資料寫入到Kafka,而將歷史資料通過Flume 匯入到HDFS以用於離線分析計算。當然,我們也可以通過Flume從Kafka將資料寫入到HBase和HDFS。

下面我將為大家介紹如何使用Flume快速消費Kafka Topic資料,然後將消費後的資料實時轉發到HDFS上。


專案需求

使用Flume實時採集資料寫入Kafka主題中,並持久化到本地磁碟,然後通過Flume消費主題訊息,並實時寫入到HDFS,供開發人員進行離線分析計算。


一、Flume採集日誌寫入Kafka

建立一個flume-kafka.properties檔案,寫入相關配置如下。
首先指定源、接收器和通道的名稱,配置如下:

agent.sources = sc #指定源名稱
agent.sinks = sk #指定接收器名稱
agent.channels = chl #指定通道名稱

1.Source配置

agent.sources.sc.type = exec #指定源型別為linux命令
agent.sources.sc.channels = chl #繫結通道,指定源將事件傳遞的通道,可以指定多個通道
agent.sources.sc.command = tail -f /opt/flume/test.log #以tail命令開啟檔案輸出流
agent.sources.sc.fileHeader = false #指定事件不包括頭資訊

2.Sinks配置

#接收器型別
agent.sinks.sk.type = org.apache.flume.sink.kafka.
KafkaSink #繫結通道,指定接收器讀取資料的通道 agent.sinks.sk.channel = chl agent.sinks.sk.kafka.bootstrap.servers = 172.20.10.3:9092,172.20.10.4:9092:172.20.10.5:9092 #指定寫入Kafka的主題 agent.sinks.sk.kafka.topic=flume-kafka #指定序列化類 agent.sinks.sk.serializer.class=kafka.serializer.StringEncoder #生產者acks方式 agent.sinks.sk.producer.acks = 1 #指定字元編碼 agent.sinks.sk.custom.encoding = UTF-8

3.Channel配置

agent.channels.chl.type = memory #指定通道型別
agent.channels.chl.capacity = 1000 #在通道中停留的最大事件數
agent.channels.chl.transactionCapacity = 1000 #每次從源拉取的事件數及給接收器的事件數

Flume安裝以及更詳細的請看我之前寫的:Flume採集日誌寫入Kafka


二、Flume採集Kafka訊息寫入HDFS

首先建立一個kafka2hdfs.properties檔案,並完成源、通道和接收器名稱的定義,配置資訊如下:

# source alias
agent.sources = source_from_kafka  
# channels alias
agent.channels = mem_channel  
# sink alias
agent.sinks = hdfs_sink

1.KafkaSource配置

程式碼如下:

# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource  
agent.sources.source_from_kafka.channels = mem_channel  
agent.sources.source_from_kafka.batchSize = 5000  

# set kafka broker address  
agent.sources.source_from_kafka.kafka.bootstrap.servers = hadoop2:9092,hadoop3:9092

# set kafka topic
agent.sources.source_from_kafka.kafka.topics = flume-kafka

# set kafka groupid
agent.sources.source_from_kafka.kafka.consumer.group.id = flume_test_id

2.KafkaSinks配置

程式碼如下:

# defind hdfs sink
agent.sinks.hdfs_sink.type = hdfs 

# specify the channel the sink should use  
agent.sinks.hdfs_sink.channel = mem_channel

# set store hdfs path
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y-%m-%d/%H 

agent.sinks.hdfs_sink.hdfs.fileSuffix = .txt

# set file size to trigger roll
agent.sinks.hdfs_sink.hdfs.rollSize = 0  
agent.sinks.hdfs_sink.hdfs.rollCount = 0  
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600  
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType=DataStream    
agent.sinks.hdfs_sink.hdfs.writeFormat=Text    

3.KafkaChannel配置

程式碼如下:

# define channel from kafka source to hdfs sink 
agent.channels.mem_channel.type = memory  

# channel store size
agent.channels.mem_channel.capacity = 100000
# transaction size
agent.channels.mem_channel.transactionCapacity = 10000

三、啟動Flume NG和Kafka驗證

1.啟動Flume採集日誌寫入Kafka代理

flume-ng agent -n agent -f $FLUME_HOME/conf/flume-kafka.properties &>flume-kafka.log &

2.啟動Flume採集Kafka訊息寫入HDFS代理

flume-ng agent -n agent -f $FLUME_HOME/conf/kafka2hdfs.properties &>kafka2hdfs.log &

3.效果展示

消費:

[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server 172.18.221.221:9092,172.29.155.250:9092  --topic flume-kafka
hello,2020-12-25,16:38:41,15223012324
hello,2020-12-25,16:38:51,15223012324
hello,2020-12-25,16:39:01,15223012324
hello,2020-12-25,16:39:11,15223012324
hello,2020-12-25,16:39:21,15223012324
hello,2020-12-25,16:39:31,15223012324
hello,2020-12-25,16:39:41,15223012324

HDFS:

[[email protected] /]# hdfs dfs -cat /data/flume/kafka/2020-12-25/16/FlumeData.1608885511599.txt.tmp
hello,2020-12-25,16:38:11,15223012324
hello,2020-12-25,16:38:01,15223012324
hello,2020-12-25,16:38:21,15223012324
hello,2020-12-25,16:38:31,15223012324
hello,2020-12-25,16:38:41,15223012324
hello,2020-12-25,16:38:51,15223012324
hello,2020-12-25,16:39:01,15223012324
hello,2020-12-25,16:39:11,15223012324
hello,2020-12-25,16:39:21,15223012324
hello,2020-12-25,16:39:31,15223012324
hello,2020-12-25,16:39:41,15223012324

在這裡插入圖片描述

總結

這裡我寫入到HDFS的方法為一個小時建立一個資料夾,具體請根據實際情況而定,正在接收資料寫操作的檔案的字尾預設為.tmp,寫操作完成之後,字尾會自動刪除。