Importing Oracle Data into Kafka with Flume
阿新 • Published: 2018-12-07
Versions used: Flume 1.6 and Kafka 2.11 (presumably the Scala 2.11 build of Kafka, i.e. a kafka_2.11-* package).
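Flume does not ship with the flume-ng-sql-source jar, so you have to get it separately. You can build it from the source at https://github.com/keedio/flume-ng-sql-source.git, which takes more effort, or download a prebuilt jar from https://download.csdn.net/download/chongxin1/9892184
Step 1: Copy the downloaded flume-ng-sql-source jar into Flume's lib directory.
Step 2: Since I am using Oracle, I also copied the Oracle JDBC driver jar into Flume's lib directory. The one I used is ojdbc5.jar.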
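Before starting the agent, it can help to confirm that both jars really are in the lib directory. The following is a minimal Python sketch and not part of the original setup; the function name missing_jars and the path /opt/flume/lib are assumptions made for illustration, so substitute your own Flume location.

```python
from pathlib import Path


def missing_jars(lib_dir, required_prefixes):
    """Return the prefixes for which no matching .jar file exists in lib_dir."""
    lib = Path(lib_dir)
    # A missing directory is treated as containing no jars at all.
    jar_names = [p.name for p in lib.iterdir() if p.suffix == ".jar"] if lib.is_dir() else []
    return [
        prefix
        for prefix in required_prefixes
        if not any(name.startswith(prefix) for name in jar_names)
    ]


# Hypothetical install path: point this at your own Flume lib directory.
print(missing_jars("/opt/flume/lib", ["flume-ng-sql-source", "ojdbc"]))
```

An empty list means a jar was found for each prefix; any prefix that is printed still needs to be copied into lib.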
Step 3: Write the Flume agent configuration file in Flume's conf directory.
vi sql-kafka.conf
The full configuration is as follows:
agentTest.channels = channelTest
agentTest.sources = sourceTest
agentTest.sinks = sinkTest

###########sql source#################
# For each Test of the sources, the type is defined
agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@192.168.200.8:1521/orcl

# Hibernate Database connection properties
agentTest.sources.sourceTest.hibernate.connection.user = typpcits
agentTest.sources.sourceTest.hibernate.connection.password = typpcits
agentTest.sources.sourceTest.hibernate.connection.autocommit = true
agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentTest.sources.sourceTest.run.query.delay=1
agentTest.sources.sourceTest.status.file.path = /opt/flume
agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status

# Custom query
agentTest.sources.sourceTest.custom.query = SELECT * FROM ITS_BASE_AREA
agentTest.sources.sourceTest.batch.size = 6000
agentTest.sources.sourceTest.max.rows = 1000
agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
agentTest.sources.sourceTest.hibernate.c3p0.max_size=10

##############################
agentTest.channels.channelTest.type = memory
agentTest.channels.channelTest.capacity = 10000
agentTest.channels.channelTest.transactionCapacity = 10000
agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
agentTest.channels.channelTest.byteCapacity = 1600000

agentTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSink
agentTest.sinks.sinkTest.topic = TestTopic
agentTest.sinks.sinkTest.brokerList = 192.168.72.129:9092,192.168.72.130:9092,192.168.72.131:9092
agentTest.sinks.sinkTest.requiredAcks = 1
agentTest.sinks.sinkTest.batchSize = 20
agentTest.sinks.sinkTest.channel = channelTest
agentTest.sources.sourceTest.channels=channelTest
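Do not forget to replace the environment-specific values above (database URL, user name, password, query, status file path, Kafka broker list and topic) with your own.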
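After editing the values it is easy to break the source, channel and sink wiring with a typo. The following is a minimal Python sketch of a sanity check, not something Flume provides; parse_properties and check_wiring are hypothetical helper names, and the parser only handles the simple key = value lines used in this file. It checks that every source and sink points at a declared channel and that a channel's transactionCapacity does not exceed its capacity, which the memory channel rejects.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems found; an empty list means none were detected."""
    problems = []
    channels = props.get(agent + ".channels", "").split()
    for source in props.get(agent + ".sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} is not attached to any channel")
        for ch in used:
            if ch not in channels:
                problems.append(f"source {source} uses undeclared channel {ch!r}")
    for sink in props.get(agent + ".sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel", "")
        if ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch!r}")
    for ch in channels:
        cap = props.get(f"{agent}.channels.{ch}.capacity")
        tx = props.get(f"{agent}.channels.{ch}.transactionCapacity")
        if cap and tx and int(tx) > int(cap):
            problems.append(f"channel {ch}: transactionCapacity exceeds capacity")
    return problems


# A trimmed copy of the wiring lines from sql-kafka.conf.
sample = """
agentTest.channels = channelTest
agentTest.sources = sourceTest
agentTest.sinks = sinkTest
agentTest.channels.channelTest.capacity = 10000
agentTest.channels.channelTest.transactionCapacity = 10000
agentTest.sinks.sinkTest.channel = channelTest
agentTest.sources.sourceTest.channels=channelTest
"""
print(check_wiring(parse_properties(sample), "agentTest"))
```

To check the real file, read conf/sql-kafka.conf into a string and pass it to parse_properties in place of the sample.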
Step 4: From the Flume installation directory (the command below refers to ./bin and conf relative to it), run:
./bin/flume-ng agent -n agentTest -c conf -f conf/sql-kafka.conf -Dflume.root.logger=INFO,console
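Step 5: Check whether data has arrived in the Kafka topic TestTopic. From Kafka's bin directory, run the command below. (On newer Kafka releases where the console consumer no longer accepts --zookeeper, pass --bootstrap-server with a broker address such as 192.168.72.129:9092 instead.)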
./kafka-console-consumer.sh --zookeeper 192.168.72.129:2181 --topic TestTopic --from-beginning
If everything worked, you will now see the rows returned by your Oracle query.
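If you want to process the consumed messages further, each one has to be split back into columns. The following is a minimal Python sketch under an assumption the article does not confirm: that the SQL source writes each row as one comma-separated line with double-quoted fields. The helper name parse_rows and the sample values are made up for illustration; check the actual console output before relying on this format.

```python
import csv


def parse_rows(lines):
    """Split CSV-formatted message lines into lists of column values."""
    # csv.reader accepts any iterable of strings; empty lines yield empty rows, which are dropped.
    return [row for row in csv.reader(lines) if row]


# Hypothetical sample lines; real values depend on the columns of your table.
sample_lines = ['"1","North, District"', '"2","South"']
for row in parse_rows(sample_lines):
    print(row)
```

Using the csv module rather than a plain split on commas keeps quoted values that themselves contain commas in one piece.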