1. 程式人生 > 其它 >flume 正則表示式_Flume 知識點(二)Flume 高階元件

flume 正則表示式_Flume 知識點(二)Flume 高階元件

技術標籤:flume 正則表示式

作者:studytime
原文:studytime

Flume NG 高階元件

除了 Source、channel、Sink外,Flume Agent 還允許使用者設定其他元件更靈活地控制資料流,包括 Interceptor,Channel Selector 和 Sink Processor。

87b9deffe8978e37872bcd8732a1eb13.png

Interceptor

Flume 中的攔截器(Interceptor),當 Source 讀取 Event 傳送到 Sink 的 Event 時候,在 Event header 中加入一些有用的資訊,或者對 Event 的內容進行過濾,完成初步的資料清洗。

使用者可配置多個 Interceptor,形成一個 Interceptor 鏈。

a1.sources.r1.interceptors=i1 i2  
a1.sources.r1.interceptors.i1.type=regex_filter  
a1.sources.r1.interceptors.i1.regex={.*}  
a1.sources.r1.interceptors.i2.type=timestamp

這在實際業務場景中非常有用,Flume-ng 1.7 中目前提供了以下攔截器:

  • Timestamp Interceptor:該 Interceptor 在每個 Event 頭部插入時間戳,其中key是timestamp,value為當前時刻。
  • Host Interceptor:該 Interceptor 在每個 Event 頭部插入當前 Agent 所在機器的host或ip,其中key是host(也可自定義)。
vi host_agent.properties

a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /tmp/baihe/hehe.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

引數為true時用IP192.168.8.71,引數為false時用主機名bigdata,預設為true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost

a1.sinks.k1.type=hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://bigdata:9000/user/baihe/flume/%y%m%d

a1.sinks.k1.hdfs.filePrefix = baihe_%{agentHost}
往生成的檔案加字尾名.log
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

啟動命令:

bin/flume-ng agent -c conf/ -f conf/host_agent.properties -n a1 -Dflume.root.logger=INFO,console

  • Static Interceptor:靜態攔截器,用於在events header中加入一組靜態的key和value。
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = static_key
a1.sources.r1.interceptors.i1.value = static_value
  • UUID Interceptor:該 Interceptor 在每個 Event 頭部插入一個128位的全域性唯一標示,例如 b5755073-77a9-43c1-8fad-b7a586fc1b97。
// type的引數不能寫成uuid,得寫具體,否則找不到類
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
// 如果UUID頭已經存在,它應該儲存
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_
  • Regex Filtering Interceptor:該 Interceptor 可根據正則表示式過濾或者保留符合要求的 Event。
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^bai1234.*
// 該配置表示過濾掉不是以bai1234開頭的events。如果excludeEvents設為true,則表示過濾掉以bai1234開頭的events。
a1.sources.r1.interceptors.i1.excludeEvents = false
  • Regex Extractor Interceptor:該 Interceptor 可根據正則表示式取出對應的值,並插入到頭部
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = cookieid is (.*?) and ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip

該配置從原始events中抽取出 cookieid 和 ip,加入到 events header 中。

Channel Selector

Channel Selector 允許 Flume Source 選擇一個或多個目標 Channel,並將當前 Event 寫入這些 Channel。

Flume 提供了兩種 Channel Selector 實現: - Replicating Channel Selector:將每個 Event 指定多個 Channel,通過該 Selector,Flume 可將相同資料匯入到多套系統中,一遍進行不同地處理。這是Flume 預設採用的 Channel Selector。

demo:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
// selector.optional是否可選,寫入失敗,則會被忽略。未設定的失敗後,則導致事件失敗
a1.sources.r1.selector.optional = c3
  • Multiplexing Channel Selector:根據 Event 頭部的屬性值,將 Event寫入對應的 Channel
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
// 指定匹配的header值
a1.sources.r1.selector.header = state
# state 值為CZ 寫入 c1 Channel
a1.sources.r1.selector.mapping.CZ = c1
// state US 寫入  c2 c3 Channel
a1.sources.r1.selector.mapping.US = c2 c3
// 預設寫入 c4
a1.sources.r1.selector.default = c4

Sink Processor

Flume 允許將多個 Sink 組裝在一起形成一個邏輯實體,成為 Sink Group。而 Sink Processor 則在 Sink Group 基礎上提供負載均衡以及容錯功能。當一個 Sink 掛掉了,可由另一個 Sink 接替。

demo:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

Flume 提供了多種 Sink Processor 實現: - Default Sink Processor:預設的 Sink Processor,僅僅接受一個 Sink,實現了最簡單的 source - channel - sink,每個元件只有一個 - Failover Sink Processor:故障轉移接收器,Sink Group 中每個 Sink 均被賦予一個優先順序,Event 優先由高優先順序的 Sink 傳送,如果高優先順序的 Sink 掛了,則次高優先順序的 Sink 接替

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
// 代表優先順序,值越大優先順序越高
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
// 最大等待時長 (以毫秒為單位)
a1.sinkgroups.g1.processor.maxpenalty = 10000
  • Load balancing Sink Processor:負載均衡接收處理器,Channel 中的 Event 通過某種負載均衡機制,交給 Sink Group 中的所有 Sink 傳送,
  • 目前 Flume支援兩種負載均衡機制,分別是:round_robin(輪訓),random(隨機)。

demo:

a1.sinkgroups  =  g1 
a1.sinkgroups.g1.sinks  =  k1 k2 
// 元件型別名稱需要是load_balance
a1.sinkgroups.g1.processor.type  =  load_balance 
// 失敗的接收器是否退回
a1.sinkgroups.g1.processor.backoff  =  true
// 選擇機制,必須是round_robin,random
a1.sinkgroups.g1.processor.selector  =  random