Flume configuration file for syncing from Kafka
阿新 • Published: 2018-12-31
Download Flume from the official Flume website and extract the archive.
cd $FLUME_HOME/conf
cp flume-conf.properties.template applog-conf.properties
Edit the properties in applog-conf.properties:
agent.sources = KafkaApplog
agent.channels = ApplogChannel
agent.sinks = ApplogSink
# Bind the source and the sink to the channel
agent.sources.KafkaApplog.channels = ApplogChannel
agent.sinks.ApplogSink.channel = ApplogChannel
agent.sources.KafkaApplog.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.KafkaApplog.batchSize = 1000
agent.sources.KafkaApplog.batchDurationMillis = 20000
agent.sources.KafkaApplog.kafka.bootstrap.servers = dn120:9092,dn121:9092,dn122:9092
agent.sources.KafkaApplog.kafka.topics = applog
agent.sources.KafkaApplog.kafka.consumer.group.id = flume
agent.sources.KafkaApplog.kafka.consumer.auto.offset.reset = earliest
# Memory channel: fast, but events still buffered in it are lost if the agent process stops
agent.channels.ApplogChannel.type = memory
agent.channels.ApplogChannel.capacity = 1000000
agent.channels.ApplogChannel.transactionCapacity = 2000
agent.channels.ApplogChannel.keep-alive = 60
agent.sinks.ApplogSink.type = hdfs
# Note: output goes to the subdirectory below, partitioned by date (pt=YYYY-MM-DD)
agent.sinks.ApplogSink.hdfs.path = hdfs://adups:8020/user/kafka/flume/ota_app_log/pt=%Y-%m-%d
agent.sinks.ApplogSink.hdfs.writeFormat = Text
agent.sinks.ApplogSink.hdfs.fileType = DataStream
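# Allow up to 300000 ms (5 minutes) for HDFS operations such as open, write, flush and close
agent.sinks.ApplogSink.hdfs.callTimeout = 300000
# Roll to a new file when any one limit is reached: 10240000 bytes, 20000 events or 300 seconds
agent.sinks.ApplogSink.hdfs.rollSize = 10240000
agent.sinks.ApplogSink.hdfs.rollCount = 20000
agent.sinks.ApplogSink.hdfs.rollInterval = 300
# Prefix the file currently being written with "_"
agent.sinks.ApplogSink.hdfs.inUsePrefix = _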
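One constraint in this configuration is easy to break when tuning it: the memory channel's transactionCapacity caps how many events fit in one transaction, so the Kafka source's batchSize (1000 here) should not exceed it (2000 here). Below is a minimal sketch of a pre-start check. The helper names get_prop and check_batch_fits are hypothetical and not part of Flume, and the parser only handles simple "key = value" lines.

```shell
#!/usr/bin/env bash
# get_prop FILE KEY: print the value of KEY from a Java-style properties file.
# Hypothetical helper for this sketch; it ignores line continuations and escapes.
get_prop() {
  local file="$1" key="$2"
  awk -v k="$key" '
    /^[ \t]*#/ { next }                      # skip comment lines
    {
      pos = index($0, "=")
      if (pos == 0) next                     # not a key = value line
      name = substr($0, 1, pos - 1)
      value = substr($0, pos + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", name)      # trim spaces around the key
      gsub(/^[ \t]+|[ \t]+$/, "", value)     # trim spaces around the value
      if (name == k) found = value           # the last definition wins
    }
    END { if (found != "") print found }
  ' "$file"
}

# check_batch_fits FILE: succeed only if the source batch fits in one channel transaction.
# The property names are the ones used in applog-conf.properties.
check_batch_fits() {
  local file="$1" batch tx
  batch=$(get_prop "$file" agent.sources.KafkaApplog.batchSize)
  tx=$(get_prop "$file" agent.channels.ApplogChannel.transactionCapacity)
  [ -n "$batch" ] && [ -n "$tx" ] && [ "$batch" -le "$tx" ]
}
```

A possible use before starting the agent: check_batch_fits conf/applog-conf.properties || echo "batchSize exceeds transactionCapacity" >&2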
Change to the FLUME_HOME directory and run the start command:
bin/flume-ng agent -c conf -f conf/applog-conf.properties -n agent &
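A job started with a bare & may, depending on the shell, receive a hangup signal when the terminal closes, and its output is mixed into the session. The sketch below wraps the same command in nohup and records the process ID. The function name start_flume_agent and the file names applog-agent.out and applog-agent.pid are assumptions made for illustration, and FLUME_HOME is assumed to be set.

```shell
#!/usr/bin/env bash
# start_flume_agent [CONF_FILE] [AGENT_NAME]
# Runs the same flume-ng command as above, detached from the terminal.
# Note: this changes the current directory to $FLUME_HOME.
start_flume_agent() {
  local conf_file="${1:-conf/applog-conf.properties}"
  local name="${2:-agent}"
  cd "$FLUME_HOME" || return 1
  # nohup keeps the agent running after logout; output goes to a file
  nohup bin/flume-ng agent -c conf -f "$conf_file" -n "$name" \
    > applog-agent.out 2>&1 &
  # $! is the PID of the background job just started
  echo "$!" > applog-agent.pid
}
```

To stop the agent later, something like kill "$(cat "$FLUME_HOME/applog-agent.pid")" should work under the same assumptions.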
This process consumes the applog topic from Kafka and syncs it to the HDFS directory.
The directory structure is as follows:
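The sink first writes a temporary file whose name starts with "_" and ends in .tmp, then renames it to its final name once the file is rolled.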
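Downstream jobs usually want only the files the sink has already closed. The sketch below builds the partition directory for a given day and filters out in-use files by name. BASE_DIR is copied from hdfs.path in the configuration; the function names partition_dir and finished_files are hypothetical, and the example assumes the date in pt= matches the local date on the machine running the script.

```shell
#!/usr/bin/env bash
# Base directory taken from hdfs.path in applog-conf.properties.
BASE_DIR="hdfs://adups:8020/user/kafka/flume/ota_app_log"

# partition_dir [DATE]: print the partition directory for DATE
# (format YYYY-MM-DD, defaulting to today's local date).
partition_dir() {
  local day="${1:-$(date +%Y-%m-%d)}"
  printf '%s/pt=%s\n' "$BASE_DIR" "$day"
}

# finished_files: read one path per line on stdin and print only
# the paths whose file name is not an in-use name.
finished_files() {
  local path name
  while IFS= read -r path; do
    name="${path##*/}"                 # strip the directory part
    case "$name" in
      _*|*.tmp) ;;                     # still being written, skip it
      *) printf '%s\n' "$path" ;;
    esac
  done
}
```

With a Hadoop release whose ls command supports the -C option (paths only), the two can be combined as hdfs dfs -ls -C "$(partition_dir)" | finished_files. On older releases the path column would have to be extracted from the regular listing first.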