1. 程式人生 > 實用技巧 >flume採集MongoDB資料到Kafka中

flume採集MongoDB資料到Kafka中

環境說明

  • centos7(運行於vbox虛擬機器)
  • flume1.9.0(自定義了flume連線mongodb的source外掛)
  • jdk1.8
  • kafka(2.11)
  • zookeeper(3.57)
  • mongoDB4.0.0(無密碼)
  • xshell 7

自定義flume外掛

由於flume對資料庫的支援欠缺,flume的source元件中,沒有元件適用於連線關係型資料庫或非關係型資料庫。

對於關係型資料庫(RDB),github中開源外掛flume-ng-sql-source被廣泛用於對接RDB。但是對於非關係型資料庫,不同的非關係型資料庫之間都有些許差別,且沒有一個統一的,或者配對的外掛來支援非關係型資料庫。

因此,需要使用者自定義外掛來適配。

我自定義的flume-ng-mongodb-source的jar包如下:

()

將該jar包放在yourpath/flume/lib下(yourpath指你flume資料夾前面路徑,下同。同理,下文出現的yourhost指你本機的ip地址)

連線mongodb的配置檔案

在mongodb中建立database和collection,用於測試。

建立資料庫:

use flumetest

建立集合(隱式建立):

db.testCollection.insert({id:1,name:"333"})

檢視是否已經建立了資料庫:

> show dbs
admin      0.000GB
config     0.000GB
flumetest  0.000GB
local      0.000GB
test       0.000GB

檢視集合中的資料:

> db.testCollection.findOne()
{ "_id" : ObjectId("5fe29faad5553e6caaa8cbe9"), "id" : 1, "name" : "333" }

此外,我們需要將mongodb相關的驅動jar包放到yourpath/flume/lib

bson-3.12.7.jar
mongo-java-driver-3.12.7.jar
mongodb-driver-core-3.12.7.jar

flume連線mongodb需要先編寫相關的配置檔案,在yourpath/flume/conf裡新增配置檔案mongo-flume.conf

,具體的配置如下:

#This is a model,you can use for test
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.wms.flumesource.MongoDBSource
a1.sources.r1.Mongodb.url = yourhost:27017
a1.sources.r1.Mongodb.database=flumetest
a1.sources.r1.Mongodb.collection = testCollection
a1.sources.r1.Mongodb.column= _id
a1.sources.r1.start.from = 0
a1.sources.r1.interval=2000
a1.sources.r1.charset=UTF-8

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mongoTopic
a1.sinks.k1.brokerList = yourhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

引數說明:

# mongodb的url
a1.sources.r1.Mongodb.url = yourhost:27017
# 要連線的database
a1.sources.r1.Mongodb.database=flumetest
# 要連線的collection
a1.sources.r1.Mongodb.collection = testCollection
# mongodb中每條資料都有預設的_id,用於續傳
a1.sources.r1.Mongodb.column= _id


# sink使用了kafka,flume成功連線之後開啟消費監控就能看到資料了
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 接下來用於監控消費的topic名字
a1.sinks.k1.topic = mongoTopic

因為mongodb有叢集操作,所以flume-ng-mongodb-source也支援mongodb叢集,只需要在a1.sources.r1.Mongodb.url裡配置多個url即可,如:

a1.sources.r1.Mongodb.url = yourhost1:port1,yourhost2:port2,yourhost3:port3,......

採集mongodb資料實踐

啟動mongodb和kafka。

啟動flume

bin/flume-ng agent -n a1 -c conf -f conf/mongo-flume.conf -Dflume.root.logger=INFO,console

引數說明:

  • a1:是你在mongo-flume中給agent起的別名
  • conf/mongo-flume.conf:匯入前文所述的配置檔案,配置檔案在yourpath/flume/conf下。

啟動一個kafka消費監控:

bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning

獲取testCollection中全部資料(下圖不是重複資料,是之前多次測試在topic中留下的資料):

往testCollection中新增一條資料:

db.testCollection.insert({id:7,name:"test",city:"Beijing"})

消費監控中的結果如下:

只讀增量資料

如果不想把collection中所有的資料都讀取出來,請修改flume-ng-mongodb-source原始碼。

在MongoDBSource.java檔案中,找到run方法,取消掉events.clear()的註釋。

再次打包,替換掉lib下flume-ng-mongodb-source的jar包。

然後再次執行上面的啟動操作:

bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning


bin/kafka-console-consumer.sh --bootstrap-server yourhost:9092 --topic mongoTopic --from-beginning

插入一條資料:

db.testCollection.insert({id:8,name:"增量"})

檢視消費監控:

可以看到只有新增的資料了,不會再讀取所有的資料

再插入一條資料實驗一下:

db.testCollection.insert({id:9,source:"MongoDBSource",channle:"memory",sink:"kafka"})