1. 程式人生 > >flume資料傳輸到kafka

flume資料傳輸到kafka

flume 簡單介紹

當你看到這篇文章時,應該對flume有一個大概瞭解但是為照顧剛入門的同學所以還是會說下flume,剛開始使用flume時不需要理解太多裡面的東西,只需要理解下面的圖就可以使用flume把日誌資料傳入kafka中,下圖中的hdfs只是有代表性的sink而以,我在實際使用中sink是kafka
這裡寫圖片描述

flume安裝

flume環境準備

  • centos 6.5
  • JDK 1.7+

flume下載安裝

  • flume 1.7 下載連結
  • 安裝flume
    1.tar -zxvf apache-flume-1.7.0-bin.tar.gz
    2.mv apache-flume-1.7.0-bin flume
    3.cp conf/flume-conf.properties.template conf/flume-conf.properties # flume-conf.properties 配置source,channel,sink等資訊
    4.cp conf/flume-env.sh.template conf/flume-env.sh # flume-env.sh配置agent啟動項及JAVA環境變數等

flume配置

  • 配置flume-conf.properties
agent.sources=r1
agent.sinks=k1
agent.channels=c1

agent.sources.r1.type=exec
agent.sources.r1.command=tail -F /data/logs/access.log
agent.sources.r1.restart=true
agent.sources.r1.batchSize=1000
agent.sources.r1.batchTimeout=3000
agent.sources.r1.channels=c1

agent.channels
.c1.type=memory agent.channels.c1.capacity=102400 agent.channels.c1.transactionCapacity=1000 agent.channels.c1.byteCapacity=134217728 agent.channels.c1.byteCapacityBufferPercentage=80 agent.sinks.k1.channel=c1 agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink agent.sinks.k1.kafka.topic=xxxxx-kafka agent.sinks
.k1.kafka.bootstrap.servers=x.x.x.x:9092,x.x.x.x:9092 agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder agent.sinks.k1.flumeBatchSize=1000 agent.sinks.k1.useFlumeEventFormat=true

命令規則是 r1->source k1->sink c1->channels agent名稱在你啟動時-n的引數值

  • 配置flume-env.sh
export JAVA_HOME=/data/java/jdk1.8.0_102/

我這邊只配置JAVA_HOME,還有agent啟動的一些JMX選項沒有加,這些可以根據自己的需要新增
* 啟動flume-agent

啟動flume-agent
./bin/flume-ng agent -c conf -f conf/flume-conf.properties -n agent -Dflume.root.logger=INFO,console
-c 配置檔案目錄 -f 指定flume配置檔案 -n flume客戶端名稱 Dflume啟動時將INFO級別的LOG列印在控制檯

總結

1.flume中可以自己定義source,sink,你可以根據自己的需要去做修改或者重新 git地址,從github中pull程式碼如你只是修改某模組程式碼就只需要把之前的jar刪除,把編譯好的jar丟上去就可以,其他的玩法可以看官方文件
2.在使用中memory channels當agent被殺時資料會丟失不會恢復
3.flume在日聚合中是非常靈活的,可以組成各種玩法如我從某tcp埠拿資料傳入別的flume agent中等
4.建議看一篇官方文件