1. 程式人生 > >flume 的source 、channel和sink 多種組合

flume 的source 、channel和sink 多種組合

樂高積木flume

flume 有三大元件source 、channel和sink,各個元件之間都可以相互組合使用,各元件間耦合度低。使用靈活,方便。

1.多sink

channel 的內容只輸出一次,同一個event 如果sink1 輸出,sink2 不輸出;如果sink1 輸出,sink1 不輸出。 最終 sink1+sink2=channel 中的資料。

配置檔案如下:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type
= exec a1.sources.r1.shell = /bin/bash -c a1.sources.r1.channels = c1 a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #sink1 a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume
.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.ki.kafka.producer.compression.type = snappy #sink2 a1.sinks.k2.type
= file_roll a1.sinks.k2.channel = c1 #a1.sinks.k2.sink.rollInterval=0 a1.sinks.k2.sink.directory = /opt/apps/tmp

2.多 channel 多sink ,每個sink 輸出內容一致

(memory channel 用於kafka操作,實時性高,file channel 用於 sink file 資料安全性高)
(多channel 單 sink 的情況沒有舉例,個人感覺用處不廣泛。)

配置檔案如下:

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log
#多個channel 的資料相同
a1.sources.r1.selector.type=replicating


# channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/apps/flume-1.7.0/checkpoint
a1.channels.c2.dataDirs = /opt/apps/flume-1.7.0/data

#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

#sink2
a1.sinks.k2.type = file_roll
a1.sinks.k2.channel = c2
#a1.sinks.k2.sink.rollInterval=0
a1.sinks.k2.sink.directory = /opt/apps/tmp

3. 多source 單 channel 單 sink

多個source 可以讀取多種資訊放在一個channel 然後輸出到同一個地方
配置檔案如下:

a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1

# source1
a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c

a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/apps/logs/tail4.log

# source2
a1.sources.r2.type = exec
a1.sources.r2.shell = /bin/bash -c
a1.sources.r2.channels = c1
a1.sources.r2.command = tail -F /opt/apps/logs/tail2.log


# channel1  in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100



#sink1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

flume 像樂高積木一樣可以自己隨心所欲將不同的元件進行搭配使用,耦合度低。