
Importing Oracle data into Kafka with Flume

Versions: Flume 1.6 and Kafka kafka_2.11 (the Kafka build compiled for Scala 2.11).

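Flume does not ship with the flume-ng-sql-source jar, so you have to get it yourself. You can build it from the source at https://github.com/keedio/flume-ng-sql-source.git, but that takes more effort; alternatively, download a prebuilt jar from https://download.csdn.net/download/chongxin1/9892184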


Step 1: Put the downloaded flume-ng-sql-source jar into Flume's lib directory.


Step 2: Since I am using Oracle, also put the Oracle JDBC driver jar into Flume's lib directory. I used ojdbc5.jar.
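The two jars from steps 1 and 2 are easy to drop into the wrong directory. The Python sketch below checks that both are present in Flume's lib directory. It is only an illustration: it assumes the file names start with flume-ng-sql-source and ojdbc (as the files mentioned above do), and the function name missing_jars and the script name in the usage comment are hypothetical.

```python
from pathlib import Path

# Filename prefixes this sketch looks for (assumed; adjust if your jars are named differently).
REQUIRED_JAR_PREFIXES = ("flume-ng-sql-source", "ojdbc")


def missing_jars(flume_home: str) -> list[str]:
    """Return the required jar prefixes that have no matching .jar file in <flume_home>/lib."""
    lib_dir = Path(flume_home) / "lib"
    jar_names = [p.name for p in lib_dir.glob("*.jar")] if lib_dir.is_dir() else []
    return [
        prefix
        for prefix in REQUIRED_JAR_PREFIXES
        if not any(name.startswith(prefix) for name in jar_names)
    ]


if __name__ == "__main__":
    import sys

    # Hypothetical usage: python check_jars.py <flume installation directory>
    missing = missing_jars(sys.argv[1] if len(sys.argv) > 1 else ".")
    print("all jars present" if not missing else "missing: " + ", ".join(missing))
```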


Step 3: Write the Flume configuration file.

In Flume's conf directory, run vi sql-kafka.conf and enter the following configuration:

agentTest.channels = channelTest
agentTest.sources = sourceTest
agentTest.sinks = sinkTest
###########sql source#################
# Source type: the SQL source provided by flume-ng-sql-source
agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
# Hibernate database connection properties
agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@192.168.200.8:1521/orcl
agentTest.sources.sourceTest.hibernate.connection.user = typpcits
agentTest.sources.sourceTest.hibernate.connection.password = typpcits
agentTest.sources.sourceTest.hibernate.connection.autocommit = true
agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
# Delay between query runs, in milliseconds
agentTest.sources.sourceTest.run.query.delay=1
agentTest.sources.sourceTest.status.file.path = /opt/flume
agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status
# Custom query; without a $@$ placeholder in a WHERE clause, the same rows are exported again on every run
agentTest.sources.sourceTest.custom.query = SELECT * FROM ITS_BASE_AREA 
agentTest.sources.sourceTest.batch.size = 6000
agentTest.sources.sourceTest.max.rows = 1000
agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
agentTest.sources.sourceTest.hibernate.c3p0.max_size=10
##############################
agentTest.channels.channelTest.type = memory
agentTest.channels.channelTest.capacity = 10000
agentTest.channels.channelTest.transactionCapacity = 10000
agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
agentTest.channels.channelTest.byteCapacity = 1600000
 
agentTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSink
agentTest.sinks.sinkTest.topic = TestTopic
agentTest.sinks.sinkTest.brokerList = 192.168.72.129:9092,192.168.72.130:9092,192.168.72.131:9092
agentTest.sinks.sinkTest.requiredAcks = 1
agentTest.sinks.sinkTest.batchSize = 20
agentTest.sinks.sinkTest.channel = channelTest
agentTest.sources.sourceTest.channels=channelTest
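A wiring mistake (such as a sink that names an undeclared channel) or a batch that cannot fit into a single memory-channel transaction usually only shows up as an error once the agent is running. The Python sketch below checks the file beforehand. It is a hypothetical helper, not part of Flume: the parser handles only simple key = value lines, and the sizing rules it encodes (transactionCapacity no larger than capacity; source batch.size and sink batchSize no larger than the channel's transactionCapacity) reflect how the memory channel limits each transaction, with a default of 100 assumed wherever a value is missing.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple `key = value` lines, skipping blank lines and `#` comments.

    Deliberately simplified: Java-properties features such as `:` separators,
    escapes and line continuations are not handled. Later duplicates win.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_agent(props: dict[str, str], agent: str) -> list[str]:
    """Return wiring/sizing problems found for one agent (an empty list means none found)."""
    problems: list[str] = []
    p = agent + "."
    channels = props.get(p + "channels", "").split()

    # Transaction capacity of each declared memory channel (assumed default: 100).
    txn: dict[str, int] = {}
    for ch in channels:
        cp = f"{p}channels.{ch}."
        if props.get(cp + "type") != "memory":
            continue  # only memory channels are sized by this sketch
        capacity = int(props.get(cp + "capacity", "100"))
        txn[ch] = int(props.get(cp + "transactionCapacity", "100"))
        if txn[ch] > capacity:
            problems.append(f"channel {ch}: transactionCapacity {txn[ch]} > capacity {capacity}")

    # Sources: must use declared channels, and one batch must fit in one transaction.
    for src in props.get(p + "sources", "").split():
        bound = props.get(f"{p}sources.{src}.channels", "").split()
        if not bound:
            problems.append(f"source {src}: no channels configured")
        batch = int(props.get(f"{p}sources.{src}.batch.size", "100"))
        for ch in bound:
            if ch not in channels:
                problems.append(f"source {src}: undeclared channel {ch}")
            elif ch in txn and batch > txn[ch]:
                problems.append(f"source {src}: batch.size {batch} > transactionCapacity {txn[ch]}")

    # Sinks: exactly one declared channel, and one batch must fit in one transaction.
    for sink in props.get(p + "sinks", "").split():
        ch = props.get(f"{p}sinks.{sink}.channel", "")
        batch = int(props.get(f"{p}sinks.{sink}.batchSize", "100"))
        if ch not in channels:
            problems.append(f"sink {sink}: missing or undeclared channel {ch!r}")
        elif ch in txn and batch > txn[ch]:
            problems.append(f"sink {sink}: batchSize {batch} > transactionCapacity {txn[ch]}")
    return problems


if __name__ == "__main__":
    import sys

    # Hypothetical usage: python check_conf.py conf/sql-kafka.conf
    with open(sys.argv[1], encoding="utf-8") as f:
        print(check_agent(parse_properties(f.read()), "agentTest") or "no problems found")
```

An empty result only means none of these particular problems were found; it does not guarantee that the agent will start or connect to Oracle and Kafka.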

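Remember to replace the values above with your own: database URL, username and password, query, status file path, Kafka topic, and broker list.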

Step 4: From the Flume installation directory (the one containing bin and conf), run:

./bin/flume-ng agent -n agentTest  -c conf -f conf/sql-kafka.conf -Dflume.root.logger=INFO,console
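The value passed to -n must be the prefix used by the properties in the file (agentTest here); if it does not match, Flume typically finds no configuration for that agent and starts without any source, channel, or sink. The Python sketch below uses a hypothetical helper, build_flume_command, that rebuilds the command above and refuses to do so when the agent name is not a property prefix in the file. The flags are exactly those of the command above, and it assumes it is run from the Flume installation directory.

```python
import shlex


def build_flume_command(conf_text: str, agent: str,
                        conf_file: str = "conf/sql-kafka.conf") -> list[str]:
    """Return the flume-ng argv, after checking that `agent` is used as a property prefix."""
    prefixes = set()
    for raw in conf_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key = line.split("=", 1)[0].strip()
        prefixes.add(key.split(".", 1)[0])  # e.g. "agentTest" from "agentTest.sinks"
    if agent not in prefixes:
        raise ValueError(f"agent {agent!r} not found; config defines {sorted(prefixes)}")
    return [
        "./bin/flume-ng", "agent",
        "-n", agent,
        "-c", "conf",
        "-f", conf_file,
        "-Dflume.root.logger=INFO,console",
    ]


if __name__ == "__main__":
    with open("conf/sql-kafka.conf", encoding="utf-8") as f:
        cmd = build_flume_command(f.read(), "agentTest")
    # Print a shell-ready command line; subprocess.run(cmd) would start the agent instead.
    print(shlex.join(cmd))
```

Passing the returned list to subprocess.run starts the agent the same way as typing the command by hand.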

Step 5: Check whether data has arrived on the Kafka topic TestTopic by running the following from Kafka's bin directory:

./kafka-console-consumer.sh --zookeeper 192.168.72.129:2181 --topic TestTopic --from-beginning

If everything worked, the console consumer prints the rows queried from Oracle.