1. 程式人生 > >使用Flume消費Kafka資料到HDFS

使用Flume消費Kafka資料到HDFS

1.概述

對於資料的轉發,Kafka是一個不錯的選擇。Kafka能夠裝載資料到訊息佇列,然後等待其他業務場景去消費這些資料,Kafka的應用介面API非常的豐富,支援各種儲存介質,例如HDFS、HBase等。如果不想使用Kafka API編寫程式碼去消費Kafka Topic,也是有元件可以去整合消費的。下面筆者將為大家介紹如何使用Flume快速消費Kafka Topic資料,然後將消費後的資料轉發到HDFS上。

2.內容

在實現這套方案之間,可以先來看看整個資料的流向,如下圖所示:

業務資料實時儲存到Kafka叢集,然後通過Flume Source元件實時去消費Kafka業務Topic獲取資料,將消費後的資料通過Flume Sink元件傳送到HDFS進行儲存。

2.1 準備基礎環境

按照上圖所示資料流向方案,需要準備好Kafka、Flume、Hadoop(HDFS可用)等元件。

2.1.1 啟動Kafka叢集並建立Topic

Kafka目前來說,並沒有一個批量的管理指令碼,不過我們可以對kafka-server-start.sh指令碼和kafka-server-stop.sh指令碼進行二次封裝。程式碼如下所示:

#! /bin/bash

# Kafka代理節點地址, 如果節點較多可以用一個檔案來儲存
hosts=(dn1 dn2 dn3)

# 列印啟動分散式指令碼資訊
mill=`date "+%N"`
tdate=`date
"+%Y-%m-%d %H:%M:%S,${mill:0:3}"` echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation. # 執行分散式開啟命令 function start() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh [email protected]
$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" & sleep 1 done } # 執行分散式關閉命令 function stop() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh [email protected]$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" & sleep 1 done } # 檢視Kafka代理節點狀態 function status() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh [email protected]$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" & sleep 1 done } # 判斷輸入的Kafka命令引數是否有效 case "$1" in start) start ;; stop) stop ;; status) status ;; *) echo "Usage: $0 {start|stop|status}" RETVAL=1 esac

啟動Kafka集群后,在Kafka叢集可用的情況下,建立一個業務Topic,執行命令如下:

# 建立一個flume_collector_data主題
kafka-topics.sh --create --zookeeper dn1:2181,dn2:2181,dn3:2181 --replication-factor 3 --partitions 6 --topic flume_collector_data

2.2 配置Flume Agent

然後,開始配置Flume Agent資訊,讓Flume從Kafka叢集的flume_collector_data主題中讀取資料,並將讀取到的資料傳送到HDFS中進行儲存。配置內容如下:

# ------------------- define data source ----------------------
# source alias
agent.sources = source_from_kafka  
# channels alias
agent.channels = mem_channel  
# sink alias
agent.sinks = hdfs_sink  


# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource  
agent.sources.source_from_kafka.channels = mem_channel  
agent.sources.source_from_kafka.batchSize = 5000  

# set kafka broker address  
agent.sources.source_from_kafka.kafka.bootstrap.servers = dn1:9092,dn2:9092,dn3:9092

# set kafka topic
agent.sources.source_from_kafka.kafka.topics = flume_collector_data

# set kafka groupid
agent.sources.source_from_kafka.kafka.consumer.group.id = flume_test_id

# defind hdfs sink
agent.sinks.hdfs_sink.type = hdfs 

# specify the channel the sink should use  
agent.sinks.hdfs_sink.channel = mem_channel

# set store hdfs path
agent.sinks.hdfs_sink.hdfs.path = /data/flume/kafka/%Y%m%d  

# set file size to trigger roll
agent.sinks.hdfs_sink.hdfs.rollSize = 0  
agent.sinks.hdfs_sink.hdfs.rollCount = 0  
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600  
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 30
agent.sinks.hdfs_sink.hdfs.fileType=DataStream    
agent.sinks.hdfs_sink.hdfs.writeFormat=Text    

# define channel from kafka source to hdfs sink 
agent.channels.mem_channel.type = memory  

# channel store size
agent.channels.mem_channel.capacity = 100000
# transaction size
agent.channels.mem_channel.transactionCapacity = 10000

然後,啟動Flume Agent,執行命令如下:

# 在Linux後臺執行命令
flume-ng agent -n agent -f $FLUME_HOME/conf/kafka2hdfs.properties &

2.3 向Kafka主題中傳送資料

啟動Kafka Eagle監控系統(執行ke.sh start命令),填寫傳送資料。如下圖所示:

然後,查詢Topic中的資料是否有被寫入,如下圖所示:

 

最後,到HDFS對應的路徑檢視Flume傳輸的資料,結果如下圖所示:

3.總結

 至此,Kafka中業務Topic的資料,經過Flume Source元件消費後,再由Flume Sink元件寫入到HDFS,整個過程省略了大量的業務編碼工作。如果實際工作當中不涉及複雜的業務邏輯處理,對於Kafka的資料轉發需求,不妨可以試試這種方案。

4.結束語

這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或傳送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》,喜歡的朋友或同學, 可以在公告欄那裡點選購買連結購買博主的書進行學習,在此感謝大家的支援。