1. 程式人生 > >多路分流技術flume

多路分流技術flume

[[email protected] apache-flume-1.8.0-bin]# cat duokdmultiplexing.conf
#example.conf: A single-node Flume configuration

#Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
#分流
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = orgId
a1.sources.r1.selector.mapping.100010039 = c1
a1.sources.r1.selector.mapping.100010040 = c1
a1.sources.r1.selector.default = c2
#a1.sources.r1.useFlumeEventFormat = true
a1.sources.r1.useAvroEventFormat = true
#攔截器
a1.sources.r1.interceptors =i1
# 自定義攔截器
a1.sources.r1.interceptors.i1.type =com.springboot.MongoDB.flume.kafkaInterceptor$Builder

#Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hdp4:6667,hdp5:6667,hdp6:6667
a1.sources.r1.kafka.consumer.group.id = flume1
# 定義kafka所在zk的地址
a1.sources.r1.zookeeperConnect = hdp1.hdp:2181,hdp2.hdp:2181,hdp3.hdp:2181
a1.sources.r1.kafka.topics = IOT_DS_DATA_BACK2
# 消費超時時間
a1.sources.r1.kafka.consumer.timeout.ms = 10000
#向channel寫入訊息的最多條數
a1.sources.r1.batchSize = 50000
#向channel書寫的最大時間 (毫秒)
a1.sources.r1.batchDurationMillis = 2000
#9.7mb
a1.sources.r1.kafka.consumer.max.partition.fetch.bytes = 10240000

# Describe the sink1
a1.sinks.k1.type = hdfs
# hdfs://hdp1.hdp:8020/IOT/*/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.path = hdfs://hdp1.hdp:8020/IOT/111/20%y/%m/%d-%H-%M
#用本地時間格式化目錄
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#下沉後, 生成的檔案型別,預設是Sequencefile,可用DataStream,則為普通文字  a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
#fileType: 預設值為SequenceFile,檔案格式主要包括:SequenceFile, DataStream, CompressedStream;
#當使用DataStream時,檔案不會被壓縮,則不需要設定hdfs.codeC; 當使用CompressedStream時,則必須設定一個正確的hdfs.codeC值
#a1.sinks.k1.hdfs.fileType = CompressedStream
#codeC: 檔案壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy;
#a1.sinks.k1.hdfs.codeC = lzop
#字首
a1.sinks.k1.hdfs.filePrefix = data
#字尾
a1.sinks.k1.hdfs.fileSuffix=.json
#檔案回滾之前等待的時時間單位 秒
a1.sinks.k1.hdfs.rollInterval = 100
#檔案滾動大的大小限制(bytes)8000000  /1024/1024
a1.sinks.k1.hdfs.rollSize =  90000000
#寫入多少個 event 資料後滾動檔案(事件個數)
a1.sinks.k1.hdfs.rollCount = 70000
#hdfs sink啟動的操作HDFS的執行緒數。
a1.sinks.k1.hdfs.threadsPoolSize = 100
#預設值:1,hdfs sink 啟動的根據時間滾動檔案的執行緒數。
a1.sinks.k1.hdfs.rollTimerPoolSize = 10
# 事件就往裡面寫入
a1.sinks.k1.hdfs.batchSize = 100000
a1.sinks.k1.hdfs.txnEventMax = 100000


# 10 分鐘就建立檔案
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#預設值為0, 當目前被開啟的臨時檔案在該引數指定的時間(秒)內,沒有任何資料寫入,則將該臨時檔案關閉並重命名成目標檔案
#a1.sinks.k1.hdfs.idleTimeout=300
#預設值為HDFS副本數;寫入HDFS檔案塊的最小副本數,該引數會影響檔案的滾動配置,一般將該引數配置成1,才可以按照配置正確滾動檔案;
a1.sinks.k1.hdfs.minBlockReplicas=1
#預設值5000,最大允許開啟的HDFS檔案數,當開啟的檔案數達到該值,最早開啟的檔案將會被關閉;
a1.sinks.k1.hdfs.maxOpenFiles=5000
#預設值:10000,執行HDFS操作的超時時間(單位:毫秒);
a1.sinks.k1.hdfs.callTimeout=100000

# Describe the sink2
a1.sinks.k2.type = hdfs
# hdfs://hdp1.hdp:8020/IOT/*/%y-%m-%d/%H-%M
a1.sinks.k2.hdfs.path = hdfs://hdp1.hdp:8020/IOT/112/20%y/%m/%d-%H-%M
#用本地時間格式化目錄
a1.sinks.k2.hdfs.useLocalTimeStamp = true
#下沉後, 生成的檔案型別,預設是Sequencefile,可用DataStream,則為普通文字  a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.fileType = DataStream
#fileType: 預設值為SequenceFile,檔案格式主要包括:SequenceFile, DataStream, CompressedStream;
#當使用DataStream時,檔案不會被壓縮,則不需要設定hdfs.codeC; 當使用CompressedStream時,則必須設定一個正確的hdfs.codeC值
#a1.sinks.k2.hdfs.fileType = CompressedStream
#codeC: 檔案壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy;
#a1.sinks.k2.hdfs.codeC = lzop
#字首
a1.sinks.k2.hdfs.filePrefix = data
#字尾
a1.sinks.k2.hdfs.fileSuffix=.json
#檔案回滾之前等待的時時間單位 秒
a1.sinks.k2.hdfs.rollInterval = 100
#檔案滾動大的大小限制(bytes)8000000  /1024/1024
a1.sinks.k2.hdfs.rollSize =  90000000
#寫入多少個 event 資料後滾動檔案(事件個數)
a1.sinks.k2.hdfs.rollCount = 70000
#hdfs sink啟動的操作HDFS的執行緒數。
a1.sinks.k2.hdfs.threadsPoolSize = 100
#預設值:1,hdfs sink 啟動的根據時間滾動檔案的執行緒數。
a1.sinks.k2.hdfs.rollTimerPoolSize = 10
# 事件就往裡面寫入
a1.sinks.k2.hdfs.batchSize = 100000
a1.sinks.k2.hdfs.txnEventMax = 100000


# 10 分鐘就建立檔案
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = minute
#預設值為0, 當目前被開啟的臨時檔案在該引數指定的時間(秒)內,沒有任何資料寫入,則將該臨時檔案關閉並重命名成目標檔案
#a1.sinks.k2.hdfs.idleTimeout=300
#預設值為HDFS副本數;寫入HDFS檔案塊的最小副本數,該引數會影響檔案的滾動配置,一般將該引數配置成1,才可以按照配置正確滾動檔案;
a1.sinks.k2.hdfs.minBlockReplicas=1
#預設值5000,最大允許開啟的HDFS檔案數,當開啟的檔案數達到該值,最早開啟的檔案將會被關閉;
a1.sinks.k2.hdfs.maxOpenFiles=5000
#預設值:10000,執行HDFS操作的超時時間(單位:毫秒);
a1.sinks.k2.hdfs.callTimeout=100000


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 800000000
a1.channels.c1.transactionCapacity = 500000
a1.channels.c1.keep-alive = 50

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 800000000
a1.channels.c2.transactionCapacity = 500000
a1.channels.c2.keep-alive = 50


# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2