1. 程式人生 > >flume實現監控埠和檔案教程

flume實現監控埠和檔案教程

要實現flume的監控首先要了解flume的執行原理

在這裡插入圖片描述

如上圖它有一個管道channel前面的source就是要監控的檔案或埠,而sink 則是監控到的資料傳輸的位置。
而且flume的傳輸單位是一Event為單位,以事件形式將資料從源頭傳送到目標位置。

下面來說一下具體實現方法

解壓然後修改配置檔案

修改flume-env.sh裡的Java路徑
export JAVA_HOME = /home/hadoop/install/jdk1.8.0_152
安裝 監控埠工具
yum install telnet
建立監控的配置檔案flume-telnet.conf新增

# Name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink 
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

判斷埠是否被佔用
netstat -tunlp | grep 44444
在flume安裝路徑下監控輸出埠
bin/flume-ng a1--conf conf/ --name a1 --conf-file conf/flume-telnet.conf -Dflume.root.logger==INFO,console
再開一個終端控制檯執行
telnet localhost 44444
在此控制檯輸入一些字元,點回車提交輸入,看監控的控制檯是否有相同字元列印。
如果輸入內容錯誤,可以按住Ctrl鍵同時點刪除。

監控檔案

建立配置檔案在flume-file.conf新增

a1.sources = s1                                                                                                                  
a1.channels = c1                                                                                                                 
a1.sinks = k1                                                                                                                    
                                                                                                                                      
a1.sources.s1.type=exec                                                                                                          
a1.sources.s1.command=tail -F /tmp/logs/kafka.log                                                                                
a1.sources.s1.channels=c1                                                                                                        
a1.channels.c1.type=memory                                                                                                       
a1.channels.c1.capacity=10000                                                                                                    
a1.channels.c1.transactionCapacity=100  
# Describe the sink 
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.s1.channels = c1 
a1.sinks.k1.channel = c1

在flume安裝路徑下監控輸出埠
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/flume-file.conf -Dflume.root.logger==INFO,console
再開一個終端多次輸入類似下面的語句

echo "aaa" >> /tmp/logs/kafka.log

看監控輸出埠的控制檯變化。

注意:

監控埠格式是netcat,監控檔案是exec,監控資料夾的格式是spooldir
flume的source 與sink對接格式為avro

若有不足,請君指出。共同進步