1. 程式人生 > >Flume 配置方案

Flume 配置方案

11:Test File Roll Sink

#檔名:case11_fileroll.conf

#配置內容:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1



# Describe the sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.channel = c1

a1.sinks.k1.sink.directory = /var/log/flume



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



#啟動file roll 配置檔案

flume-ng agent -c . -f case11_fileroll.conf -n a1 -Dflume.root.logger=INFO,console



#生成測試log

echo "<37>hello via file roll" | nc localhost 5140

echo "<37>hello via file roll 2" | nc localhost 5140



#檢視/var/log/flume下是否生成檔案,預設每30秒生成一個新檔案

-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1

-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2

-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3



cat 1370227443397-1 1370227443397-3

hello via file roll

hello via file roll 2

12:Test Replicating Channel Selector

#檔名:case12_replicate_sink.conf、case12_replicate_s1.conf、case12_replicate_s2.conf

#配置內容:

case12_replicate_sink.conf

#2個channel和2個sink的配置檔案

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.selector.type = replicating

a1.sources.r1.channels = c1 c2



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



case12_replicate_s1.conf

# Name the components on this agent

a2.sources = r1

a2.sinks = k1

a2.channels = c1



# Describe/configure the source

a2.sources.r1.type = avro

a2.sources.r1.channels = c1

a2.sources.r1.bind = 172.25.4.23

a2.sources.r1.port = 4545



# Describe the sink

a2.sinks.k1.type = logger

 a2.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100



case12_replicate_s2.conf

# Name the components on this agent

a3.sources = r1

a3.sinks = k1

a3.channels = c1



# Describe/configure the source

a3.sources.r1.type = avro

a3.sources.r1.channels = c1

a3.sources.r1.bind = 172.25.4.33

a3.sources.r1.port = 4545



# Describe the sink

a3.sinks.k1.type = logger

 a3.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100



#先啟動Avro的Source,監聽埠

flume-ng agent -c . -f case12_replicate_s1.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f case12_replicate_s2.conf -n a3 -Dflume.root.logger=INFO,console



#再啟動Avro的Sink

flume-ng agent -c . -f case12_replicate_sink.conf -n a1-Dflume.root.logger=INFO,console



#檢視是否都建立了連線

2018-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545

2018-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518



2018-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545

2018-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731



#生成測試log

echo "<37>hello via channel selector" | nc localhost 5140



#檢視2個sink是否得到資料

2018-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }



2018-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }

13:Test Multiplexing Channel Selector

#檔名:case13_multi_sink.conf、case13_ multi _s1.conf、case13_ multi _s2.conf

#配置內容:

#2個channel和2個sink的配置檔案

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2



# Describe/configure the source

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.host = 0.0.0.0

a1.sources.r1.selector.type = multiplexing

a1.sources.r1.channels = c1 c2



a1.sources.r1.selector.header = state

a1.sources.r1.selector.mapping.CZ = c1

a1.sources.r1.selector.mapping.US = c2

a1.sources.r1.selector.default = c1



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



case13_ multi _s1.conf

# Name the components on this agent

a2.sources = r1

a2.sinks = k1

a2.channels = c1



# Describe/configure the source

a2.sources.r1.type = avro

a2.sources.r1.channels = c1

a2.sources.r1.bind = 172.25.4.23

a2.sources.r1.port = 4545



# Describe the sink

a2.sinks.k1.type = logger

 a2.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100



case13_ multi _s2.conf

# Name the components on this agent

a3.sources = r1

a3.sinks = k1

a3.channels = c1



# Describe/configure the source

a3.sources.r1.type = avro

a3.sources.r1.channels = c1

a3.sources.r1.bind = 172.25.4.33

a3.sources.r1.port = 4545



# Describe the sink

a3.sinks.k1.type = logger

 a3.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100



#先啟動Avro的Source,監聽埠

flume-ng agent -c . -f case13_ multi _s1.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f case13_ multi _s2.conf -n a3 -Dflume.root.logger=INFO,console



#再啟動Avro的Sink

flume-ng agent -c . -f case13_multi_sink.conf -n a1-Dflume.root.logger=INFO,console



#根據配置檔案生成測試的header 為state的POST請求

curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' http://localhost:5140

curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' http://localhost:5140

curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' http://localhost:5140



#檢視2個sink得到資料是否和配置檔案一致

Sink1:

2018-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }

2018-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }



Sink2:

2018-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }

14:Test Failover Sink Processor

#檔名:case14_failover_sink.conf、case14_ failover _s1.conf、case14_ failover _s2.conf

#配置內容:

case14_failover_sink.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2



a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

a1.sinkgroups.g1.processor.priority.k1 = 5

a1.sinkgroups.g1.processor.priority.k2 = 10

a1.sinkgroups.g1.processor.maxpenalty = 10000



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.selector.type = replicating

a1.sources.r1.channels = c1 c2



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



case14_ failover _s1.conf 

# Name the components on this agent

a2.sources = r1

a2.sinks = k1

a2.channels = c1



# Describe/configure the source

a2.sources.r1.type = avro

a2.sources.r1.channels = c1

a2.sources.r1.bind = 172.25.4.23

a2.sources.r1.port = 4545



# Describe the sink

a2.sinks.k1.type = logger

 a2.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100



case14_ failover _s2.conf 

# Name the components on this agent

a3.sources = r1

a3.sinks = k1

a3.channels = c1



# Describe/configure the source

a3.sources.r1.type = avro

a3.sources.r1.channels = c1

a3.sources.r1.bind = 172.25.4.33

a3.sources.r1.port = 4545



# Describe the sink

a3.sinks.k1.type = logger

 a3.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100



#先啟動Avro的Source,監聽埠

flume-ng agent -c . -f case14_ failover _s1.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f case14_ failover _s2.conf -n a3 -Dflume.root.logger=INFO,console



#再啟動Avro的Sink

flume-ng agent -c . -f case14_ failover _sink.conf -n a1-Dflume.root.logger=INFO,console



#生成測試log

echo "<37>test1 failover" | nc localhost 5140



#在sink2上產生log,sink1由於優先順序小,沒有產生

2018-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }



#主動關閉sink2,再次生成測試log

echo "<37>test2 failover" | nc localhost 5140



#在sink1上會同時生成test1和test2

2018-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }

2018-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72 test2 failover }



#再次開啟sink2,log會根據優先順序再到sink2上

echo "<37>test4 failover" | nc localhost 5140

echo "<37>test5 failover" | nc localhost 5140



2018-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72 test4 failover }

2018-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72 test5 failover }

15:Test Load balancing Sink Processor

#檔名:case15_load_sink.conf、case15_ load _s1.conf、case15_ load _s2.conf

#配置內容:

注:load balance type下必須指定同一個channel到不同的sinks,否則不生效

case15_load_sink.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1



a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = round_robin



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c1

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



case15_ load _s1.conf

# Name the components on this agent

a2.sources = r1

a2.sinks = k1

a2.channels = c1



# Describe/configure the source

a2.sources.r1.type = avro

a2.sources.r1.channels = c1

a2.sources.r1.bind = 172.25.4.23

a2.sources.r1.port = 4545



# Describe the sink

a2.sinks.k1.type = logger

 a2.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100



case15_ load _s2.conf 

# Name the components on this agent

a3.sources = r1

a3.sinks = k1

a3.channels = c1



# Describe/configure the source

a3.sources.r1.type = avro

a3.sources.r1.channels = c1

a3.sources.r1.bind = 172.25.4.33

a3.sources.r1.port = 4545



# Describe the sink

a3.sinks.k1.type = logger

 a3.sinks.k1.channel = c1



# Use a channel which buffers events in memory

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100



#先啟動Avro的Source,監聽埠

flume-ng agent -c . -f case15_ load _s1.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c . -f case15_ load _s2.conf -n a3 -Dflume.root.logger=INFO,console



#再啟動Avro的Sink

flume-ng agent -c . -f case15_ load _sink.conf -n a1-Dflume.root.logger=INFO,console



#生成4個測試log

echo "<37>test2 loadbalance" | nc localhost 5140

echo "<37>test3 loadbalance" | nc localhost 5140

echo "<37>test4 loadbalance" | nc localhost 5140

echo "<37>test5 loadbalance" | nc localhost 5140



#檢視sink輸出結果是否為輪詢模式

Sink1:

2018-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63 test2 loadbalanc }

2018-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63 test4 loadbalanc }



Sink2:

2018-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63 test3 loadbalanc }

2018-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63 test5 loadbalanc }

16:Test Body Event Serializers

#檔名:case16_body.conf

#配置內容:

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1



# Describe the sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.channel = c1

a1.sinks.k1.sink.directory = /var/log/flume

a1.sinks.k1.sink.serializer = text

a1.sinks.k1.sink.serializer.appendNewline = false



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



#啟動

flume-ng agent -c . -f case16_body.conf -n a1-Dflume.root.logger=INFO,console



#生成測試log

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140



#檢視file roll 檔案中的文字內容

cat /var/log/flume/1370675739270-1

TEST1 BODY TEXT

TEST2 BODY TEXT

TEST3 BODY TEXT



#Avro Event Serializer

Alias: avro_event. This interceptor serializes Flume events into an Avro container file

把flume event變成avro 中包含的檔案

17:Test Timestamp Interceptor

#檔名:case17_timestamp_hostname.conf

#配置內容:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1



a1.sources.r1.interceptors = i1 i2

a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i2.hostHeader = hostname

a1.sources.r1.interceptors.i2.useIP = false



# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://master:9000/user/Hadoop/flume/collected/%Y-%m-%d/%H%M

a1.sinks.k1.hdfs.filePrefix = %{hostname}.



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



#啟動agent

flume-ng agent -c . -f case17_timestamp_hostname.conf -n a1 -Dflume.root.logger=INFO,console



#生成測試log

echo "<37>test dynamic interceptor" | nc localhost 5140



#檢視hdfs生成的檔案,可以看到timestamp和hostname都已經生成在header裡面,可以根據自定義的格式生成資料夾

./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2018-06-16/2331/

Found 1 items

-rw-r--r-- 3 root supergroup 140 2018-06-16 23:32 /user/hadoop/flume/collected/2018-06-16/2331/cc-staging-loginmgr2..1371450697118

18:Test static Interceptor

#檔名:case18_static.conf

#配置內容:

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

a1.sources.r1.interceptors.i1.key = datacenter

a1.sources.r1.interceptors.i1.value = NEW_YORK



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



#啟動agent

flume-ng agent -c . -f case18_static.conf -n a1 -Dflume.root.logger=INFO,console



#生成測試log

echo "<37>test1 static interceptor" | nc localhost 5140



#檢視console輸出結果

2018-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }