1. 程式人生 > >Flume(NG)架構設計要點及配置實踐 Flume NG是一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量日誌資料進行高效收集

Flume(NG)架構設計要點及配置實踐 Flume NG是一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量日誌資料進行高效收集

Flume(NG)架構設計要點及配置實踐

Flume NG是一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量日誌資料進行高效收集、聚合、移動,最後儲存到一箇中心化資料儲存系統中。由原來的Flume OG到現在的Flume NG,進行了架構重構,並且現在NG版本完全不相容原來的OG版本。經過架構重構後,Flume NG更像是一個輕量的小工具,非常簡單,容易適應各種方式日誌收集,並支援failover和負載均衡。

架構設計要點

Flume的架構主要有一下幾個核心概念:

  • Event:一個數據單元,帶有一個可選的訊息頭
  • Flow:Event從源點到達目的點的遷移的抽象
  • Client:操作位於源點處的Event,將其傳送到Flume Agent
  • Agent:一個獨立的Flume程序,包含元件Source、Channel、Sink
  • Source:用來消費傳遞到該元件的Event
  • Channel:中轉Event的一個臨時儲存,儲存有Source元件傳遞過來的Event
  • Sink:從Channel中讀取並移除Event,將Event傳遞到Flow Pipeline中的下一個Agent(如果有的話)

Flume NG架構,如圖所示:
flume-ng-architecture
外部系統產生日誌,直接通過Flume的Agent的Source元件將事件(如日誌行)傳送到中間臨時的channel元件,最後傳遞給Sink元件,HDFS Sink元件可以直接把資料儲存到HDFS叢集上。
一個最基本Flow的配置,格式如下:

01 # list the sources, sinks and channels for the agent
02 <Agent>.sources = <Source1> <Source2>
03 <Agent>.sinks = <Sink1> <Sink2>
04 <Agent>.channels = <Channel1> <Channel2>
05
06 # set channel for source
07 <Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ...
08 <Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ...
09
10 # set channel for sink
11 <Agent>.sinks.<Sink1>.channel = <Channel1>
12 <Agent>.sinks.<Sink2>.channel = <Channel2>

尖括號裡面的,我們可以根據實際需求或業務來修改名稱。下面詳細說明:
表示配置一個Agent的名稱,一個Agent肯定有一個名稱。與是Agent的Source元件的名稱,消費傳遞過來的Event。與是Agent的Channel元件的名稱。與是Agent的Sink元件的名稱,從Channel中消費(移除)Event。
上面配置內容中,第一組中配置Source、Sink、Channel,它們的值可以有1個或者多個;第二組中配置Source將把資料儲存(Put)到哪一個Channel中,可以儲存到1個或多個Channel中,同一個Source將資料儲存到多個Channel中,實際上是Replication;第三組中配置Sink從哪一個Channel中取(Task)資料,一個Sink只能從一個Channel中取資料。
下面,根據官網文件,我們展示幾種Flow Pipeline,各自適應於什麼樣的應用場景:

  • 多個Agent順序連線

flume-multiseq-agents
可以將多個Agent順序連線起來,將最初的資料來源經過收集,儲存到最終的儲存系統中。這是最簡單的情況,一般情況下,應該控制這種順序連線的Agent的數量,因為資料流經的路徑變長了,如果不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。

  • 多個Agent的資料匯聚到同一個Agent

flume-join-agent
這種情況應用的場景比較多,比如要收集Web網站的使用者行為日誌,Web網站為了可用性使用的負載均衡的叢集模式,每個節點都產生使用者行為日誌,可以為每個節點都配置一個Agent來單獨收集日誌資料,然後多個Agent將資料最終匯聚到一個用來儲存資料儲存系統,如HDFS上。

  • 多路(Multiplexing)Agent

flume-multiplexing-agent
這種模式,有兩種方式,一種是用來複制(Replication),另一種是用來分流(Multiplexing)。Replication方式,可以將最前端的資料來源複製多份,分別傳遞到多個channel中,每個channel接收到的資料都是相同的,配置格式,如下所示:

01 # List the sources, sinks and channels for the agent
02 <Agent>.sources = <Source1>
03 <Agent>.sinks = <Sink1> <Sink2>
04 <Agent>.channels = <Channel1> <Channel2>
05
06 # set list of channels for source (separated by space)
07 <Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
08
09 # set channel for sinks
10 <Agent>.sinks.<Sink1>.channel = <Channel1>
11 <Agent>.sinks.<Sink2>.channel = <Channel2>
12
13 <Agent>.sources.<Source1>.selector.type = replicating

上面指定了selector的type的值為replication,其他的配置沒有指定,使用的Replication方式,Source1會將資料分別儲存到Channel1和Channel2,這兩個channel裡面儲存的資料是相同的,然後資料被傳遞到Sink1和Sink2。
Multiplexing方式,selector可以根據header的值來確定資料傳遞到哪一個channel,配置格式,如下所示:

1 # Mapping for multiplexing selector
2 <Agent>.sources.<Source1>.selector.type = multiplexing
3 <Agent>.sources.<Source1>.selector.header = <someHeader>
4 <Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
5 <Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
6 <Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
7 #...
8
9 <Agent>.sources.<Source1>.selector.default = <Channel2>

上面selector的type的值為multiplexing,同時配置selector的header資訊,還配置了多個selector的mapping的值,即header的值:如果header的值為Value1、Value2,資料從Source1路由到Channel1;如果header的值為Value2、Value3,資料從Source1路由到Channel2。

  • 實現load balance功能

flume-load-balance-agents
Load balancing Sink Processor能夠實現load balance功能,上圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink元件上,而每個Sink元件分別連線到一個獨立的Agent上,示例配置,如下所示:

1 a1.sinkgroups = g1
2 a1.sinkgroups.g1.sinks = k1 k2 k3
3 a1.sinkgroups.g1.processor.type = load_balance
4 a1.sinkgroups.g1.processor.backoff = true
5 a1.sinkgroups.g1.processor.selector = round_robin
6 a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
  • 實現failover能

Failover Sink Processor能夠實現failover功能,具體流程類似load balance,但是內部處理機制與load balance完全不同:Failover Sink Processor維護一個優先順序Sink元件列表,只要有一個Sink元件可用,Event就被傳遞到下一個元件。如果一個Sink能夠成功處理Event,則會加入到一個Pool中,否則會被移出Pool並計算失敗次數,設定一個懲罰因子,示例配置如下所示:

1 a1.sinkgroups = g1
2 a1.sinkgroups.g1.sinks = k1 k2 k3
3 a1.sinkgroups.g1.processor.type = failover
4 a1.sinkgroups.g1.processor.priority.k1 = 5
5 a1.sinkgroups.g1.processor.priority.k2 = 7
6 a1.sinkgroups.g1.processor.priority.k3 = 6
7 a1.sinkgroups.g1.processor.maxpenalty = 20000

基本功能

我們看一下,Flume NG都支援哪些功能(目前最新版本是1.5.0.1),瞭解它的功能集合,能夠讓我們在應用中更好地選擇使用哪一種方案。說明Flume NG的功能,實際還是圍繞著Agent的三個元件Source、Channel、Sink來看它能夠支援哪些技術或協議。我們不再對各個元件支援的協議詳細配置進行說明,通過列表的方式分別對三個元件進行概要說明:

  • Flume Source
Source型別 說明
Avro Source 支援Avro協議(實際上是Avro RPC),內建支援
Thrift Source 支援Thrift協議,內建支援
Exec Source 基於Unix的command在標準輸出上生產資料
JMS Source 從JMS系統(訊息、主題)中讀取資料,ActiveMQ已經測試過
Spooling Directory Source 監控指定目錄內資料變更
Twitter 1% firehose Source 通過API持續下載Twitter資料,試驗性質
Netcat Source 監控某個埠,將流經埠的每一個文字行資料作為Event輸入
Sequence Generator Source 序列生成器資料來源,生產序列資料
Syslog Sources 讀取syslog資料,產生Event,支援UDP和TCP兩種協議
HTTP Source 基於HTTP POST或GET方式的資料來源,支援JSON、BLOB表示形式
Legacy Sources 相容老的Flume OG中Source(0.9.x版本)
  • Flume Channel
Channel型別 說明
Memory Channel Event資料儲存在記憶體中
JDBC Channel Event資料儲存在持久化儲存中,當前Flume Channel內建支援Derby
File Channel Event資料儲存在磁碟檔案中
Spillable Memory Channel Event資料儲存在記憶體中和磁碟上,當記憶體佇列滿了,會持久化到磁碟檔案(當前試驗性的,不建議生產環境使用)
Pseudo Transaction Channel 測試用途
Custom Channel 自定義Channel實現
  • Flume Sink
Sink型別 說明
HDFS Sink 資料寫入HDFS
Logger Sink 資料寫入日誌檔案
Avro Sink 資料被轉換成Avro Event,然後傳送到配置的RPC埠上
Thrift Sink 資料被轉換成Thrift Event,然後傳送到配置的RPC埠上
IRC Sink 資料在IRC上進行回放
File Roll Sink 儲存資料到本地檔案系統
Null Sink 丟棄到所有資料
HBase Sink 資料寫入HBase資料庫
Morphline Solr Sink 資料傳送到Solr搜尋伺服器(叢集)
ElasticSearch Sink 資料傳送到Elastic Search搜尋伺服器(叢集)
Kite Dataset Sink 寫資料到Kite Dataset,試驗性質的
Custom Sink 自定義Sink實現

另外還有Channel Selector、Sink Processor、Event Serializer、Interceptor等元件,可以參考官網提供的使用者手冊。

應用實踐

安裝Flume NG非常簡單,我們使用最新的1.5.0.1版本,執行如下命令:

如果需要使用到Hadoop叢集,保證Hadoop相關的環境變數都已經正確配置,並且Hadoop叢集可用。下面,通過一些實際的配置例項,來了解Flume的使用。為了簡單期間,channel我們使用Memory型別的channel。

  • Avro Source+Memory Channel+Logger Sink

使用apache-flume-1.5.0.1自帶的例子,使用Avro Source接收外部資料來源,Logger作為sink,即通過Avro RPC呼叫,將資料快取在channel中,然後通過Logger打印出呼叫傳送的資料。
配置Agent,修改配置檔案conf/flume-conf.properties,內容如下:

01 # Define a memory channel called ch1 on agent1
02 agent1.channels.ch1.type = memory
03
04 # Define an Avro source called avro-source1 on agent1 and tell it
05 # to bind to 0.0.0.0:41414. Connect it to channel ch1.
06 agent1.sources.avro-source1.channels = ch1
07 agent1.sources.avro-source1.type = avro
08 agent1.sources.avro-source1.bind = 0.0.0.0
09 agent1.sources.avro-source1.port = 41414
10
11 # Define a logger sink that simply logs all events it receives
12 # and connect it to the other end of the same channel.
13 agent1.sinks.log-sink1.channel = ch1
14 agent1.sinks.log-sink1.type = logger
15
16 # Finally, now that we've defined all of our components, tell
17 # agent1 which ones we want to activate.
18 agent1.channels = ch1
19 agent1.channels.ch1.capacity = 1000
20 agent1.sources = avro-source1
21 agent1.sinks = log-sink1

首先,啟動Agent程序:

1 bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1

然後,啟動Avro Client,傳送資料:

1 bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console
  • Avro Source+Memory Channel+HDFS Sink

配置Agent,修改配置檔案conf/flume-conf-hdfs.properties,內容如下:

01 # Define a source, channel, sink
02 agent1.sources = avro-source1
03 agent1.channels = ch1
04 agent1.sinks = hdfs-sink
05
06 # Configure channel
07 agent1.channels.ch1.type = memory
08 agent1.channels.ch1.capacity = 1000000
09 agent1.channels.ch1.transactionCapacity = 500000
10
11 # Define an Avro source called avro-source1 on agent1 and tell it
12 # to bind to 0.0.0.0:41414. Connect it to channel ch1.
13 agent1.sources.avro-source1.channels = ch1
14 agent1.sources.avro-source1.type = avro
15 agent1.sources.avro-source1.bind = 0.0.0.0
16 agent1.sources.avro-source1.port = 41414
17
18 # Define a logger sink that simply logs all events it receives
19 # and connect it to the other end of the same channel.
20 agent1.sinks.hdfs-sink1.channel = ch1
21 agent1.sinks.hdfs-sink1.type = hdfs
23 agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file
24 agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
25 agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
26 agent1.sinks.hdfs-sink1.rollInterval = 0
27 agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
28 agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
29 agent1.sinks.hdfs-sink1.hdfs.round = true
30 agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
31 agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
32 agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
33 agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
34 agent1.sinks.hdfs-sink1.fileType = SequenceFile
35 agent1.sinks.hdfs-sink1.writeFormat = TEXT

首先,啟動Agent:

1 bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1

然後,啟動Avro Client,傳送資料:

1 bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/local/programs/logs/sync.log -Dflume.root.logger=DEBUG,console

可以檢視同步到HDFS上的資料:

1 hdfs dfs -ls/data/flume

結果示例,如下所示:

1 -rw-r--r--   3 shirdrn supergroup    1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log
2 -rw-r--r--   3 shirdrn supergroup    1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log
3 -rw-r--r--   3 shirdrn supergroup     259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log
  • Spooling Directory Source+Memory Channel+HDFS Sink

配置Agent,修改配置檔案flume-conf-spool.properties,內容如下:

01 # Define source, channel, sink
02 agent1.sources = spool-source1
03 agent1.channels = ch1
04 agent1.sinks = hdfs-sink1
05
06 # Configure channel
07 agent1.channels.ch1.type = memory
08 agent1.channels.ch1.capacity = 1000000
09 agent1.channels.ch1.transactionCapacity = 500000
10
11 # Define and configure an Spool directory source
12 agent1.sources.spool-source1.channels = ch1
13 agent1.sources.spool-source1.type = spooldir
14 agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/
15 agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)?
16 agent1.sources.spool-source1.batchSize = 50
17 agent1.sources.spool-source1.inputCharset = UTF-8
18
19 # Define and configure a hdfs sink
20 agent1.sinks.hdfs-sink1.channel = ch1
21 agent1.sinks.hdfs-sink1.type = hdfs
23 agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S
24 agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
25 agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
26 agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
27 agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500
28 agent1.sinks.hdfs-sink1.hdfs.round = true
29 agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute
30 agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
31 agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
32 agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1
33 agent1.sinks.hdfs-sink1.fileType = SequenceFile
34 agent1.sinks.hdfs-sink1.writeFormat = TEXT
35 agent1.sinks.hdfs-sink1.rollInterval = 0

啟動Agent程序,執行如下命令:

1 bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1

可以檢視HDFS上同步過來的資料:

1 hdfs dfs -ls/data/flume

結果示例,如下所示:

01 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log
02 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log
03 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log
04 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log
05 -rw-r--r--   3 shirdrn supergroup       1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log
06 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log
07 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log
08 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log
09 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log
10 -rw-r--r--   3 shirdrn supergroup    1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log
  • Exec Source+Memory Channel+File Roll Sink

配置Agent,修改配置檔案flume-conf-file.properties,內容如下:

01 # Define source, channel, sink
02 agent1.sources = tail-source1
03 agent1.channels = ch1
04 agent1.sinks = file-sink1
05
06 # Configure channel
07 agent1.channels.ch1.type = memory
08 agent1.channels.ch1.capacity = 1000000
09 agent1.channels.ch1.transactionCapacity = 500000
10
11 # Define and configure an Exec source
12 agent1.sources.tail-source1.channels = ch1
13 agent1.sources.tail-source1.type = exec
14 agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log
15 agent1.sources.tail-source1.shell = /bin/sh -c
16 agent1.sources.tail-source1.batchSize = 50
17
18 # Define and configure a File roll sink
19 # and connect it to the other end of the same channel.
20 agent1.sinks.file-sink1.channel = ch1
21 agent1.sinks.file-sink1.type = file_roll
22 agent1.sinks.file-sink1.batchSize = 100
23 agent1.sinks.file-sink1.serializer = TEXT
24 agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data

啟動Agent程序,執行如下命令:

1 bin/flume-ng agent -c ./conf/ -f conf/flume-conf-file.properties -Dflume.root.logger=INFO,console -n agent1

可以檢視File Roll Sink對應的本地檔案系統目錄/home/shirdrn/sink_data下,示例如下所示:

1 -rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1
2 -rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2
3 -rw-rw-r-- 1 shirdrn shirdrn        0 Sep 17 11:37 1410924990039-3
4 -rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4
5 -rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5

有關Flume NG更多配置及其說明,請參考官方使用者手冊,非常詳細。

參考連結

轉載自yangjun