Flume(NG)架構設計要點及配置實踐 Flume NG是一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量日誌資料進行高效收集
Flume(NG)架構設計要點及配置實踐
Flume NG是一個分散式、可靠、可用的系統,它能夠將不同資料來源的海量日誌資料進行高效收集、聚合、移動,最後儲存到一箇中心化資料儲存系統中。由原來的Flume OG到現在的Flume NG,進行了架構重構,並且現在NG版本完全不相容原來的OG版本。經過架構重構後,Flume NG更像是一個輕量的小工具,非常簡單,容易適應各種方式日誌收集,並支援failover和負載均衡。
架構設計要點
Flume的架構主要有一下幾個核心概念:
- Event:一個數據單元,帶有一個可選的訊息頭
- Flow:Event從源點到達目的點的遷移的抽象
- Client:操作位於源點處的Event,將其傳送到Flume Agent
- Agent:一個獨立的Flume程序,包含元件Source、Channel、Sink
- Source:用來消費傳遞到該元件的Event
- Channel:中轉Event的一個臨時儲存,儲存有Source元件傳遞過來的Event
- Sink:從Channel中讀取並移除Event,將Event傳遞到Flow Pipeline中的下一個Agent(如果有的話)
Flume NG架構,如圖所示:
外部系統產生日誌,直接通過Flume的Agent的Source元件將事件(如日誌行)傳送到中間臨時的channel元件,最後傳遞給Sink元件,HDFS Sink元件可以直接把資料儲存到HDFS叢集上。
一個最基本Flow的配置,格式如下:
01 |
# list the sources, sinks and channels for the agent |
02 |
<Agent>.sources = <Source1> <Source2> |
03 |
<Agent>.sinks = <Sink1> <Sink2> |
04 |
<Agent>.channels = <Channel1> <Channel2> |
05 |
06 |
# set channel for source |
07 |
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> ... |
08 |
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2> ... |
09 |
10 |
# set channel for sink |
11 |
<Agent>.sinks.<Sink1>.channel = <Channel1> |
12 |
<Agent>.sinks.<Sink2>.channel = <Channel2> |
尖括號裡面的,我們可以根據實際需求或業務來修改名稱。下面詳細說明:
表示配置一個Agent的名稱,一個Agent肯定有一個名稱。與是Agent的Source元件的名稱,消費傳遞過來的Event。與是Agent的Channel元件的名稱。與是Agent的Sink元件的名稱,從Channel中消費(移除)Event。
上面配置內容中,第一組中配置Source、Sink、Channel,它們的值可以有1個或者多個;第二組中配置Source將把資料儲存(Put)到哪一個Channel中,可以儲存到1個或多個Channel中,同一個Source將資料儲存到多個Channel中,實際上是Replication;第三組中配置Sink從哪一個Channel中取(Task)資料,一個Sink只能從一個Channel中取資料。
下面,根據官網文件,我們展示幾種Flow Pipeline,各自適應於什麼樣的應用場景:
- 多個Agent順序連線
可以將多個Agent順序連線起來,將最初的資料來源經過收集,儲存到最終的儲存系統中。這是最簡單的情況,一般情況下,應該控制這種順序連線的Agent的數量,因為資料流經的路徑變長了,如果不考慮failover的話,出現故障將影響整個Flow上的Agent收集服務。
- 多個Agent的資料匯聚到同一個Agent
這種情況應用的場景比較多,比如要收集Web網站的使用者行為日誌,Web網站為了可用性使用的負載均衡的叢集模式,每個節點都產生使用者行為日誌,可以為每個節點都配置一個Agent來單獨收集日誌資料,然後多個Agent將資料最終匯聚到一個用來儲存資料儲存系統,如HDFS上。
- 多路(Multiplexing)Agent
這種模式,有兩種方式,一種是用來複制(Replication),另一種是用來分流(Multiplexing)。Replication方式,可以將最前端的資料來源複製多份,分別傳遞到多個channel中,每個channel接收到的資料都是相同的,配置格式,如下所示:
01 |
# List the sources, sinks and channels for the agent |
02 |
<Agent>.sources = <Source1> |
03 |
<Agent>.sinks = <Sink1> <Sink2> |
04 |
<Agent>.channels = <Channel1> <Channel2> |
05 |
06 |
# set list of channels for source (separated by space) |
07 |
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2> |
08 |
09 |
# set channel for sinks |
10 |
<Agent>.sinks.<Sink1>.channel = <Channel1> |
11 |
<Agent>.sinks.<Sink2>.channel = <Channel2> |
12 |
13 |
<Agent>.sources.<Source1>.selector.type = replicating |
上面指定了selector的type的值為replication,其他的配置沒有指定,使用的Replication方式,Source1會將資料分別儲存到Channel1和Channel2,這兩個channel裡面儲存的資料是相同的,然後資料被傳遞到Sink1和Sink2。
Multiplexing方式,selector可以根據header的值來確定資料傳遞到哪一個channel,配置格式,如下所示:
1 |
# Mapping for multiplexing selector |
2 |
<Agent>.sources.<Source1>.selector.type = multiplexing |
3 |
<Agent>.sources.<Source1>.selector.header = <someHeader> |
4 |
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1> |
5 |
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2> |
6 |
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> |
7 |
#... |
8 |
9 |
<Agent>.sources.<Source1>.selector.default = <Channel2> |
上面selector的type的值為multiplexing,同時配置selector的header資訊,還配置了多個selector的mapping的值,即header的值:如果header的值為Value1、Value2,資料從Source1路由到Channel1;如果header的值為Value2、Value3,資料從Source1路由到Channel2。
- 實現load balance功能
Load balancing Sink Processor能夠實現load balance功能,上圖Agent1是一個路由節點,負責將Channel暫存的Event均衡到對應的多個Sink元件上,而每個Sink元件分別連線到一個獨立的Agent上,示例配置,如下所示:
1 |
a1.sinkgroups = g1 |
2 |
a1.sinkgroups.g1.sinks = k1 k2 k3 |
3 |
a1.sinkgroups.g1.processor.type = load_balance |
4 |
a1.sinkgroups.g1.processor.backoff = true |
5 |
a1.sinkgroups.g1.processor.selector = round_robin |
6 |
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 |
- 實現failover能
Failover Sink Processor能夠實現failover功能,具體流程類似load balance,但是內部處理機制與load balance完全不同:Failover Sink Processor維護一個優先順序Sink元件列表,只要有一個Sink元件可用,Event就被傳遞到下一個元件。如果一個Sink能夠成功處理Event,則會加入到一個Pool中,否則會被移出Pool並計算失敗次數,設定一個懲罰因子,示例配置如下所示:
1 |
a1.sinkgroups = g1 |
2 |
a1.sinkgroups.g1.sinks = k1 k2 k3 |
3 |
a1.sinkgroups.g1.processor.type = failover |
4 |
a1.sinkgroups.g1.processor.priority.k1 = 5 |
5 |
a1.sinkgroups.g1.processor.priority.k2 = 7 |
6 |
a1.sinkgroups.g1.processor.priority.k3 = 6 |
7 |
a1.sinkgroups.g1.processor.maxpenalty = 20000 |
基本功能
我們看一下,Flume NG都支援哪些功能(目前最新版本是1.5.0.1),瞭解它的功能集合,能夠讓我們在應用中更好地選擇使用哪一種方案。說明Flume NG的功能,實際還是圍繞著Agent的三個元件Source、Channel、Sink來看它能夠支援哪些技術或協議。我們不再對各個元件支援的協議詳細配置進行說明,通過列表的方式分別對三個元件進行概要說明:
- Flume Source
Source型別 | 說明 |
Avro Source | 支援Avro協議(實際上是Avro RPC),內建支援 |
Thrift Source | 支援Thrift協議,內建支援 |
Exec Source | 基於Unix的command在標準輸出上生產資料 |
JMS Source | 從JMS系統(訊息、主題)中讀取資料,ActiveMQ已經測試過 |
Spooling Directory Source | 監控指定目錄內資料變更 |
Twitter 1% firehose Source | 通過API持續下載Twitter資料,試驗性質 |
Netcat Source | 監控某個埠,將流經埠的每一個文字行資料作為Event輸入 |
Sequence Generator Source | 序列生成器資料來源,生產序列資料 |
Syslog Sources | 讀取syslog資料,產生Event,支援UDP和TCP兩種協議 |
HTTP Source | 基於HTTP POST或GET方式的資料來源,支援JSON、BLOB表示形式 |
Legacy Sources | 相容老的Flume OG中Source(0.9.x版本) |
- Flume Channel
Channel型別 | 說明 |
Memory Channel | Event資料儲存在記憶體中 |
JDBC Channel | Event資料儲存在持久化儲存中,當前Flume Channel內建支援Derby |
File Channel | Event資料儲存在磁碟檔案中 |
Spillable Memory Channel | Event資料儲存在記憶體中和磁碟上,當記憶體佇列滿了,會持久化到磁碟檔案(當前試驗性的,不建議生產環境使用) |
Pseudo Transaction Channel | 測試用途 |
Custom Channel | 自定義Channel實現 |
- Flume Sink
Sink型別 | 說明 |
HDFS Sink | 資料寫入HDFS |
Logger Sink | 資料寫入日誌檔案 |
Avro Sink | 資料被轉換成Avro Event,然後傳送到配置的RPC埠上 |
Thrift Sink | 資料被轉換成Thrift Event,然後傳送到配置的RPC埠上 |
IRC Sink | 資料在IRC上進行回放 |
File Roll Sink | 儲存資料到本地檔案系統 |
Null Sink | 丟棄到所有資料 |
HBase Sink | 資料寫入HBase資料庫 |
Morphline Solr Sink | 資料傳送到Solr搜尋伺服器(叢集) |
ElasticSearch Sink | 資料傳送到Elastic Search搜尋伺服器(叢集) |
Kite Dataset Sink | 寫資料到Kite Dataset,試驗性質的 |
Custom Sink | 自定義Sink實現 |
另外還有Channel Selector、Sink Processor、Event Serializer、Interceptor等元件,可以參考官網提供的使用者手冊。
應用實踐
安裝Flume NG非常簡單,我們使用最新的1.5.0.1版本,執行如下命令:
如果需要使用到Hadoop叢集,保證Hadoop相關的環境變數都已經正確配置,並且Hadoop叢集可用。下面,通過一些實際的配置例項,來了解Flume的使用。為了簡單期間,channel我們使用Memory型別的channel。
- Avro Source+Memory Channel+Logger Sink
使用apache-flume-1.5.0.1自帶的例子,使用Avro Source接收外部資料來源,Logger作為sink,即通過Avro RPC呼叫,將資料快取在channel中,然後通過Logger打印出呼叫傳送的資料。
配置Agent,修改配置檔案conf/flume-conf.properties,內容如下:
01 |
# Define a memory channel called ch1 on agent1 |
02 |
agent1.channels.ch1.type = memory |
03 |
04 |
# Define an Avro source called avro-source1 on agent1 and tell it |
05 |
# to bind to 0.0.0.0:41414. Connect it to channel ch1. |
06 |
agent1.sources.avro-source1.channels = ch1 |
07 |
agent1.sources.avro-source1.type = avro |
08 |
agent1.sources.avro-source1.bind = 0.0.0.0 |
09 |
agent1.sources.avro-source1.port = 41414 |
10 |
11 |
# Define a logger sink that simply logs all events it receives |
12 |
# and connect it to the other end of the same channel. |
13 |
agent1.sinks.log-sink1.channel = ch1 |
14 |
agent1.sinks.log-sink1.type = logger |
15 |
16 |
# Finally, now that we've defined all of our components, tell |
17 |
# agent1 which ones we want to activate. |
18 |
agent1.channels = ch1 |
19 |
agent1.channels.ch1.capacity = 1000 |
20 |
agent1.sources = avro-source1 |
21 |
agent1.sinks = log-sink1 |
首先,啟動Agent程序:
1 |
bin/flume-ng agent -c ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1 |
然後,啟動Avro Client,傳送資料:
1 |
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/ local /programs/logs/ sync .log -Dflume.root.logger=DEBUG,console |
- Avro Source+Memory Channel+HDFS Sink
配置Agent,修改配置檔案conf/flume-conf-hdfs.properties,內容如下:
01 |
# Define a source, channel, sink |
02 |
agent1.sources = avro-source1 |
03 |
agent1.channels = ch1 |
04 |
agent1.sinks = hdfs-sink |
05 |
06 |
# Configure channel |
07 |
agent1.channels.ch1.type = memory |
08 |
agent1.channels.ch1.capacity = 1000000 |
09 |
agent1.channels.ch1.transactionCapacity = 500000 |
10 |
11 |
# Define an Avro source called avro-source1 on agent1 and tell it |
12 |
# to bind to 0.0.0.0:41414. Connect it to channel ch1. |
13 |
agent1.sources.avro-source1.channels = ch1 |
14 |
agent1.sources.avro-source1.type = avro |
15 |
agent1.sources.avro-source1.bind = 0.0.0.0 |
16 |
agent1.sources.avro-source1.port = 41414 |
17 |
18 |
# Define a logger sink that simply logs all events it receives |
19 |
# and connect it to the other end of the same channel. |
20 |
agent1.sinks.hdfs-sink1.channel = ch1 |
21 |
agent1.sinks.hdfs-sink1.type = hdfs |
23 |
agent1.sinks.hdfs-sink1.hdfs.filePrefix = sync_file |
24 |
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log |
25 |
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576 |
26 |
agent1.sinks.hdfs-sink1.rollInterval = 0 |
27 |
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0 |
28 |
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500 |
29 |
agent1.sinks.hdfs-sink1.hdfs.round = true |
30 |
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute |
31 |
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25 |
32 |
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true |
33 |
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1 |
34 |
agent1.sinks.hdfs-sink1.fileType = SequenceFile |
35 |
agent1.sinks.hdfs-sink1.writeFormat = TEXT |
首先,啟動Agent:
1 |
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-hdfs.properties -Dflume.root.logger=INFO,console -n agent1 |
然後,啟動Avro Client,傳送資料:
1 |
bin/flume-ng avro-client -c ./conf/ -H 0.0.0.0 -p 41414 -F /usr/ local /programs/logs/ sync .log -Dflume.root.logger=DEBUG,console |
可以檢視同步到HDFS上的資料:
1 |
hdfs dfs - ls /data/flume |
結果示例,如下所示:
1 |
-rw-r--r-- 3 shirdrn supergroup 1377617 2014-09-16 14:35 /data/flume/sync_file.1410849320761.log |
2 |
-rw-r--r-- 3 shirdrn supergroup 1378137 2014-09-16 14:35 /data/flume/sync_file.1410849320762.log |
3 |
-rw-r--r-- 3 shirdrn supergroup 259148 2014-09-16 14:35 /data/flume/sync_file.1410849320763.log |
- Spooling Directory Source+Memory Channel+HDFS Sink
配置Agent,修改配置檔案flume-conf-spool.properties,內容如下:
01 |
# Define source, channel, sink |
02 |
agent1.sources = spool-source1 |
03 |
agent1.channels = ch1 |
04 |
agent1.sinks = hdfs-sink1 |
05 |
06 |
# Configure channel |
07 |
agent1.channels.ch1.type = memory |
08 |
agent1.channels.ch1.capacity = 1000000 |
09 |
agent1.channels.ch1.transactionCapacity = 500000 |
10 |
11 |
# Define and configure an Spool directory source |
12 |
agent1.sources.spool-source1.channels = ch1 |
13 |
agent1.sources.spool-source1.type = spooldir |
14 |
agent1.sources.spool-source1.spoolDir = /home/shirdrn/data/ |
15 |
agent1.sources.spool-source1.ignorePattern = event(_\d{4}\-\d{2}\-\d{2}_\d{2}_\d{2})?\.log(\.COMPLETED)? |
16 |
agent1.sources.spool-source1.batchSize = 50 |
17 |
agent1.sources.spool-source1.inputCharset = UTF-8 |
18 |
19 |
# Define and configure a hdfs sink |
20 |
agent1.sinks.hdfs-sink1.channel = ch1 |
21 |
agent1.sinks.hdfs-sink1.type = hdfs |
23 |
agent1.sinks.hdfs-sink1.hdfs.filePrefix = event_%y-%m-%d_%H_%M_%S |
24 |
agent1.sinks.hdfs-sink1.hdfs.fileSuffix = .log |
25 |
agent1.sinks.hdfs-sink1.hdfs.rollSize = 1048576 |
26 |
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0 |
27 |
agent1.sinks.hdfs-sink1.hdfs.batchSize = 1500 |
28 |
agent1.sinks.hdfs-sink1.hdfs.round = true |
29 |
agent1.sinks.hdfs-sink1.hdfs.roundUnit = minute |
30 |
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25 |
31 |
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true |
32 |
agent1.sinks.hdfs-sink1.hdfs.minBlockReplicas = 1 |
33 |
agent1.sinks.hdfs-sink1.fileType = SequenceFile |
34 |
agent1.sinks.hdfs-sink1.writeFormat = TEXT |
35 |
agent1.sinks.hdfs-sink1.rollInterval = 0 |
啟動Agent程序,執行如下命令:
1 |
bin/flume-ng agent -c ./conf/ -f conf/flume-conf-spool.properties -Dflume.root.logger=INFO,console -n agent1 |
可以檢視HDFS上同步過來的資料:
1 |
hdfs dfs - ls /data/flume |
結果示例,如下所示:
01 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355094.log |
02 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355095.log |
03 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355096.log |
04 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:52 /data/flume/event_14-09-17_10_52_00.1410922355097.log |
05 |
-rw-r--r-- 3 shirdrn supergroup 1530 2014-09-17 10:53 /data/flume/event_14-09-17_10_52_00.1410922355098.log |
06 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380386.log |
07 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380387.log |
08 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380388.log |
09 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380389.log |
10 |
-rw-r--r-- 3 shirdrn supergroup 1072265 2014-09-17 10:53 /data/flume/event_14-09-17_10_53_00.1410922380390.log |
- Exec Source+Memory Channel+File Roll Sink
配置Agent,修改配置檔案flume-conf-file.properties,內容如下:
01 |
# Define source, channel, sink |
02 |
agent1.sources = tail-source1 |
03 |
agent1.channels = ch1 |
04 |
agent1.sinks = file-sink1 |
05 |
06 |
# Configure channel |
07 |
agent1.channels.ch1.type = memory |
08 |
agent1.channels.ch1.capacity = 1000000 |
09 |
agent1.channels.ch1.transactionCapacity = 500000 |
10 |
11 |
# Define and configure an Exec source |
12 |
agent1.sources.tail-source1.channels = ch1 |
13 |
agent1.sources.tail-source1.type = exec |
14 |
agent1.sources.tail-source1.command = tail -F /home/shirdrn/data/event.log |
15 |
agent1.sources.tail-source1.shell = /bin/sh -c |
16 |
agent1.sources.tail-source1.batchSize = 50 |
17 |
18 |
# Define and configure a File roll sink |
19 |
# and connect it to the other end of the same channel. |
20 |
agent1.sinks.file-sink1.channel = ch1 |
21 |
agent1.sinks.file-sink1.type = file_roll |
22 |
agent1.sinks.file-sink1.batchSize = 100 |
23 |
agent1.sinks.file-sink1.serializer = TEXT |
24 |
agent1.sinks.file-sink1.sink.directory = /home/shirdrn/sink_data |
啟動Agent程序,執行如下命令:
1 |
bin/flume-ng agent -c ./conf/ -f conf/flume-conf- file .properties -Dflume.root.logger=INFO,console -n agent1 |
可以檢視File Roll Sink對應的本地檔案系統目錄/home/shirdrn/sink_data下,示例如下所示:
1 |
-rw-rw-r-- 1 shirdrn shirdrn 13944825 Sep 17 11:36 1410924990039-1 |
2 |
-rw-rw-r-- 1 shirdrn shirdrn 11288870 Sep 17 11:37 1410924990039-2 |
3 |
-rw-rw-r-- 1 shirdrn shirdrn 0 Sep 17 11:37 1410924990039-3 |
4 |
-rw-rw-r-- 1 shirdrn shirdrn 20517500 Sep 17 11:38 1410924990039-4 |
5 |
-rw-rw-r-- 1 shirdrn shirdrn 16343250 Sep 17 11:38 1410924990039-5 |
有關Flume NG更多配置及其說明,請參考官方使用者手冊,非常詳細。
參考連結
轉載自yangjun