Multiple combinations of Flume source, channel, and sink
阿新 · Published 2019-01-01
Flume as Lego bricks
Flume has three main components: source, channel, and sink. The components can be freely combined with one another, and the coupling between them is low, which makes Flume flexible and convenient to use.
1. Multiple sinks
The contents of a channel are output only once: for a given event, if sink1 outputs it then sink2 does not, and if sink2 outputs it then sink1 does not. In the end, sink1 + sink2 add up to exactly the data in the channel.
The configuration file is as follows:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c1
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp
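To try this out, one way (a minimal sketch, assuming the config above is saved as conf/multi-sink.conf and the flume-ng script is on the PATH) is to start the agent and append test lines to the tailed log:
# start agent a1 with the config above (the file name multi-sink.conf is an assumption)
flume-ng agent --conf conf --conf-file conf/multi-sink.conf --name a1 -Dflume.root.logger=INFO,console
# in another terminal, append a test event to the log that the exec source is tailing
echo "hello flume" >> /opt/apps/logs/tail4.log
Each appended line should then show up in either the Kafka topic or the file_roll directory, but not both.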
2. Multiple channels and multiple sinks, with each sink outputting the same content
(The memory channel feeds the Kafka sink for low latency; the file channel feeds the file sink for better data safety.)
(The case of multiple channels with a single sink is not shown here; personally I don't think it is widely useful.)
The configuration file is as follows:
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
# every channel receives the same data
a1.sources.r1.selector.type = replicating
# channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/apps/flume-1.7.0/checkpoint
a1.channels.c2.dataDirs = /opt/apps/flume-1.7.0/data
#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c2
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp
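To check that both sinks see the same events (a sketch; the topic and directory come from the config above, and the standard Kafka console consumer script is assumed to be on the PATH):
# read what the Kafka sink produced
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
# list the files written by the file_roll sink
ls -l /opt/apps/tmp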
3. Multiple sources, single channel, single sink
Multiple sources can read different kinds of data into a single channel, which is then output to the same destination.
The configuration file is as follows:
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1
# source1
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
# source2
a1.sources.r2.type = exec
a1.sources.r2.shell = /bin/bash -c
a1.sources.r2.channels = c1
a1.sources.r2.command = tail -F /opt/apps/logs/tail2.log
# channel1 in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
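As a quick check that both sources merge into the single channel and sink (a sketch; the log paths come from the config above):
echo "from tail4" >> /opt/apps/logs/tail4.log
echo "from tail2" >> /opt/apps/logs/tail2.log
# both lines should appear on the mytopic Kafka topic, delivered through the one sink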
Like Lego bricks, Flume lets you combine different components however you like, with low coupling between them.