1. 程式人生 > >flume介紹

flume介紹

前言

本文通過對flume的架構,資料鏈路和資料的可靠性來分析flume的原理,並在文末提供了demo(官網搬運)。

 

架構

flume可以理解為是一個ETL工具,本身是單點的,也就是隻有agent,沒有server,但是通過強大的source-channel-sink-source…機制,可以通過在多個節點上起agent構成一個DAG,從而形成分散式形態。參考圖1,圖2(其他圖就不貼了,參考官網文件),flume支援在任意節點啟動任意數量的agent,並且agent與agent之間可以通過rpc連線。需要注意的是,1個source可以下發資料到多個channel(通過`ChannelSelector`,最常見的就是廣播,或者動態路由),1個sink只能從1個channel獲取資料,但是多個sink又可以從1個channel拉取資料(通過`SinkProcessor`)。這樣的DAG設計意圖很明顯,source生產資料通常都是比sink處理資料快的,所以channel起到資料緩衝作用,並且通過事務機制保證資料的可靠性。試想一下場景,假如資料量很大,1個source消費速度跟不上,也就是說達到了單程序的效能瓶頸,那麼可以啟動多個agent;假如資料量一般,1個source就足夠,但是處理很複雜比如IO密集型的操作,那麼可以通過多sink的方式從channel拉資料,也就是sink端做負載均衡,比如`LoadBalancingSinkProcessor`。flume這樣的設計很好的滿足了各種場景需求。

 

圖1

圖2

資料鏈路

參考圖3,flume的三個核心元件的資料鏈路是基於推送和拉取模式,其中Channel可以理解為列隊,也是實現資料可靠性的關鍵元件。比如常見的,採用FileChannel之類落盤的channel場景中,當source推送資料到channel失敗,那麼就會觸發source重試,當sink拉取資料操作失敗,那麼回通知channel回滾,直到sink操作成功才提交之前的那些資料,從而使得channel移除那些已經被成功處理的資料。

圖3

 

flume的元件也不僅僅只有source,channel,sink這三個。參考圖4,完整的資料鏈路還有ChannelProcessor和SinkeProcessor。ChannelProcessor主要功能有兩個,1.對事件進行攔截,提供修改事件的入口; 2.對channel的選擇,我們知道1個source可以傳送資料到多個channel,但是具體傳送到哪些channel呢?這裡就是選擇器決定了source傳送到哪些channel。SinkProcessor的功能其實也類似,由於一個channel的事件可以被多個sink拉取,那麼SinkProcessor決定了sink拉取的策略,這裡flume衍生出了sinkGroup的概念,一般情況下,1個sink對應1個執行緒,而sinkGroup可以包含多個sink共用1個執行緒。

 

圖4

 

資料可靠性

資料可靠性是非常重要的一個特性,所以拉出來單獨做一下說明。

flume基於Channel和Transaction介面實現資料不丟失的特性。其中channel負責對資料持久化,維護了所有沒有被事務提交的事件,Transaction負責實現事務語義,類似jdbc的事務語法,如下

Transaction tx = ch.getTransaction();
try {
  tx.begin();
  ...
  // ch.put(event) or ch.take()
  ...
  tx.commit();
 } catch
(ChannelException ex) { tx.rollback(); ... } finally { tx.close(); }

通過這兩個介面的保證,flume可以實現at leaset once的語義,這是由於sink可以出現rpc超時等一些問題,導致誤以為失敗導致事件被重複拉取。這個問題可以通過對事件分配唯一id,再通過其他大資料元件去重。

 

總結

flume提供一個靈活的設計思路,可以通過agent組合構建出符合自己需求的DAG,有點類似storm,但是程式更加輕量。並且提供了很多開箱即用的外掛,可以說是很良心了。

 

demo

下面通過一個案例來了解flume,思路是構建基於netcat的agent,然後通過telnet進行驗證。

# 建立一個新的flume配置檔案

vi example.conf

# example.conf: A single-node Flume configuration

 

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# Describe the sink

a1.sinks.k1.type = logger

 

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

# 啟動flume agent,並開啟http的度量監控,可以通過http請求獲取相關度量資料

flume-ng agent --name a1 --conf-file example.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=9999

 

# 通過telnet進行除錯

telnet localhost 44444

# 發現訊息,可以看到flume agent可以成功接收

 

參考

// flume官網文件以及原始碼

http://flume.apache.org/FlumeUserGuide.html

 

// 書乃本也~

《Flume  構建高可用、可擴充套件的海量日誌採集系統》