1. 程式人生 > >Flume同步kafka配置檔案

Flume同步kafka配置檔案

到flume官網下載flume,解壓

cd $FLUME_HOME/conf
cp flume-conf.properties.template applog-conf.properties

修改applog-conf.properties屬性


agent.sources = KafkaApplog
agent.channels = ApplogChannel
agent.sinks = ApplogSink

# For each one of the sources, the type is defined

agent.sources.KafkaApplog.channels
= ApplogChannel agent.sinks.ApplogSink.channel = ApplogChannel agent.sources.KafkaApplog.type = org.apache.flume.source.kafka.KafkaSource agent.sources.KafkaApplog.batchSize = 1000 agent.sources.KafkaApplog.batchDurationMillis = 20000 agent.sources.KafkaApplog.kafka.bootstrap.servers = dn120:9092,dn121:9092
,dn122:9092 agent.sources.KafkaApplog.kafka.topics = applog agent.sources.KafkaApplog.kafka.consumer.group.id = flume agent.sources.KafkaApplog.kafka.consumer.auto.offset.reset=earliest agent.channels.ApplogChannel.type = memory agent.channels.ApplogChannel.capacity=1000000 agent.channels.ApplogChannel.transactionCapacity
=2000 agent.channels.ApplogChannel.keep-alive = 60 agent.sinks.ApplogSink.type = hdfs # 注意, 我們輸出到下面一個子資料夾datax中 agent.sinks.ApplogSink.hdfs.path = hdfs://adups:8020/user/kafka/flume/ota_app_log/pt=%Y-%m-%d agent.sinks.ApplogSink.hdfs.writeFormat = Text agent.sinks.ApplogSink.hdfs.fileType = DataStream agent.sinks.ApplogSink.hdfs.callTimeout= 300000 agent.sinks.ApplogSink.hdfs.rollSize = 10240000 agent.sinks.ApplogSink.hdfs.rollCount = 20000 agent.sinks.ApplogSink.hdfs.rollInterval = 300 agent.sinks.ApplogSink.hdfs.inUsePrefix = _

進入FLUME_HOME目錄,執行啟動命令

bin/flume-ng agent -c conf  -f conf/applog-conf.properties -n agent &

本過程消費kafka中applog同步到HDFS目錄
目錄結構如下:
這裡寫圖片描述

它會先生成臨時檔案tmp以”_”為開頭,再轉變為正式檔案.