1. 程式人生 > >flume蒐集日誌:如何解決實時不斷追加的日誌檔案及不斷增加的檔案個數問題

flume蒐集日誌:如何解決實時不斷追加的日誌檔案及不斷增加的檔案個數問題

在蒐集日誌的過程中,日誌檔案的個數及日誌檔案需要不斷的追加。flume1.6中,可以使用tail -f可以解決不斷追加的檔案,但是由於日誌檔案的個數是變化的,不可能只產生一個檔案。所以tail -f就已經不能解決這個蒐集日誌的問題。

需求:

需要能夠監控不斷增加的檔案,並且單個檔案也是不斷追加的

解決辦法:

這時候flume1.7就產生了,很好的通過 TAILDIRl解決了這個問題。TAILDIRl可以監控一個目錄下的檔案。

官網地址:http://flume.apache.org/FlumeUserGuide.html

官網文件截圖:

上面加粗為常用屬性。

這裡我們只使用了下面兩個屬性

a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*

a1.sources.source1.type = TAILDIR

一、Flume安裝

1. 壓縮安裝包

?

1

2

tar -zxvf ~ /jar/apache-flume-1 .7.0-bin. tar .gz -C /data

mv /data/apache-flume-1 .7.0-bin/ /data/flume-1 .7.0 # 重新命名

2. 配置環境變數

?

1

2

echo -e "export FLUME_HOME=/data/flume-1.7.0nexport PATH=$FLUME_HOME/bin:$PATH" >> ~/.bashrc

source ~/.bashrc

3. 配置flume

?

1

2

cp flume- env .sh.template flume- env .sh修改JAVA_HOME

export JAVA_HOME= /data/jdk1 .8.0_111

4. 驗證安裝

?

1

flume-ng version

二、Flume使用

一個agent由source、channel、sink組成。這兒我們使用Spooling Directory Source、File Channel、Kafka Sink。

1. 單節點的agent

1) 增加配置檔案

?

1

2

cd $FLUME_HOME /conf

vim single_agent.conf

將以下內容拷貝進去

?

01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

# agent的名稱為a1

a1.sources = source1

a1.channels = channel1

a1.sinks = sink1

# set source

#a1.sources.source1.type = spooldir

a1.sources.source1. type = TAILDIR

a1.sources.source1.filegroups = f1

a1.sources.source1.filegroups.f1 = /data/aboutyunlog/ .*log.*

#a1.sources.source1.spoolDir=/data/aboutyunlog

a1sources.source1.fileHeader = flase

# set sink

a1.sinks.sink1. type = org.apache.flume.sink.kafka.KafkaSink

#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092

a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092

a1.sinks.sink1.topic= aboutyunlog

a1.sinks.sink1.kafka.flumeBatchSize = 20

a1.sinks.sink1.kafka.producer.acks = 1

a1.sinks.sink1.kafka.producer.linger.ms = 1

a1.sinks.sink1.kafka.producer.compression. type = snappy

# set channel

a1.channels.channel1. type = file

a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint

a1.channels.channel1.dataDirs= /data/flume_data/data

# bind

a1.sources.source1.channels = channel1

a1.sinks.sink1.channel = channel1

2. 建立所需檔案

?

1

2

3

mkdir -p /data/aboutyunlog

mkdir -p /data/flume_data/checkpoint

mkdir -p /data/flume_data/data

3. 檢視kafka現有的topic

?

1

kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list

4. 在kafka上建立名為aboutyunlog的topic

?

1

kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3

5. 啟動flume

?

1

flume-ng agent --conf- file /data/flume-1 .6.0 /conf/single_agent .conf --name a1 -Dflume.root.logger=INFO,console

啟動過程中控制檯會輸出很多日誌。

6. 建立一個kafka的consumer

?

1

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic aboutyunlog --from-beginning

這條命令的意思是說建立aboutyunlog這個topic下的消費者,消費時從最開始的一條資訊開始消費。

上圖說明該消費者建立成功,由於本地/data/aboutyunlog目錄下沒有新檔案加入,造成aboutyunlog這個topic沒有資訊輸入,所以消費者沒有得到一條資訊。

7. 新增檔案到flume source目錄

?

1

2

echo -e "this is a test file! n[url]http://www.aboutyun.com20170820[/url]"

mv log.1 /data/aboutyunlog/

為:echo -e "this is a test file! nhttp://www.aboutyun.com20170820">log.1

再次執行

?

1

echo -e "this is a test file! n[url]http://www.aboutyun.com20170820[/url]" >log.2

然後我們看到

master上

注意:需要通過xshell連結兩個master。也就是開啟兩個master介面

8. 再次檢視kafka consumer

切換到建立kafka consumer的shell介面,會看到我們log.1中檔案的內容被列印在螢幕上。

上圖說明我們已經成功使用flume監控/data/aboutyunlog目錄,並將監控目錄中的內容傳送到kafka的aboutyunlog主題中。

注意:如果使用flume1.6會找不到類。

?

01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

17 /08/17 19:21:08 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.

org.apache.flume.FlumeException: Unable to load source type : TAILDIR, class: TAILDIR

at org.apache.flume. source .DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)

at org.apache.flume. source .DefaultSourceFactory.create(DefaultSourceFactory.java:42)

at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)

at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)

at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ClassNotFoundException: TAILDIR

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:264)

at org.apache.flume. source .DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)