Flume Interceptors: Generating Data Directories by Time
A Flume interceptor can, while the Source reads events and before they are delivered to the Sink, add useful information to the event headers or filter the event contents, performing a first pass of data cleansing. This is very useful in real business scenarios. Flume-ng 1.7 currently provides the following interceptors:
Timestamp Interceptor;
Host Interceptor;
Static Interceptor;
UUID Interceptor;
Morphline Interceptor;
Search and Replace Interceptor;
Regex Filtering Interceptor;
Regex Extractor Interceptor;
Multiple interceptors can be specified on a single source; they are applied one after another in the order listed. For example:
a1.sources.r1.interceptors=i1 i2
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=\\{.*\\}
a1.sources.r1.interceptors.i2.type=timestamp
Timestamp Interceptor
The timestamp interceptor inserts the current timestamp (in milliseconds) into the event headers under the key timestamp. On its own it is not used all that often. For example, when using the HDFS Sink, result files can be generated according to the events' timestamps:
hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d
hdfs.filePrefix = log_%Y%m%d_%H
Data is then written to files whose paths are built from the event timestamp via escape sequences such as %Y (year), %m (month), %d (day), and %H (hour).
The interceptor can also be replaced by other means (setting useLocalTimeStamp = true on the HDFS sink).
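For comparison, a minimal sketch of that alternative, using the hdfs.path from above and the standard HDFS sink properties: no timestamp interceptor is configured, and the sink stamps each event with the local time instead:
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
#With useLocalTimeStamp = true the sink itself supplies the timestamp needed to resolve the %Y%m%d escapes
a1.sinks.k1.hdfs.useLocalTimeStamp = true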
[hadoop@h71 conf]$ vi timestamp.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/hui/hehe.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://h71:9000/hui/%y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = log_%Y%m%d_%H
#fileType and writeFormat must be set to the values below to keep the data written to HDFS in plain text format
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#While data is being written, roll every 10 seconds and strip the .tmp suffix from the file name; the default is 30 seconds
a1.sinks.k1.hdfs.rollInterval = 10
#The setting above rolls by time; the one below rolls by file size instead
#a1.sinks.k1.hdfs.rollSize = 1024
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent:
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/timestamp.conf -n a1 -Dflume.root.logger=INFO,console
Generate some data:
[hadoop@h71 hui]$ echo "hello world" >> hehe.txt
Check the result:
[hadoop@h71 hui]$ hadoop fs -lsr /hui
drwxr-xr-x - hadoop supergroup 0 2017-03-18 02:41 /hui/17-03-18
drwxr-xr-x - hadoop supergroup 0 2017-03-18 02:41 /hui/17-03-18/02
-rw-r--r-- 2 hadoop supergroup 12 2017-03-18 02:41 /hui/17-03-18/02/log_20170318_02.1489776083025.tmp
After 10 seconds (the suffix 1489776083025 in the file name is the millisecond timestamp):
[hadoop@h71 hui]$ hadoop fs -lsr /hui
drwxr-xr-x - hadoop supergroup 0 2017-03-18 02:41 /hui/17-03-18
drwxr-xr-x - hadoop supergroup 0 2017-03-18 02:41 /hui/17-03-18/02
-rw-r--r-- 2 hadoop supergroup 12 2017-03-18 02:41 /hui/17-03-18/02/log_20170318_02.1489776083025
Host Interceptor
The host interceptor inserts the hostname or IP address of the machine running the Flume agent into the event headers under the key host (the key name can be customized).
[hadoop@h71 conf]$ vi host.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/hui/hehe.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
#When true, the IP address 192.168.8.71 is used; when false, the hostname h71 is used; the default is true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://h71:9000/hui/%y%m%d
a1.sinks.k1.hdfs.filePrefix = qiang_%{agentHost}
#Append the suffix .log to the generated files
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent:
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/host.conf -n a1 -Dflume.root.logger=INFO,console
This fails with: Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
Fix: add the line a1.sinks.k1.hdfs.useLocalTimeStamp = true to host.conf. The hdfs.path contains time escape sequences, which require a timestamp in the event headers; the host interceptor does not add one, so the sink has to use the local time instead.
Generate some data:
[hadoop@h71 hui]$ echo "hello world" >> hehe.txt
Check the result:
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ hadoop fs -lsr /hui
drwxr-xr-x - hadoop supergroup 0 2017-03-18 03:36 /hui/170318
-rw-r--r-- 2 hadoop supergroup 2 2017-03-18 03:36 /hui/170318/qiang_h71.1489779401946.log
A note: these Timestamp Interceptor and Host Interceptor experiments are cursed... The first time I ran them everything worked, but when I redid them, the Flume agent printed SINK, name: k1 started and then inexplicably hung there without moving or reporting any error, and nothing I tried would make it work. I was at a loss...
Static Interceptor
The static interceptor adds a fixed key-value pair to the event headers.
[hadoop@h71 conf]$ vi static.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/hui/hehe.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = static_key
a1.sources.r1.interceptors.i1.value = static_value
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://h71:9000/hui/
a1.sinks.k1.hdfs.filePrefix = qiang_%{static_key}
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Check the result:
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ hadoop fs -lsr /hui
drwxr-xr-x - hadoop supergroup 0 2017-03-18 03:36 /hui/
-rw-r--r-- 2 hadoop supergroup 2 2017-03-18 03:36 /hui/qiang_static_value.1489779401946
UUID Interceptor
The UUID interceptor generates a UUID string in the headers of every event, for example: b5755073-77a9-43c1-8fad-b7a586fc1b97. The generated UUID can then be read and used by a sink.
[hadoop@h71 conf]$ vi uuid.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/hui/hehe.txt
a1.sources.r1.interceptors = i1
#The type cannot simply be written as uuid; the fully qualified class name is required, otherwise the class will not be found
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
#If a UUID header already exists, it should be preserved
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
After running the Flume agent you can see:
Event: { headers:{id=UUID_1cb50ac7-fef0-4385-99da-45530cb50271} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
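As a quick illustration of consuming the header downstream, here is a hypothetical sketch that substitutes the id header (the interceptor's default header name, visible in the output above) into an HDFS sink's file prefix via the %{id} escape. Since every event carries a distinct UUID, this would open roughly one file per event, so treat it purely as a demonstration of header substitution:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://h71:9000/hui/%Y%m%d
#%{id} pulls the UUID header set by the interceptor into the file name
a1.sinks.k1.hdfs.filePrefix = event_%{id}
a1.sinks.k1.hdfs.useLocalTimeStamp = true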
Morphline Interceptor
The Morphline interceptor runs each event through a Morphline, which transforms the event data accordingly. For details on using Morphlines, see
http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
I'll dig into this part later.
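For completeness, a minimal, untested sketch of the wiring (the interceptor type and its morphlineFile/morphlineId properties come from the Flume user guide; the file path and morphline id are hypothetical placeholders):
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
#Path to the Morphline configuration file (hypothetical placeholder)
a1.sources.r1.interceptors.i1.morphlineFile = /home/hadoop/conf/morphline.conf
#Identifier of the morphline to run if the file defines several (hypothetical placeholder)
a1.sources.r1.interceptors.i1.morphlineId = morphline1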
Search and Replace Interceptor
This interceptor replaces whatever the regular expression matches in the event body with the given replacement string.
[hadoop@h71 conf]$ vi search.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/hui/hehe.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = xiaoqiang
a1.sources.r1.interceptors.i1.charset = UTF-8
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Flume agent:
[hadoop@h71 apache-flume-1.6.0-cdh5.5.2-bin]$ bin/flume-ng agent -c conf/ -f conf/search.conf -n a1 -Dflume.root.logger=INFO,console
Generate some data:
[hadoop@h71 hui]$ echo "message 1" >> hehe.txt
[hadoop@h71 hui]$ echo "message 23" >> hehe.txt
On the console you can see (the logger sink prints only the first 16 bytes of the body, which is why the text appears truncated):
Event: { headers:{} body: 6D 65 73 73 61 67 65 20 78 69 61 6F 71 69 61 6E message xiaoqian }
Event: { headers:{} body: 6D 65 73 73 61 67 65 20 78 69 61 6F 71 69 61 6E message xiaoqian }
Regex Filtering Interceptor
This interceptor filters events by matching their raw contents against a regular expression.
[hadoop@h71 conf]$ vi filter.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/hui/hehe.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^lxw1234.*
#This configuration drops events that do not start with lxw1234. If excludeEvents is set to true, events that do start with lxw1234 are dropped instead.
a1.sources.r1.interceptors.i1.excludeEvents = false
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The raw event contents are:
[[email protected] hui]$ echo "message 1" >> hehe.txt
[[email protected] hui]$ echo "lxw1234 message 3" >> hehe.txt
[[email protected] hui]$ echo "message 2" >> hehe.txt
[[email protected] hui]$ echo "lxw1234 message 4" >> hehe.txt
The events after interception are:
Event: { headers:{} body: 6C 78 77 31 32 33 34 20 6D 65 73 73 61 67 65 20 lxw1234 message }
Event: { headers:{} body: 6C 78 77 31 32 33 34 20 6D 65 73 73 61 67 65 20 lxw1234 message }
Regex Extractor Interceptor
This interceptor extracts content from the raw events with a regular expression and adds it to the event headers.
[hadoop@h71 conf]$ vi extractor.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /home/hadoop/hui/hehe.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Notes: 1. Delete the two serializer type lines from the original blog's config (a1.sources.r1.interceptors.i1.serializers.s1.type = default and a1.sources.r1.interceptors.i1.serializers.s2.type = default), otherwise it fails with:
Caused by: java.lang.ClassNotFoundException: default
2. Change the regular expression from cookieid is (.*?) and ip is (.*?) to cookieid is (.*?) and ip is (.*), otherwise the IP is not matched and the ip field in the event headers comes out empty.
This configuration extracts cookieid and ip from the raw events into the event headers; serializers s1 and s2 bind the first and second capture groups to the header names cookieid and ip.
The raw event contents are:
[[email protected] hui]$ echo "cookieid is c_1 and ip is 127.0.0.1" >> hehe.txt
[[email protected] hui]$ echo "cookieid is c_2 and ip is 127.0.0.2" >> hehe.txt
[[email protected] hui]$ echo "cookieid is c_3 and ip is 127.0.0.3" >> hehe.txt
The resulting event headers are:
Event: { headers:{cookieid=c_1, ip=127.0.0.1} body: 63 6F 6F 6B 69 65 69 64 20 69 73 20 63 5F 31 20 cookieid is c_1 }
Event: { headers:{cookieid=c_2, ip=127.0.0.2} body: 63 6F 6F 6B 69 65 69 64 20 69 73 20 63 5F 32 20 cookieid is c_2 }
Event: { headers:{cookieid=c_3, ip=127.0.0.3} body: 63 6F 6F 6B 69 65 69 64 20 69 73 20 63 5F 33 20 cookieid is c_3 }
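The extracted headers become useful once a sink references them; the following is a hypothetical sketch (reusing the HDFS sink settings from the earlier sections) that partitions the output directory by the extracted cookieid header:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
#%{cookieid} substitutes the header filled in by the regex_extractor interceptor
a1.sinks.k1.hdfs.path = hdfs://h71:9000/hui/%{cookieid}
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true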
Flume's interceptors can work together with sinks to implement many functions that real business scenarios require, for example generating target file directories and file names by time and host, or cooperating with the Kafka Sink to write to multiple partitions.
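To close the loop on the title, here is a sketch under the same assumptions as the examples above (host h71, HDFS at h71:9000) that combines the host interceptor with useLocalTimeStamp so that data directories are generated by both time and host:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
#One directory per day per agent host, e.g. /hui/20170318/h71
a1.sinks.k1.hdfs.path = hdfs://h71:9000/hui/%Y%m%d/%{agentHost}
a1.sinks.k1.hdfs.filePrefix = log_%H
a1.sinks.k1.hdfs.useLocalTimeStamp = true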
Original post: https://blog.csdn.net/m0_37739193/article/details/77584909