flume蒐集日誌:如何解決實時不斷追加的日誌檔案及不斷增加的檔案個數問題
在蒐集日誌的過程中,日誌檔案的個數及日誌檔案需要不斷的追加。flume1.6中,可以使用tail -f可以解決不斷追加的檔案,但是由於日誌檔案的個數是變化的,不可能只產生一個檔案。所以tail -f就已經不能解決這個蒐集日誌的問題。
需求:
需要能夠監控不斷增加的檔案,並且單個檔案也是不斷追加的
解決辦法:
這時候flume1.7就產生了,很好的通過 TAILDIRl解決了這個問題。TAILDIRl可以監控一個目錄下的檔案。
官網地址:http://flume.apache.org/FlumeUserGuide.html
官網文件截圖:
上面加粗為常用屬性。
這裡我們只使用了下面兩個屬性
a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*
a1.sources.source1.type = TAILDIR
一、Flume安裝
1. 壓縮安裝包
?
1 2 |
tar -zxvf ~ /jar/apache-flume-1 .7.0-bin. tar .gz -C /data mv /data/apache-flume-1 .7.0-bin/ /data/flume-1 .7.0 # 重新命名 |
2. 配置環境變數
?
1 2 |
echo -e "export FLUME_HOME=/data/flume-1.7.0nexport PATH=$FLUME_HOME/bin:$PATH" >> ~/.bashrc source ~/.bashrc |
3. 配置flume
?
1 2 |
cp flume- env .sh.template flume- env .sh修改JAVA_HOME export JAVA_HOME= /data/jdk1 .8.0_111 |
4. 驗證安裝
?
1 |
flume-ng version |
二、Flume使用
一個agent由source、channel、sink組成。這兒我們使用Spooling Directory Source、File Channel、Kafka Sink。
1. 單節點的agent
1) 增加配置檔案
?
1 2 |
cd $FLUME_HOME /conf vim single_agent.conf |
將以下內容拷貝進去
?
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# agent的名稱為a1 a1.sources = source1 a1.channels = channel1 a1.sinks = sink1 # set source #a1.sources.source1.type = spooldir a1.sources.source1. type = TAILDIR a1.sources.source1.filegroups = f1 a1.sources.source1.filegroups.f1 = /data/aboutyunlog/ .*log.* #a1.sources.source1.spoolDir=/data/aboutyunlog a1sources.source1.fileHeader = flase # set sink a1.sinks.sink1. type = org.apache.flume.sink.kafka.KafkaSink #a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092 a1.sinks.sink1.topic= aboutyunlog a1.sinks.sink1.kafka.flumeBatchSize = 20 a1.sinks.sink1.kafka.producer.acks = 1 a1.sinks.sink1.kafka.producer.linger.ms = 1 a1.sinks.sink1.kafka.producer.compression. type = snappy # set channel a1.channels.channel1. type = file a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint a1.channels.channel1.dataDirs= /data/flume_data/data # bind a1.sources.source1.channels = channel1 a1.sinks.sink1.channel = channel1 |
2. 建立所需檔案
?
1 2 3 |
mkdir -p /data/aboutyunlog mkdir -p /data/flume_data/checkpoint mkdir -p /data/flume_data/data |
3. 檢視kafka現有的topic
?
1 |
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list |
4. 在kafka上建立名為aboutyunlog的topic
?
1 |
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3 |
5. 啟動flume
?
1 |
flume-ng agent --conf- file /data/flume-1 .6.0 /conf/single_agent .conf --name a1 -Dflume.root.logger=INFO,console |
啟動過程中控制檯會輸出很多日誌。
6. 建立一個kafka的consumer
?
1 |
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic aboutyunlog --from-beginning |
這條命令的意思是說建立aboutyunlog這個topic下的消費者,消費時從最開始的一條資訊開始消費。
上圖說明該消費者建立成功,由於本地/data/aboutyunlog目錄下沒有新檔案加入,造成aboutyunlog這個topic沒有資訊輸入,所以消費者沒有得到一條資訊。
7. 新增檔案到flume source目錄
?
1 2 |
echo -e "this is a test file! n[url]http://www.aboutyun.com20170820[/url]" mv log.1 /data/aboutyunlog/ |
為:echo -e "this is a test file! nhttp://www.aboutyun.com20170820">log.1
再次執行
?
1 |
echo -e "this is a test file! n[url]http://www.aboutyun.com20170820[/url]" >log.2 |
然後我們看到
master上
注意:需要通過xshell連結兩個master。也就是開啟兩個master介面
8. 再次檢視kafka consumer
切換到建立kafka consumer的shell介面,會看到我們log.1中檔案的內容被列印在螢幕上。
上圖說明我們已經成功使用flume監控/data/aboutyunlog目錄,並將監控目錄中的內容傳送到kafka的aboutyunlog主題中。
注意:如果使用flume1.6會找不到類。
?
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
17 /08/17 19:21:08 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows. org.apache.flume.FlumeException: Unable to load source type : TAILDIR, class: TAILDIR at org.apache.flume. source .DefaultSourceFactory.getClass(DefaultSourceFactory.java:69) at org.apache.flume. source .DefaultSourceFactory.create(DefaultSourceFactory.java:42) at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97) at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: TAILDIR at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.flume. source .DefaultSourceFactory.getClass(DefaultSourceFactory.java:67) |