Flume 配置方案
阿新 • • 發佈:2018-11-13
11:Test File Roll Sink
#檔名:case11_fileroll.conf
#配置內容:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#啟動file roll 配置檔案
flume-ng agent -c . -f case11_fileroll.conf -n a1 -Dflume.root.logger=INFO,console
#生成測試log
echo "<37>hello via file roll" | nc localhost 5140
echo "<37>hello via file roll 2" | nc localhost 5140
#檢視/var/log/flume下是否生成檔案,預設每30秒生成一個新檔案
-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1
-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2
-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3
cat 1370227443397-1 1370227443397-3
hello via file roll
hello via file roll 2
12:Test Replicating Channel Selector
#檔名:case12_replicate_sink.conf、case12_replicate_s1.conf、case12_replicate_s2.conf
#配置內容:
case12_replicate_sink.conf
#2個channel和2個sink的配置檔案
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case12_replicate_s1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 172.25.4.23
a2.sources.r1.port = 4545
# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
case12_replicate_s2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 172.25.4.33
a3.sources.r1.port = 4545
# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#先啟動Avro的Source,監聽埠
flume-ng agent -c . -f case12_replicate_s1.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f case12_replicate_s2.conf -n a3 -Dflume.root.logger=INFO,console
#再啟動Avro的Sink
flume-ng agent -c . -f case12_replicate_sink.conf -n a1 -Dflume.root.logger=INFO,console
#檢視是否都建立了連線
2018-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545
2018-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518
2018-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545
2018-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731
#生成測試log
echo "<37>hello via channel selector" | nc localhost 5140
#檢視2個sink是否得到資料
2018-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
2018-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
13:Test Multiplexing Channel Selector
#檔名:case13_multi_sink.conf、case13_multi_s1.conf、case13_multi_s2.conf
#配置內容:
#2個channel和2個sink的配置檔案
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case13_multi_s1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 172.25.4.23
a2.sources.r1.port = 4545
# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
case13_multi_s2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 172.25.4.33
a3.sources.r1.port = 4545
# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#先啟動Avro的Source,監聽埠
flume-ng agent -c . -f case13_multi_s1.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f case13_multi_s2.conf -n a3 -Dflume.root.logger=INFO,console
#再啟動Avro的Sink
flume-ng agent -c . -f case13_multi_sink.conf -n a1 -Dflume.root.logger=INFO,console
#根據配置檔案生成測試的header 為state的POST請求
curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' http://localhost:5140
#檢視2個sink得到資料是否和配置檔案一致
Sink1:
2018-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }
2018-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }
Sink2:
2018-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }
14:Test Failover Sink Processor
#檔名:case14_failover_sink.conf、case14_failover_s1.conf、case14_failover_s2.conf
#配置內容:
case14_failover_sink.conf
# Front agent a1 for the failover demo: one syslog TCP source, two memory
# channels, and two Avro sinks (k1, k2) grouped under a failover processor.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Sink group g1 applies the failover processor to k1 and k2.
# k2 carries the larger priority value (10 vs 5) and is the sink used first.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Upper bound on the back-off applied to a failed sink
# (presumably milliseconds, per the Flume user guide - confirm).
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# NOTE(review): the replicating selector copies every event into both c1 and
# c2, while k1 reads only c1 and k2 reads only c2. The walkthrough that follows
# this config shows test1 being delivered a second time by k1 once k2 goes
# down, which looks like c1 backing up while k1 is idle. Pointing both sinks at
# one shared channel is the usual failover layout - confirm intent before
# reusing this config.
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Describe the sink
# k1 forwards channel c1 to the Avro receiver at 172.25.4.23:4545
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
# k2 forwards channel c2 to the Avro receiver at 172.25.4.33:4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
case14_failover_s1.conf
# Receiving agent a2: Avro source -> memory channel -> logger sink
a2.sources = r1
a2.channels = c1
a2.sinks = k1
# Memory channel c1 sits between the source and the sink
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Avro source r1 listens on 172.25.4.23:4545 and writes into c1
a2.sources.r1.type = avro
a2.sources.r1.bind = 172.25.4.23
a2.sources.r1.port = 4545
a2.sources.r1.channels = c1
# Logger sink k1 drains c1 and logs every event it receives
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
case14_failover_s2.conf
# Receiving agent a3: Avro source -> memory channel -> logger sink
a3.sources = r1
a3.channels = c1
a3.sinks = k1
# Memory channel c1 sits between the source and the sink
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Avro source r1 listens on 172.25.4.33:4545 and writes into c1
a3.sources.r1.type = avro
a3.sources.r1.bind = 172.25.4.33
a3.sources.r1.port = 4545
a3.sources.r1.channels = c1
# Logger sink k1 drains c1 and logs every event it receives
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
#先啟動Avro的Source,監聽埠
flume-ng agent -c . -f case14_failover_s1.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f case14_failover_s2.conf -n a3 -Dflume.root.logger=INFO,console
#再啟動Avro的Sink
flume-ng agent -c . -f case14_failover_sink.conf -n a1 -Dflume.root.logger=INFO,console
#生成測試log
echo "<37>test1 failover" | nc localhost 5140
#在sink2上產生log,sink1由於優先順序小,沒有產生
2018-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
#主動關閉sink2,再次生成測試log
echo "<37>test2 failover" | nc localhost 5140
#在sink1上會同時生成test1和test2
2018-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
2018-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72 test2 failover }
#再次開啟sink2,log會根據優先順序再到sink2上
echo "<37>test4 failover" | nc localhost 5140
echo "<37>test5 failover" | nc localhost 5140
2018-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72 test4 failover }
2018-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72 test5 failover }
15:Test Load balancing Sink Processor
#檔名:case15_load_sink.conf、case15_load_s1.conf、case15_load_s2.conf
#配置內容:
注:load balance type下必須指定同一個channel到不同的sinks,否則不生效
case15_load_sink.conf
# Load-balancing front agent a1: one syslog TCP source, one memory channel,
# and two Avro sinks that both read from that single channel.
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
# Shared memory channel c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Syslog TCP source r1 on localhost:5140, writing into c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = localhost
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Avro sink k1 -> 172.25.4.23:4545
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k1.channel = c1
# Avro sink k2 -> 172.25.4.33:4545
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
a1.sinks.k2.channel = c1
# Sink group g1: load_balance processor, round_robin selection, back-off enabled
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true
case15_load_s1.conf
# Receiving agent a2: Avro source -> memory channel -> logger sink
a2.sources = r1
a2.channels = c1
a2.sinks = k1
# Memory channel c1 sits between the source and the sink
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Avro source r1 listens on 172.25.4.23:4545 and writes into c1
a2.sources.r1.type = avro
a2.sources.r1.bind = 172.25.4.23
a2.sources.r1.port = 4545
a2.sources.r1.channels = c1
# Logger sink k1 drains c1 and logs every event it receives
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
case15_load_s2.conf
# Receiving agent a3: Avro source -> memory channel -> logger sink
a3.sources = r1
a3.channels = c1
a3.sinks = k1
# Memory channel c1 sits between the source and the sink
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Avro source r1 listens on 172.25.4.33:4545 and writes into c1
a3.sources.r1.type = avro
a3.sources.r1.bind = 172.25.4.33
a3.sources.r1.port = 4545
a3.sources.r1.channels = c1
# Logger sink k1 drains c1 and logs every event it receives
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
#先啟動Avro的Source,監聽埠
flume-ng agent -c . -f case15_load_s1.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f case15_load_s2.conf -n a3 -Dflume.root.logger=INFO,console
#再啟動Avro的Sink
flume-ng agent -c . -f case15_load_sink.conf -n a1 -Dflume.root.logger=INFO,console
#生成4個測試log
echo "<37>test2 loadbalance" | nc localhost 5140
echo "<37>test3 loadbalance" | nc localhost 5140
echo "<37>test4 loadbalance" | nc localhost 5140
echo "<37>test5 loadbalance" | nc localhost 5140
#檢視sink輸出結果是否為輪詢模式
Sink1:
2018-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63 test2 loadbalanc }
2018-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63 test4 loadbalanc }
Sink2:
2018-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63 test3 loadbalanc }
2018-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63 test5 loadbalanc }
16:Test Body Event Serializers
#檔名:case16_body.conf
#配置內容:
# Agent a1: HTTP source -> memory channel -> file_roll sink using the
# body-text serializer.
# Name the components on this agent. These lists are required: a component
# that is configured below but not listed here is never created.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
# HTTPSource takes its listen address from "bind"; a "host" key is not read.
a1.sources.r1.bind = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
# "text" writes only the event body; headers are not written.
a1.sinks.k1.sink.serializer = text
# NOTE(review): with appendNewline = false no newline is added after each
# event. The sample output in this article shows one event per line, which
# matches the default (true) - confirm which behaviour is intended.
a1.sinks.k1.sink.serializer.appendNewline = false
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#啟動
flume-ng agent -c . -f case16_body.conf -n a1 -Dflume.root.logger=INFO,console
#生成測試log
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140
#檢視file roll 檔案中的文字內容
cat /var/log/flume/1370675739270-1
TEST1 BODY TEXT
TEST2 BODY TEXT
TEST3 BODY TEXT
#Avro Event Serializer
Alias: avro_event. This serializer serializes Flume events into an Avro container file
把 Flume event 序列化成 Avro 容器檔案(Avro container file)
17:Test Timestamp Interceptor
#檔名:case17_timestamp_hostname.conf
#配置內容:
# Agent a1: syslog TCP source with timestamp + host interceptors, writing to
# HDFS under a path and file prefix built from the headers those interceptors add.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
# The syslog TCP source reads its listen address from "host" (not "bind").
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Interceptors run in the order listed: i1 (timestamp), then i2 (host).
a1.sources.r1.interceptors = i1 i2
# i1 stamps each event with a timestamp header; the %Y-%m-%d/%H%M escapes in
# hdfs.path below are resolved from it.
a1.sources.r1.interceptors.i1.type = timestamp
# presumably false means an already-present timestamp header is overwritten - confirm
a1.sources.r1.interceptors.i1.preserveExisting = false
# i2 stores the agent's host name (useIP = false) under the header "hostname",
# which %{hostname} in hdfs.filePrefix below refers to.
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
# HDFS paths are case-sensitive: use lower-case "hadoop" so the output lands in
# the /user/hadoop/flume/collected/... directory that the verification step lists.
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M
# The trailing "." is part of the prefix, so file names contain "..".
a1.sinks.k1.hdfs.filePrefix = %{hostname}.
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#啟動agent
flume-ng agent -c . -f case17_timestamp_hostname.conf -n a1 -Dflume.root.logger=INFO,console
#生成測試log
echo "<37>test dynamic interceptor" | nc localhost 5140
#檢視hdfs生成的檔案,可以看到timestamp和hostname都已經生成在header裡面,可以根據自定義的格式生成資料夾
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2018-06-16/2331/
Found 1 items
-rw-r--r-- 3 root supergroup 140 2018-06-16 23:32 /user/hadoop/flume/collected/2018-06-16/2331/cc-staging-loginmgr2..1371450697118
18:Test static Interceptor
#檔名:case18_static.conf
#配置內容:
# Agent a1: syslog TCP source with a static interceptor, logged via a logger sink.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Static interceptor i1 adds the header datacenter=NEW_YORK to every event.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel (each binding is declared exactly once)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#啟動agent
flume-ng agent -c . -f case18_static.conf -n a1 -Dflume.root.logger=INFO,console
#生成測試log
echo "<37>test1 static interceptor" | nc localhost 5140
#檢視console輸出結果
2018-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }