Flume構建日誌採集系統
title: Flume構建日誌採集系統 date: 2018-02-03 19:45 tags: [flume,kafka]
一、Flume介紹
1.Flume特點
- Flume是一個分散式的、可靠的、高可用的海量日誌採集 、聚合和傳輸的系統
- 資料流模型:Source-Channel-Sink
- 事務機制保證訊息傳遞的可靠性
- 內建豐富外掛,輕鬆與其他系統整合
- Java實現,優秀的系統框架設計,模組分明,易於開發
2.Flume原型圖
3.Flume基本元件
- Event:訊息的基本單位,有header和body組成
- Agent:JVM程式,負責將一端外部來源產生的訊息轉 發到另一端外部的目的地
- Source:從外部來源讀入event,並寫入channel
- Channel:event暫存元件,source寫入後,event將會 一直儲存,
- Sink:從channel讀入event,並寫入目的地
3.Flume事件流
4.Flumes資料流
二、Flume搭建
1.下載二進位制安裝包
下載地址:flume.apache.org/download.ht…
2.安裝Flume
解壓縮安裝包檔案
[hadoop@hadoop01 apps]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz
[hadoop@hadoop01 apps]$ cd apache-flume-1.8.0-bin/
[hadoop@hadoop01 apache-flume-1.8.0-bin]$ ll
總用量 148
drwxr-xr-x. 2 hadoop hadoop 62 1月 21 14:31 bin
-rw-r--r--. 1 hadoop hadoop 81264 9月 15 20:26 CHANGELOG
drwxr-xr-x. 2 hadoop hadoop 127 1月 21 14:31 conf
-rw-r--r--. 1 hadoop hadoop 5681 9月 15 20:26 DEVNOTES
-rw-r--r--. 1 hadoop hadoop 2873 9月 15 20:26 doap_Flume.rdf
drwxr-xr-x. 10 hadoop hadoop 4096 9月 15 20:48 docs
drwxr-xr-x. 2 hadoop hadoop 8192 1月 21 14:31 lib
-rw-r--r--. 1 hadoop hadoop 27663 9月 15 20:26 LICENSE
-rw-r--r--. 1 hadoop hadoop 249 9月 15 20:26 NOTICE
-rw-r--r--. 1 hadoop hadoop 2483 9月 15 20:26 README.md
-rw-r--r--. 1 hadoop hadoop 1588 9月 15 20:26 RELEASE-NOTES
drwxr-xr-x. 2 hadoop hadoop 68 1月 21 14:31 tools
[hadoop@hadoop01 apache-flume-1.8.0-bin]$
複製程式碼
3.建立軟連線【此步驟可省略】
[root@hadoop01 bin]# ln -s /home/hadoop/apps/apache-flume-1.8.0-bin /usr/local/flume
複製程式碼
4.配置環境變數
編輯 /etc/profile檔案,增加以下內容:
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${HIVE_HOME}/bin:${FLUME_HOME}/bin
複製程式碼
4.啟動flume
使用example.conf 配置檔案啟動一個例項
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
複製程式碼
啟動命令如下:
[root@hadoop01 conf]# pwd
/home/hadoop/apps/apache-flume-1.8.0-bin/conf
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
複製程式碼
啟動成功後如下圖所示:
........略
18/01/27 18:17:25 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1,k1]
18/01/27 18:17:25 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@20470f counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/01/27 18:17:25 INFO node.Application: Starting Channel c1
18/01/27 18:17:25 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL,name: c1: Successfully registered new MBean.
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL,name: c1 started
18/01/27 18:17:26 INFO node.Application: Starting Sink k1
18/01/27 18:17:26 INFO node.Application: Starting Source r1
18/01/27 18:17:26 INFO source.NetcatSource: Source starting
18/01/27 18:17:26 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
複製程式碼
使用telnet傳送資料
[root@hadoop01 apps]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Are you OK ?
OK
複製程式碼
控制檯列印如下:
Impl[/127.0.0.1:44444]
18/01/27 18:21:00 INFO sink.LoggerSink: Event: { headers:{} body: 41 72 65 20 79 6F 75 20 4F 4B 20 3F 0D Are you OK ?. }
複製程式碼
如無法使用telnet,請先安裝telnet工具
[root@hadoop01 apps]# yum -y install telnet
複製程式碼
三、Flume實踐
1.Source元件清單
- Source:對接各種外部資料來源,將收集到的事件傳送到Channel中,一個source可以向多個channel傳送event,Flume內建非常豐富的Source,同時使用者可以自定義Source
Source型別 | Type | 用途 |
---|---|---|
Avro Source | avro | 啟動一個Avro Server,可與上一級Agent連線 |
HTTP Source | http | 啟動一個HttpServer |
Exec Source | exec | 執行unix command,獲取標準輸出,如tail -f |
Taildir Source | TAILDIR | 監聽目錄或檔案 |
Spooling Directory Source | spooldir | 監聽目錄下的新增檔案 |
Kafka Source | org.apache.flume.sourc e.kafka.KafkaSource | 讀取Kafka資料 |
JMS Source | jms | 從JMS源讀取資料 |
2.avro Source Agent 和Exec Source Agent
- 配置一個avroagent,avrosource.conf 配置檔案如下:
//avrosource.conf
avroagent.sources = r1
avroagent.channels = c1
avroagent.sinks = k1
avroagent.sources.r1.type = avro
avroagent.sources.r1.bind = 192.168.43.20
avroagent.sources.r1.port = 8888
avroagent.sources.r1.threads= 3
avroagent.sources.r1.channels = c1
avroagent.channels.c1.type = memory
avroagent.channels.c1.capacity = 10000
avroagent.channels.c1.transactionCapacity = 1000
avroagent.sinks.k1.type = logger
avroagent.sinks.k1.channel = c1
複製程式碼
- 啟動一個avrosource的agent
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file avrosource.conf --name avroagent -Dflume.root.logger=INFO,console
複製程式碼
啟動成功入下圖所示:
...略
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL,name: c1: Successfully registered new MBean.
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL,name: c1 started
18/01/27 18:46:36 INFO node.Application: Starting Sink k1
18/01/27 18:46:36 INFO node.Application: Starting Source r1
18/01/27 18:46:36 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 192.168.43.20,port: 8888 }...
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE,name: r1: Successfully registered new MBean.
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE,name: r1 started
18/01/27 18:46:37 INFO source.AvroSource: Avro source r1 started
複製程式碼
- 配置一個execAgent,實現與sourceAgent實現串聯,execsource.conf 配置檔案如下:
execagent.sources = r1
execagent.channels = c1
execagent.sinks = k1
execagent.sources.r1.type = exec
execagent.sources.r1.command = tail -F /home/hadoop/apps/flume/execsource/exectest.log
execagent.sources.r1.channels = c1
execagent.channels.c1.type = memory
execagent.channels.c1.capacity = 10000
execagent.channels.c1.transactionCapacity = 1000
execagent.sinks.k1.type = avro
execagent.sinks.k1.channel = c1
execagent.sinks.k1.hostname = 192.168.43.20
execagent.sinks.k1.port = 8888
複製程式碼
- 啟動一個execAgent,並實現execagent監控檔案變化,sourceAgent接收變化內容
啟動 execAgent
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file execsource.conf --name execagent
複製程式碼
啟動成功如下下圖所示:
18/01/27 18:58:43 INFO instrumentation.MonitoredCounterGroup: Component type: SINK,name: k1 started
18/01/27 18:58:43 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: 192.168.43.20,port: 8888
18/01/27 18:58:43 INFO sink.AvroSink: Attempting to create Avro Rpc client.
18/01/27 18:58:43 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
18/01/27 18:58:44 INFO sink.AbstractRpcSink: Rpc sink k1 started.
複製程式碼
在execAgent監控的檔案下寫入內容,觀察sourceagent是否接收到變化內容
[root@hadoop01 execsource]# echo 222 > exectest.log
[root@hadoop01 execsource]# echo 5555 >> exectest.log
[root@hadoop01 execsource]# cat exectest.log
222
5555
複製程式碼
在sourceagent控制列印臺下檢視監控訊息如下:
18/01/27 18:58:50 INFO sink.LoggerSink: Event: { headers:{} body: 31 32 33 123 }
18/01/27 18:59:55 INFO sink.LoggerSink: Event: { headers:{} body: 35 35 35 35 5555 }
複製程式碼
則說明2個串聯agent傳遞資訊成功。
說明:
avroagent 配置檔案配置項起始名稱需要與服務啟動 -name 名稱相一致。
3.Source元件- Spooling Directory Source
- 配置一個Spooling Directory Source,spooldirsource.conf 配置檔案內容如下:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/apps/flume/spoolDir
a1.sources.r1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
複製程式碼
/home/hadoop/apps/flume/spoolDir 必須已經建立且具有使用者讀寫許可權。
啟動 SpoolDirsourceAgent
[hadoop@hadoop01 conf]$ flume-ng agent --conf conf --conf-file spooldirsource.conf --name a1 -Dflume.root.logger=INFO,console
複製程式碼
在spoolDir資料夾下建立檔案並寫入檔案內容,觀察控制檯訊息:
18/01/28 17:06:54 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/flume/spoolDir/test to /home/hadoop/apps/flume/spoolDir/test.COMPLETED
18/01/28 17:06:55 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/apps/flume/spoolDir/test} body: 32 32 32 222 }
複製程式碼
此時監測到SpoolDirSourceAgent 可以監控到檔案變化。
值得說明的是:Spooling Directory Source Agent 並不能監聽子級資料夾的檔案變化,也不支援已存在的檔案更新資料變化.
4.Source元件- Kafka Source
- 配置一個Kafa Source,kafasource.conf 配置檔案內容如下:
kafkasourceagent.sources = r1
kafkasourceagent.channels = c1
kafkasourceagent.sinks = k1
kafkasourceagent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
kafkasourceagent.sources.r1.channels = c1
kafkasourceagent.sources.r1.batchSize = 100
kafkasourceagent.sources.r1.batchDurationMillis = 1000
kafkasourceagent.sources.r1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092
kafkasourceagent.sources.r1.kafka.topics = flumetopictest1
kafkasourceagent.sources.r1.kafka.consumer.group.id = flumekafkagroupid
kafkasourceagent.channels.c1.type = memory
kafkasourceagent.channels.c1.capacity = 10000
kafkasourceagent.channels.c1.transactionCapacity = 1000
kafkasourceagent.sinks.k1.type = logger
kafkasourceagent.sinks.k1.channel = c1
複製程式碼
首先啟動3個節點的kafka節點服務,在每個kafka節點執行,以後臺方式執行
[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
複製程式碼
在kafka節點上建立一個配置好的Topic flumetoptest1,命令如下:
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumetopictest1
Created topic "flumetopictest1".
複製程式碼
建立成功後,啟動一個kafka Source Agent,命令如下:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console
複製程式碼
建立一個Kafka 生產者,進行訊息傳送
root@hadoop03 bin]# ./kafka-console-producer.sh --broker-list 192.168.43.22:9092,192.168.43.23:9092 --topic flumetopictest1
複製程式碼
傳送訊息,此時kafka 就可以接收到訊息:
18/02/03 20:36:57 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1,partition=2,timestamp=1517661413068} body: 31 32 33 31 33 32 32 31 12313221 }
18/02/03 20:37:09 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1,partition=1,timestamp=1517661428930} body: 77 69 20 61 69 79 6F 75 08 08 08 wi aiyou... }
複製程式碼
5.Source 元件 -Taildir source
監聽一個資料夾或者檔案,通過正則表示式匹配需要監聽的 資料來源檔案,Taildir Source通過將監聽的檔案位置寫入到檔案中來實現斷點續傳,並且能夠保證沒有重複資料的讀取.
- 重要引數
type:source型別TAILDIR
positionFile:儲存監聽檔案讀取位置的檔案路徑
idleTimeout:關閉空閒檔案延遲時間,如果有新的記錄新增到已關閉的空閒檔案
taildir srouce將繼續開啟該空閒檔案,預設值120000毫秒
writePosInterval:向儲存讀取位置檔案中寫入讀取檔案位置的時間間隔,預設值 3000毫秒
batchSize:批量寫入channel最大event數,預設值100
maxBackoffSleep:每次最後一次嘗試沒有獲取到監聽檔案最新資料的最大延遲時 間,預設值5000毫秒
cachePatternMatching:對於監聽的資料夾下通過正則表示式匹配的檔案可能數量 會很多,將匹配成功的監聽檔案列表和讀取檔案列表的順序都新增到快取中,可以提高效能,預設值true
fileHeader :是否新增檔案的絕對路徑到event的header中,預設值false
fileHeaderKey:新增到event header中檔案絕對路徑的鍵值,預設值file
filegroups:監聽的檔案組列表,taildirsource通過檔案組監聽多個目錄或檔案
filegroups.:檔案正則表示式路徑或者監聽指定檔案路徑
channels:Source對接的Channel名稱
- 配置一個taildir Source,具體taildirsource.conf 配置檔案內容如下:
taildiragent.sources=r1
taildiragent.channels=c1
taildiragent.sinks=k1
taildiragent.sources.r1.type=TAILDIR
taildiragent.sources.r1.positionFile=/home/hadoop/apps/flume/taildir/position/taildir_position.json
taildiragent.sources.r1.filegroups=f1 f2
taildiragent.sources.r1.filegroups.f1=/home/hadoop/apps/flume/taildir/test1/test.log
taildiragent.sources.r1.filegroups.f2=/home/hadoop/apps/flume/taildir/test2/.*log.*
taildiragent.sources.r1.channels=c1
taildiragent.channels.c1.type=memory
taildiragent.channels.c1.transcationCapacity=1000
taildiragent.sinks.k1.type=logger
taildiragent.sinks.k1.channel=c1
複製程式碼
啟動一個taildirSource agent,程式碼如下:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console
複製程式碼
開始在test1和test2資料夾寫入檔案,觀察agent訊息接收。
6.Channel元件
- Channel:Channel被設計為event中轉暫存區,儲存Source 收集並且沒有被Sink消費的event ,為了平衡Source收集 和Sink讀取資料的速度,可視為Flume內部的訊息佇列。
- Channel是執行緒安全的並且具有事務性,支援source寫失 敗重複寫和sink讀失敗重複讀等操作
- 常用的Channel型別有:Memory Channel、File Channel、 Kafka Channel、JDBC Channel等
7.Channel元件- Memory Channel
- Memory Channel:使用記憶體作為Channel,Memory Channel讀寫速度 快,但是儲存資料量小,Flume程式掛掉、伺服器停機或者重啟都會 導致資料丟失。部署Flume Agent的線上伺服器記憶體資源充足、不關 心資料丟失的場景下可以使用 關鍵引數:
type :channel型別memory
capacity :channel中儲存的最大event數,預設值100
transactionCapacity :一次事務中寫入和讀取的event最大數,預設值100。
keep-alive:在Channel中寫入或讀取event等待完成的超時時間,預設值3秒
byteCapacityBufferPercentage:緩衝空間佔Channel容量(byteCapacity)的百分比,為event中的頭資訊保留了空間,預設值20(單位百分比)
byteCapacity :Channel佔用記憶體的最大容量,預設值為Flume堆記憶體的80%
複製程式碼
8. Channel元件- File Channel
- File Channel:將event寫入到磁碟檔案中,與Memory Channel相比存 儲容量大,無資料丟失風險。
- File Channle資料儲存路徑可以配置多磁碟檔案路徑,提高寫入檔案效能
- Flume將Event順序寫入到File Channel檔案的末尾,在配置檔案中通 過設定maxFileSize引數設定資料檔案大小上限
- 當一個已關閉的只讀資料檔案中的Event被完全讀取完成,並且Sink已經提交讀取完成的事務,則Flume將刪除儲存該資料檔案
- 通過設定檢查點和備份檢查點在Agent重啟之後能夠快速將File Channle中的資料按順序回放到記憶體中 關鍵引數如下:
type:channel型別為file
checkpointDir:檢查點目錄,預設在啟動flume使用者目錄下建立,建 議單獨配置磁碟路徑
useDualCheckpoints:是否開啟備份檢查點,預設false,建議設定為true開啟備份檢查點,備份檢查點的作用是當Agent意外出錯導致寫 入檢查點檔案異常,在重新啟動File Channel時通過備份檢查點將資料回放到記憶體中,如果不開啟備份檢查點,在資料回放的過程中發現檢查點檔案異常會對所資料進行全回放,全回放的過程相當耗時
backupCheckpointDir:備份檢查點目錄,最好不要和檢查點目錄在同 一塊磁碟上
checkpointInterval:每次寫檢查點的時間間隔,預設值30000毫秒
dataDirs:資料檔案磁碟儲存路徑,建議配置多塊盤的多個路徑,通過磁碟的並行寫入來提高file channel效能,多個磁碟路徑用逗號隔開
transactionCapacity:一次事務中寫入和讀取的event最大數,預設值 10000
maxFileSize:每個資料檔案的最大大小,預設值:2146435071位元組
minimumRequiredSpace:磁碟路徑最小剩餘空間,如果磁碟剩餘空 間小於設定值,則不再寫入資料
capacity:file channel可容納的最大event數
keep-alive:在Channel中寫入或讀取event等待完成的超時時間,預設值3秒
複製程式碼
配置一個FileChannel,filechannel.conf 的配置內容如下:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /home/hadoop/apps/flume/filechannel/data
a1.channels.c1.checkpointDir = /home/hadoop/apps/flume/filechannel/checkpoint
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /home/hadoop/apps/flume/filechannel/backup
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
複製程式碼
啟動一個FileChannel,啟動命令如下:
[root@hadoop01 bin]# flume-ng agent --conf conf --conf-file filechannle.conf --name a1 -Dflume.root.logger=INFO,console
複製程式碼
向配置檔案埠44444傳送資料,觀察Channel記錄情況
telnet localhost asdfasd
複製程式碼
此時可以觀察到控制檯列印監控結果
18/02/04 21:15:44 INFO sink.LoggerSink: Event: { headers:{} body: 61 64 66 61 64 66 61 64 66 61 73 66 0D adfadfadfasf. }
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/hadoop/apps/flume/filechannel/checkpoint/checkpoint,elements to sync = 1
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1517749968978,queueSize: 0,queueHead: 0
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Attempting to back up checkpoint.
18/02/04 21:15:48 INFO file.Serialization: Skipping in_use.lock because it is in excludes set
18/02/04 21:15:48 INFO file.Serialization: Deleted the following files:,checkpoint,checkpoint.meta,inflightputs,inflighttakes.
18/02/04 21:15:48 INFO file.Log: Updated checkpoint for file: /home/hadoop/apps/flume/filechannel/data/log-2 position: 170 logWriteOrderID: 1517749968978
18/02/04 21:15:49 INFO file.EventQueueBackingStoreFile: Checkpoint backup completed.
複製程式碼
9.Channel元件- Kafka Channel
Kafka Channel:將分散式訊息佇列kafka作為channel相對於Memory Channel和File Channel儲存容量更大、 容錯能力更強,彌補了其他兩種Channel的短板,如果合理利用Kafka的效能,能夠達到事半功倍的效果。 關鍵引數如下:
type:Kafka Channel型別org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers:Kafka broker列表,格式為ip1:port1,ip2:port2…,建 議配置多個值提高容錯能力,多個值之間用逗號隔開
kafka.topic:topic名稱,預設值“flume-channel”
kafka.consumer.group.id:Consumer Group Id,全域性唯一
parseAsFlumeEvent:是否以Avro FlumeEvent模式寫入到Kafka Channel中, 預設值true,event的header資訊與event body都寫入到kafka中
pollTimeout:輪詢超時時間,預設值500毫秒
kafka.consumer.auto.offset.reset:earliest表示從最早的偏移量開始拉取,latest表示從最新的偏移量開始拉取,none表示如果沒有發現該Consumer組之前拉 取的偏移量則拋異常
複製程式碼
配置一個KafakChannel, kafkachannel.conf 配置內容如下:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.channels.c1.kafka.topic = flumechannel2
a1.channels.c1.kafka.consumer.group.id = flumecgtest1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
複製程式碼
啟動kafak服務,建立一個kafka主題,命令如下:
[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumechannel2
複製程式碼
檢視建立的主題資訊
[root@hadoop03 bin]# ./kafka-topics.sh --list --zookeeper 192.168.43.20:2181
__consumer_offsets
flumechannel2
topicnewtest1
複製程式碼
啟動kafka agent,使用telnet傳送資料
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console
[root@hadoop01 flume]# clear
[root@hadoop01 flume]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK
複製程式碼
監聽資訊如下:
18/02/04 21:39:33 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D abc. }
複製程式碼
10.Sink元件
- Sink:從Channel消費event,輸出到外部儲存,或者輸出到下一個階段的agent
- 一個Sink只能從一個Channel中消費event
- 當Sink寫出event成功後,就會向Channel提交事務。Sink 事務提交成功,處理完成的event將會被Channel刪除。否 則Channel會等待Sink重新消費處理失敗的event
- Flume提供了豐富的Sink元件,如Avro Sink、HDFS Sink、Kafka Sink、File Roll Sink、HTTP Sink等
11.Sink元件- Avro Sink
- Avro Sink常用於對接下一層的Avro Source,通過傳送RPC請求將Event傳送到下一層的Avro Source
- 為了減少Event傳輸佔用大量的網路資源, Avro Sink提供了端到端的批量壓縮資料傳輸
關鍵引數說明
type:Sink型別為avro。
hostname:繫結的目標Avro Souce主機名稱或者IP
port:繫結的目標Avro Souce埠號
batch-size:批量傳送Event數,預設值100
compression-type:是否使用壓縮,如果使用壓縮設則值為
“deflate”, Avro Sink設定了壓縮那麼Avro Source也應設定相同的 壓縮格式,目前支援zlib壓縮,預設值none
compression-level:壓縮級別,0表示不壓縮,從1到9數字越大壓縮
效果越好,預設值6
複製程式碼
12.Sink元件- HDFS Sink
- HDFS Sink將Event寫入到HDFS中持久化儲存
- HDFS Sink提供了強大的時間戳轉義功能,根據Event頭資訊中的
- timestamp時間戳資訊轉義成日期格式,在HDFS中以日期目錄分層儲存
關鍵引數資訊說明如下:
type:Sink型別為hdfs。
hdfs.path:HDFS儲存路徑,支援按日期時間分割槽。
hdfs.filePrefix:Event輸出到HDFS的檔名字首,預設字首FlumeData
hdfs.fileSuffix:Event輸出到HDFS的檔名字尾
hdfs.inUsePrefix:臨時檔名字首
hdfs.inUseSuffix:臨時檔名字尾,預設值.tmp
hdfs.rollInterval:HDFS檔案滾動生成時間間隔,預設值30秒,該值設定 為0表示檔案不根據時間滾動生成
複製程式碼
配置一個hdfsink.conf檔案,配置內容如下:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /data/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hdfssink
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.callTimeout = 60000
複製程式碼
啟動一個hdfssink agent,命令如下:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file hdfssink.conf --name a1 -Dflume.root.logger=INFO,console
複製程式碼
使用telnet 向44444傳送資料,觀察資料寫入結果
[hadoop@hadoop01 root]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK
2323444
OK
複製程式碼
此時控制檯列印,在HDFS檔案系統生成一個臨時檔案
8/02/04 22:41:52 INFO hdfs.HDFSDataStream: Serializer = TEXT,UseRawLocalFileSystem = false
18/02/04 22:41:52 INFO hdfs.BucketWriter: Creating /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Closing /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Renaming /data/flume/20180204/hdfssink.1517755312242.tmp to /data/flume/20180204/hdfssink.1517755312242
18/02/04 22:42:24 INFO hdfs.HDFSEventSink: Writer callback called.
複製程式碼
值得注意的是:請使用hadoop使用者來執行agent的建立和訊息的傳送,避免因許可權導致HDFS檔案無法寫入
13.Sink元件- Kafka Sink
Flume通過KafkaSink將Event寫入到Kafka指定的主題中 主要引數說明如下:
type:Sink型別,值為KafkaSink類路徑 org.apache.flume.sink.kafka.KafkaSink。
kafka.bootstrap.servers:Broker列表,定義格式host:port,多個Broker之間用逗號隔開,可以配置一個也可以配置多個,用於Producer發現叢集中的Broker,建議配置多個,防止當個Broker出現問題連線 失敗。
kafka.topic:Kafka中Topic主題名稱,預設值flume-topic。
flumeBatchSize:Producer端單次批量傳送的訊息條數,該值應該根據實際環境適當調整,增大批量傳送訊息的條數能夠在一定程度上提高效能,但是同時也增加了延遲和Producer端資料丟失的風險。 預設值100。
kafka.producer.acks:設定Producer端傳送訊息到Borker是否等待接收Broker返回成功送達訊號。0表示Producer傳送訊息到Broker之後不需要等待Broker返回成功送達的訊號,這種方式吞吐量高,但是存 在資料丟失的風險。1表示Broker接收到訊息成功寫入本地log檔案後向Producer返回成功接收的訊號,不需要等待所有的Follower全部同步完訊息後再做迴應,這種方式在資料丟失風險和吞吐量之間做了平衡。all(或者-1)表示Broker接收到Producer的訊息成功寫入本 地log並且等待所有的Follower成功寫入本地log後向Producer返回成功接收的訊號,這種方式能夠保證訊息不丟失,但是效能最差。默 認值1。
useFlumeEventFormat:預設值false,Kafka Sink只會將Event body內 容傳送到Kafka Topic中。如果設定為true,Producer傳送到KafkaTopic中的Event將能夠保留Producer端頭資訊
複製程式碼
配置一個kafkasink.conf,具體配置內容如下:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.topic = FlumeKafkaSinkTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
複製程式碼
啟動kafka Broker節點22和Broker節點23
[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
複製程式碼
按配置檔案建立主題資訊
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic FlumeKafkaSinkTopic1
Created topic "FlumeKafkaSinkTopic1".
複製程式碼
啟動一個kafkasink agent,啟動命令如下:
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkasink.conf --name a1 >/dev/null 2>&1 &
複製程式碼
14.Interceptor攔截器
- Source將event寫入到Channel之前呼叫攔截器
- Source和Channel之間可以有多個攔截器,不同的攔截器使用不同的 規則處理Event
- 可選、輕量級、可插拔的外掛
- 通過實現Interceptor介面實現自定義的攔截器
- 內建攔截器:Timestamp Interceptor、Host Interceptor、UUID Interceptor、Static Interceptor、Regex Filtering Interceptor等
15.Timestamp Interceptor
- Flume使用時間戳攔截器在event頭資訊中新增時間戳資訊, Key為timestamp,Value為攔截器攔截Event時的時間戳
- 頭資訊時間戳的作用,比如HDFS儲存的資料採用時間分割槽儲存,Sink可以根據Event頭資訊中的時間戳將Event按照時間分割槽寫入到 HDFS
- 關鍵引數說明:
- type:攔截器型別為timestamp
- preserveExisting:如果頭資訊中存在timestamp時間戳資訊是否保留原來的時間戳資訊,true保留,false使用新的時間戳替換已經存在的時間戳,預設值為false
16.Host Interceptor
- Flume使用主機戳攔截器在Event頭資訊中新增主機名稱或者IP
- 主機攔截器的作用:比如Source將Event按照主機名稱寫入到不同的Channel中便於後續的Sink對不同Channnel中的資料分開處理
- 關鍵引數說明:
- type:攔截器型別為host
- preserveExisting:如果頭資訊中存在timestamp時間戳資訊是否保留原來的時間戳資訊,true保留,false使用新的時間戳替換已經存在的時間戳,預設值為false
- useIP:是否使用IP作為主機資訊寫入都資訊,預設值為false
- hostHeader:設定頭資訊中主機資訊的Key,預設值為host
17.Host InterceptorStatic Interceptor
- Flume使用static interceptor靜態攔截器在evetn頭資訊新增靜態資訊
- 關鍵引數說明:
- type:攔截器型別為static
- preserveExisting:如果頭資訊中存在timestamp時間戳資訊是否保留原來的時間戳資訊,true保留,false使用新的時間戳替換已經 存在的時間戳,預設值為false
- key:頭資訊中的鍵
- value:頭資訊中鍵對應的值
18.Selector選擇器
- Source將event寫入到Channel之前呼叫攔截器,如果配置了Interceptor攔截器,則Selector在攔截器全部處理完之後呼叫。通過 selector決定event寫入Channel的方式
- 內建Replicating Channel Selector複製Channel選擇器、 Multiplexing Channel Selector複用Channel選擇器
19.Replicating Channel Selector
- 如果Channel選擇器沒有指定,預設是Replicating Channel Selector。即一個Source以複製的方式將一個event同時寫入到多個Channel中,不同的Sink可以從不同的Channel中獲取相同的event。
- 關鍵引數說明:
- selector.type:Channel選擇器型別為replicating
- selector.optional:定義可選Channel,當寫入event到可選Channel失敗時,不會向Source丟擲異常,繼續執行。多個可選Channel之 間用空格隔開
一個source將一個event拷貝到多個channel,通過不同的sink消費不同的channel,將相同的event輸出到不同的地方 配置檔案:replicating_selector.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#定義source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#設定複製選擇器
a1.sources.r1.selector.type = replicating
#設定required channel
a1.sources.r1.channels = c1 c2
#設定channel c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
#設定channel c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 1000
#設定kafka sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = FlumeSelectorTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.23.103:9092
a1.sinks.k1.kafka.flumeBatchSize = 5
a1.sinks.k1.kafka.producer.acks = 1
#設定file sink
a1.sinks.k2.channel = c2
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /home/hadoop/apps/flume/selector
a1.sinks.k2.sink.rollInterval = 60
複製程式碼
分別寫入到kafka和檔案中
建立主題FlumeKafkaSinkTopic1
bin/kafka-topics.sh --create --zookeeper 192.168.183.100:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1
複製程式碼
啟動flume agent
bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1
複製程式碼
使用telnet傳送資料
telnet localhost 44444
複製程式碼
檢視/home/hadoop/apps/flume/selector路徑下的資料
檢視kafka FlumeSelectorTopic1主題資料
bin/kafka-console-consumer.sh --zookeeper 192.168.183.100:2181 --from-beginning --topic FlumeSelectorTopic1
複製程式碼
20.Multiplexing Channel Selector
-Multiplexing Channel Selector多路複用選擇器根據event的頭資訊中不 同鍵值資料來判斷Event應該被寫入到哪個Channel中
- 三種級別的Channel,分別是必選channle、可選channel、預設channel
- 關鍵引數說明:
selector.type:Channel選擇器型別為multiplexing
selector.header:設定頭資訊中用於檢測的headerName
selector.default:預設寫入的Channel列表
selector.mapping.*:headerName對應的不同值對映的不同Channel列表
selector.optional:可選寫入的Channel列表
複製程式碼
配置檔案multiplexing_selector.conf、avro_sink1.conf、avro_sink2.conf、avro_sink3.conf
向不同的avro_sink對應的配置檔案的agent傳送資料,不同的avro_sink配置檔案通過static interceptor在event頭資訊中寫入不同的靜態資料
multiplexing_selector根據event頭資訊中不同的靜態資料型別分別傳送到不同的目的地
multiplexing_selector.conf
a3.sources = r1
a3.channels = c1 c2 c3
a3.sinks = k1 k2 k3
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.183.100
a3.sources.r1.port = 8888
a3.sources.r1.threads= 3
#設定multiplexing selector
a3.sources.r1.selector.type = multiplexing
a3.sources.r1.selector.header = logtype
#通過header中logtype鍵對應的值來選擇不同的sink
a3.sources.r1.selector.mapping.ad = c1
a3.sources.r1.selector.mapping.search = c2
a3.sources.r1.selector.default = c3
a3.sources.r1.channels = c1 c2 c3
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 1000
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 1000
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000
#分別設定三個sink的不同輸出
a3.sinks.k1.type = file_roll
a3.sinks.k1.channel = c1
a3.sinks.k1.sink.directory = /home/hadoop/apps/flume/multiplexing/k11
a3.sinks.k1.sink.rollInterval = 60
a3.sinks.k2.channel = c2
a3.sinks.k2.type = file_roll
a3.sinks.k2.sink.directory = /home/hadoop/apps/flume/multiplexing/k12
a3.sinks.k2.sink.rollInterval = 60
a3.sinks.k3.channel = c3
a3.sinks.k3.type = file_roll
a3.sinks.k3.sink.directory = /home/hadoop/apps/flume/multiplexing/k13
a3.sinks.k3.sink.rollInterval = 60
複製程式碼
avro_sink1.conf
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = localhost
agent1.sources.r1.port = 44444
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = logtype
agent1.sources.r1.interceptors.i1.value = ad
agent1.sources.r1.interceptors.i1.preserveExisting = false
agent1.sources.r1.channels = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 10000
agent1.channels.c1.transactionCapacity = 1000
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = 192.168.183.100
agent1.sinks.k1.port = 8888
複製程式碼
avro_sink2.conf
agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1
agent2.sources.r1.type = netcat
agent2.sources.r1.bind = localhost
agent2.sources.r1.port = 44445
agent2.sources.r1.interceptors = i1
agent2.sources.r1.interceptors.i1.type = static
agent2.sources.r1.interceptors.i1.key = logtype
agent2.sources.r1.interceptors.i1.value = search
agent2.sources.r1.interceptors.i1.preserveExisting = false
agent2.sources.r1.channels = c1
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000
agent2.channels.c1.transactionCapacity = 1000
agent2.sinks.k1.type = avro
agent2.sinks.k1.channel = c1
agent2.sinks.k1.hostname = 192.168.183.100
agent2.sinks.k1.port = 8888
複製程式碼
avro_sink3.conf
agent3.sources = r1
agent3.channels = c1
agent3.sinks = k1
agent3.sources.r1.type = netcat
agent3.sources.r1.bind = localhost
agent3.sources.r1.port = 44446
agent3.sources.r1.interceptors = i1
agent3.sources.r1.interceptors.i1.type = static
agent3.sources.r1.interceptors.i1.key = logtype
agent3.sources.r1.interceptors.i1.value = other
agent3.sources.r1.interceptors.i1.preserveExisting = false
agent3.sources.r1.channels = c1
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000
agent3.channels.c1.transactionCapacity = 1000
agent3.sinks.k1.type = avro
agent3.sinks.k1.channel = c1
agent3.sinks.k1.hostname = 192.168.183.100
agent3.sinks.k1.port = 8888
複製程式碼
在/home/hadoop/apps/flume/multiplexing目錄下分別建立看k1 k2 k3目錄
bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &
複製程式碼
使用telnet傳送資料 telnet localhost 44444
21.Sink Processor
- Sink Processor協調多個sink間進行load balance和fail over
- Default Sink Processor只有一個sink,無需建立Sink Processor
- Sink Group:將多個sink放到一個組內,要求組內一個sink消費channel
- Load-Balancing Sink Processor(負載均衡處理器)round_robin(預設)或 random
- Failover Sink Processor(容錯處理器)可定義一個sink優先順序列表,根據優先順序選擇使用的sink
22.Load-Balancing Sink Processor
關鍵引數說明:
sinks:sink組內的子Sink,多個子sink之間用空格隔開
processor.type:設定負載均衡型別load_balance
processor.backoff:設定為true時,如果在系統執行過程中執行的Sink失敗,會將失敗的Sink放進一個冷卻池中。預設值false
processor.selector.maxTimeOut:失敗sink在冷卻池中最大駐留時間,預設值30000ms
processor.selector:負載均衡選擇演演算法,可以使用輪詢“round_robin”、隨機“random”或者是繼承AbstractSinkSelector類的自定義負載均衡實現類
複製程式碼
23.Failover Sink Processor
關鍵引數說明:
sinks:sink組內的子Sink,多個子sink之間用空格隔開
processor.type:設定故障轉移型別“failover”
processor.priority.<sinkName>:指定Sink組內各子Sink的優先順序別,優先順序從高到低,數值越大優先順序越高
processor.maxpenalty:等待失敗的Sink恢復的最長時間,預設值30000毫秒
複製程式碼
24.Failover應用場景
- 分散式日誌收集場景
- 多個agent收集不同機器上相同型別的日誌資料,為了保障高可用,採用分層部署,日誌收集層Collector部署兩個甚至多個,Agent通過Failover SinkProcessor實現其中任何一個collector掛掉不影響系統的日誌收集服務