1. 程式人生 > >Flume+Kafka雙劍合璧玩轉大資料平臺日誌採集

Flume+Kafka雙劍合璧玩轉大資料平臺日誌採集

概述

大資料平臺每天會產生大量的日誌,處理這些日誌需要特定的日誌系統。

一般而言,這些系統需要具有以下特徵:

  • 構建應用系統和分析系統的橋樑,並將它們之間的關聯解耦;
  • 支援近實時的線上分析系統和類似於Hadoop之類的離線分析系統;
  • 具有高可擴充套件性。即:當資料量增加時,可以通過增加節點進行水平擴充套件。

為此建議將日誌採集分析系統分為如下幾個模組:

 

 

 

 

 

  • 資料採集模組:負責從各節點上實時採集資料,建議選用Flume-NG來實現。
  • 資料接入模組:由於採集資料的速度和資料處理的速度不一定同步,因此新增一個訊息中介軟體來作為緩衝,建議選用Kafka來實現。
  • 流式計算模組:對採集到的資料進行實時分析,建議選用Storm來實現。
  • 資料輸出模組:對分析後的結果持久化,可以使用HDFS、MySQL等。

日誌採集選型

大資料平臺每天會產生大量的日誌,處理這些日誌需要特定的日誌系統。目前常用的開源日誌系統有 Flume 和Kafka兩種, 都是非常優秀的日誌系統,且各有特點。下面我們來逐一認識一下。

Flume元件特點

Flume是一個分散式、可靠、高可用的海量日誌採集、聚合和傳輸的日誌收集系統。支援在日誌系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。

Flume的設計目標

  • 可靠性

Flume的核心是把資料從資料來源收集過來,再送到目的地。為了保證輸送一定成功,在送到目的地之前,會先快取資料,待資料真正到達目的地後,刪除自己快取的資料。Flume 使用事務性的方式保證傳送Event整個過程的可靠性。

  • 可擴充套件性

Flume中只有一個角色Agent,其中包含Source、Sink、Channel三種元件。一個Agent的Sink可以輸出到另一個Agent的Source。這樣通過配置可以實現多個層次的流配置。

  • 功能可擴充套件性

Flume自帶豐富的Source、Sink、Channel實現。使用者也可以根據需要新增自定義的元件實現, 並在配置中使用起來。

Flume的架構

Flume的基本架構是Agent。它是一個完整的資料收集工具,含有三個核心元件,分別是 Source、Channel、Sink。資料以Event為基本單位經過Source、Channel、Sink,從外部資料來源來,向外部的目的地去。

 

 

 

 

 

 

 

 

除了單Agent的架構外,還可以將多個Agent組合起來形成多層的資料流架構:

  • 多個Agent順序連線:將多個Agent順序連線起來,將最初的資料來源經過收集,儲存到最終的儲存系統中。一般情況下,應該控制這種順序連線的Agent的數量,因為資料流經的路徑變長了,如果不考慮Failover的話,出現故障將影響整個Flow上的Agent收集服務。

 

 

 

 

 

  • 多個Agent的資料匯聚到同一個Agent:這種情況應用的場景比較多,適用於資料來源分散的分散式系統中資料流彙總。

 

 

 

 

 

  • 多路(Multiplexing)Agent:多路模式一般有兩種實現方式,一種是用來複制,另一種是用來分流。複製方式可以將最前端的資料來源複製多份,分別傳遞到多個Channel中,每個Channel接收到的資料都是相同的。分流方式,Selector可以根據Header的值來確定資料傳遞到哪一個Channel。

 

 

 

 

 

  • 實現Load Balance功能:Channel中Event可以均衡到對應的多個Sink元件上,而每個Sink元件再分別連線到一個獨立的Agent上,這樣可以實現負載均衡。

 

 

 

 

 

Kafka元件特點

kafka實際上是一個訊息釋出訂閱系統。Producer向某個Topic釋出訊息,而Consumer訂閱某個Topic的訊息。一旦有新的關於某個Topic的訊息,Broker會傳遞給訂閱它的所有Consumer。

Kafka的設計目標

  • 資料在磁碟上的存取代價為O(1)

Kafka以Topic來進行訊息管理,每個Topic包含多個Partition,每個Partition對應一個邏輯log,由多個Segment組成。每個Segment中儲存多條訊息。訊息id由其邏輯位置決定,即從訊息id可直接定位到訊息的儲存位置,避免id到位置的額外對映。

  • 為釋出和訂閱提供高吞吐量

Kafka每秒可以生產約25萬訊息(50 MB),每秒處理55萬訊息(110 MB)。

  • 分散式系統,易於向外擴充套件

所有的Producer、Broker和Consumer都會有多個,均為分散式的。無需停機即可擴充套件機器。

Kafka的架構

Kafka是一個分散式的、可分割槽的、可複製的訊息系統,維護訊息佇列。

Kafka的整體架構非常簡單,是顯式分散式架構,Producer、Broker和Consumer都可以有多個。Producer,consumer實現Kafka註冊的介面,資料從Producer傳送到Broker,Broker承擔一箇中間快取和分發的作用。Broker分發註冊到系統中的Consumer。Broker的作用類似於快取,即活躍的資料和離線處理系統之間的快取。客戶端和伺服器端的通訊,是基於簡單、高效能、且與程式語言無關的TCP協議。

 

 

 

 

 

 

 

 

 

 

Flume與Kafka的比較

Flume和Kafka都是優秀的日誌系統,其都能實現資料採集、資料傳輸、負載均衡、容錯等一系列的需求, 但是兩者之間還是有著一定的差別。

 

 

 

 

 

由此可見Flume和Kafka還是各有特點的:

  • Flume 適用於沒有程式設計的配置解決方案,由於提供了豐富的source、channel、sink實現,各種資料來源的引入只是配置變更就可實現。
  • Kafka 適用於對資料管道的吞吐量、可用性要求都很高的解決方案,基本需要程式設計實現資料的生產和消費。

 

日誌採集選型小結

建議採用Flume作為資料的生產者,這樣可以不用程式設計就實現資料來源的引入,並採用Kafka Sink作為資料的消費者,這樣可以得到較高的吞吐量和可靠性。如果對資料的可靠性要求高的話,可以採用Kafka Channel來作為Flume的Channel使用。

Flume對接Kafka

Flume作為訊息的生產者,將生產的訊息資料(日誌資料、業務請求資料等)通過Kafka Sink釋出到Kafka中。

對接配置

 

 

 

 

對接示例

假設現有Flume實時讀取/data1/logs/component_role.log的資料並匯入到Kafka的mytopic主題中。

環境預設為:
Zookeeper 的地址為  zdh100:2181 zdh101:2181 zdh102:2181
Kafka broker的地址為 zdh100:9092 zdh101:9092  zdh102:9093

配置Flume agent,如下修改Flume配置:

gent1.sources = logsrc
agent1.channels = memcnl
agent1.sinks = kafkasink
 
#source section
agent1.sources.logsrc.type = exec
agent1.sources.logsrc.command = tail -F /data1/logs/component_role.log
agent1.sources.logsrc.shell = /bin/sh -c
agent1.sources.logsrc.batchSize = 50
agent1.sources.logsrc.channels = memcnl
 
# Each sink's type must be defined
agent1.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkasink.brokerList=zdh100:9092, zdh101:9092,zdh102:9092
agent1.sinks.kafkasink.topic=mytopic
agent1.sinks.kafkasink.requiredAcks = 1
agent1.sinks.kafkasink.batchSize = 20
agent1.sinks.kafkasink.channel = memcnl
 
# Each channel's type is defined.
agent1.channels.memcnl.type = memory
agent1.channels.memcnl.capacity = 1000

啟動該Flume節點:

/home/mr/flume/bin/flume-ng agent -c 
/home/mr/flume/conf -f /home/mr/flume/conf/flume-conf.properties -n agent1 -Dflume.monitoring.type=http -Dflume.monitoring.port=10100

動態追加日誌資料,執行命令向 /data1/logs/component_role.log 新增資料:

echo "測試程式碼" >> /data1/logs/component_role.log
echo "檢測Flume+Kafka資料管道通暢" >> /data1/logs/component_role.log

驗證Kafka資料接收結果,執行命令檢查Kafka收到的資料是否正確,應該可以呈現剛才追加的資料:

/home/mr/kafka/bin/kafka-console-consumer.sh --zookeeper zdh100:2181 --topic mytopic --from-beginning

輸出結果如下:

 

歡迎工作一到五年的Java工程師朋友們加入Java程式設計師開發: 854393687
群內提供免費的Java架構學習資料(裡面有高可用、高併發、高效能及分散式、Jvm效能調優、Spring原始碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!