flume 正則表示式_Flume 知識點(二)Flume 高階元件
技術標籤:flume 正則表示式
作者:studytime
原文:studytime
Flume NG 高階元件
除了 Source、channel、Sink外,Flume Agent 還允許使用者設定其他元件更靈活地控制資料流,包括 Interceptor,Channel Selector 和 Sink Processor。
Interceptor
Flume 中的攔截器(Interceptor),當 Source 讀取 Event 傳送到 Sink 的 Event 時候,在 Event header 中加入一些有用的資訊,或者對 Event 的內容進行過濾,完成初步的資料清洗。
使用者可配置多個 Interceptor,形成一個 Interceptor 鏈。
a1.sources.r1.interceptors=i1 i2
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex={.*}
a1.sources.r1.interceptors.i2.type=timestamp
這在實際業務場景中非常有用,Flume-ng 1.7 中目前提供了以下攔截器:
- Timestamp Interceptor:該 Interceptor 在每個 Event 頭部插入時間戳,其中key是timestamp,value為當前時刻。
- Host Interceptor:該 Interceptor 在每個 Event 頭部插入當前 Agent 所在機器的host或ip,其中key是host(也可自定義)。
vi host_agent.properties a1.sinks = k1 a1.channels = c1 a1.sources.r1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.channels = c1 a1.sources.r1.command = tail -F /tmp/baihe/hehe.txt a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = host 引數為true時用IP192.168.8.71,引數為false時用主機名bigdata,預設為true a1.sources.r1.interceptors.i1.useIP = false a1.sources.r1.interceptors.i1.hostHeader = agentHost a1.sinks.k1.type=hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = hdfs://bigdata:9000/user/baihe/flume/%y%m%d a1.sinks.k1.hdfs.filePrefix = baihe_%{agentHost} 往生成的檔案加字尾名.log a1.sinks.k1.hdfs.fileSuffix = .log a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 10 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
啟動命令:
bin/flume-ng agent -c conf/ -f conf/host_agent.properties -n a1 -Dflume.root.logger=INFO,console
- Static Interceptor:靜態攔截器,用於在events header中加入一組靜態的key和value。
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = static_key
a1.sources.r1.interceptors.i1.value = static_value
- UUID Interceptor:該 Interceptor 在每個 Event 頭部插入一個128位的全域性唯一標示,例如 b5755073-77a9-43c1-8fad-b7a586fc1b97。
// type的引數不能寫成uuid,得寫具體,否則找不到類
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
// 如果UUID頭已經存在,它應該儲存
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_
- Regex Filtering Interceptor:該 Interceptor 可根據正則表示式過濾或者保留符合要求的 Event。
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^bai1234.*
// 該配置表示過濾掉不是以bai1234開頭的events。如果excludeEvents設為true,則表示過濾掉以bai1234開頭的events。
a1.sources.r1.interceptors.i1.excludeEvents = false
- Regex Extractor Interceptor:該 Interceptor 可根據正則表示式取出對應的值,並插入到頭部
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
該配置從原始events中抽取出 cookieid 和 ip,加入到 events header 中。
Channel Selector
Channel Selector 允許 Flume Source 選擇一個或多個目標 Channel,並將當前 Event 寫入這些 Channel。
Flume 提供了兩種 Channel Selector 實現: - Replicating Channel Selector:將每個 Event 指定多個 Channel,通過該 Selector,Flume 可將相同資料匯入到多套系統中,一遍進行不同地處理。這是Flume 預設採用的 Channel Selector。
demo:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
// selector.optional是否可選,寫入失敗,則會被忽略。未設定的失敗後,則導致事件失敗
a1.sources.r1.selector.optional = c3
- Multiplexing Channel Selector:根據 Event 頭部的屬性值,將 Event寫入對應的 Channel
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
// 指定匹配的header值
a1.sources.r1.selector.header = state
# state 值為CZ 寫入 c1 Channel
a1.sources.r1.selector.mapping.CZ = c1
// state US 寫入 c2 c3 Channel
a1.sources.r1.selector.mapping.US = c2 c3
// 預設寫入 c4
a1.sources.r1.selector.default = c4
Sink Processor
Flume 允許將多個 Sink 組裝在一起形成一個邏輯實體,成為 Sink Group。而 Sink Processor 則在 Sink Group 基礎上提供負載均衡以及容錯功能。當一個 Sink 掛掉了,可由另一個 Sink 接替。
demo:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
Flume 提供了多種 Sink Processor 實現: - Default Sink Processor:預設的 Sink Processor,僅僅接受一個 Sink,實現了最簡單的 source - channel - sink,每個元件只有一個 - Failover Sink Processor:故障轉移接收器,Sink Group 中每個 Sink 均被賦予一個優先順序,Event 優先由高優先順序的 Sink 傳送,如果高優先順序的 Sink 掛了,則次高優先順序的 Sink 接替
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
// 代表優先順序,值越大優先順序越高
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
// 最大等待時長 (以毫秒為單位)
a1.sinkgroups.g1.processor.maxpenalty = 10000
- Load balancing Sink Processor:負載均衡接收處理器,Channel 中的 Event 通過某種負載均衡機制,交給 Sink Group 中的所有 Sink 傳送,
- 目前 Flume支援兩種負載均衡機制,分別是:round_robin(輪訓),random(隨機)。
demo:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
// 元件型別名稱需要是load_balance
a1.sinkgroups.g1.processor.type = load_balance
// 失敗的接收器是否退回
a1.sinkgroups.g1.processor.backoff = true
// 選擇機制,必須是round_robin,random
a1.sinkgroups.g1.processor.selector = random