Flume: pushing Oracle data to Kafka in real time
By 阿新 • Published: 2019-01-06
Versions:
RedHat 6.5, JDK 1.8, flume-1.6.0, kafka_2.11-0.8.2.1
Flume and Kafka are assumed to be already installed (their installation is covered in separate posts).
1. Download flume-ng-sql-source-1.4.3.jar
flume-ng-sql-source-1.4.3.jar is the supporting jar that lets Flume use a database as a source.
2. Put flume-ng-sql-source-1.4.3.jar into Flume's lib directory
3. Put the Oracle JDBC driver (this post uses an Oracle database) into Flume's lib directory as well
The JDBC driver ships with the Oracle installation, for example under: D:\app\product\11.2.0\dbhome_1\jdbc\lib
Copy ojdbc5.jar from there into Flume's lib directory.
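Assuming the jars were already transferred to the Flume host and Flume lives at /usr/local/flume/apache-flume-1.6.0-bin (the path used later in this post), steps 2 and 3 can be sketched as:

```shell
# Paths are assumptions based on the install layout used in this post;
# adjust them to where the two jars actually sit on your machine.
FLUME_LIB=/usr/local/flume/apache-flume-1.6.0-bin/lib
cp flume-ng-sql-source-1.4.3.jar "$FLUME_LIB/"
cp ojdbc5.jar "$FLUME_LIB/"
```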
4. Create flume-sql.conf
Create flume-sql.conf in the conf directory:

```shell
touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
```
```properties
agentOne.channels = channelOne
agentOne.sources = sourceOne
agentOne.sinks = sinkOne

########### sql source #################
# For each one of the sources, the type is defined
agentOne.sources.sourceOne.type = org.keedio.flume.source.SQLSource
agentOne.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
# Hibernate database connection properties
agentOne.sources.sourceOne.hibernate.connection.user = flume
agentOne.sources.sourceOne.hibernate.connection.password = 1234
agentOne.sources.sourceOne.hibernate.connection.autocommit = true
agentOne.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentOne.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentOne.sources.sourceOne.run.query.delay = 10000
agentOne.sources.sourceOne.status.file.path = /tmp
agentOne.sources.sourceOne.status.file.name = sqlSource.status
# Custom query
agentOne.sources.sourceOne.start.from = 0
agentOne.sources.sourceOne.custom.query = select sysdate from dual
agentOne.sources.sourceOne.batch.size = 1000
agentOne.sources.sourceOne.max.rows = 1000
agentOne.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentOne.sources.sourceOne.hibernate.c3p0.min_size = 1
agentOne.sources.sourceOne.hibernate.c3p0.max_size = 10

########### memory channel #################
agentOne.channels.channelOne.type = memory
agentOne.channels.channelOne.capacity = 10000
agentOne.channels.channelOne.transactionCapacity = 10000
agentOne.channels.channelOne.byteCapacityBufferPercentage = 20
agentOne.channels.channelOne.byteCapacity = 800000

########### kafka sink #################
agentOne.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
agentOne.sinks.sinkOne.topic = test
agentOne.sinks.sinkOne.brokerList = 192.168.168.200:9092
agentOne.sinks.sinkOne.requiredAcks = 1
agentOne.sinks.sinkOne.batchSize = 20

# Bind the source and sink to the channel
agentOne.sinks.sinkOne.channel = channelOne
agentOne.sources.sourceOne.channels = channelOne
```
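Note that the query above (select sysdate from dual) only proves connectivity: every poll emits one timestamp row. For a real incremental push, flume-ng-sql-source substitutes the last value recorded in the status file wherever $@$ appears in custom.query. A sketch against a hypothetical table MY_TABLE with a monotonically increasing ID column (both names are assumptions, not from this post) would be:

```properties
# Hypothetical table/column names; $@$ is replaced at each poll by the
# last incremental value saved in /tmp/sqlSource.status, so only new
# rows are pushed to Kafka.
agentOne.sources.sourceOne.custom.query = SELECT ID, NAME FROM MY_TABLE WHERE ID > $@$ ORDER BY ID ASC
agentOne.sources.sourceOne.start.from = 0
```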
5. Start flume-sql.conf with flume-ng and test

```shell
cd /usr/local/flume/apache-flume-1.6.0-bin
bin/flume-ng agent --conf conf --conf-file conf/flume-sql.conf --name agentOne -Dflume.root.logger=INFO,console
```
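Once the agent is up, messages landing in the test topic can be checked with the console consumer that ships with Kafka 0.8.2.1. The Kafka install path and the ZooKeeper address are assumptions; 0.8.x-era consumers connect through ZooKeeper rather than a bootstrap broker:

```shell
# Assumed install path; point --zookeeper at the ensemble your broker
# (192.168.168.200:9092 above) is registered with.
cd /usr/local/kafka/kafka_2.11-0.8.2.1
bin/kafka-console-consumer.sh --zookeeper 192.168.168.200:2181 --topic test --from-beginning
```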
A successful run logs output like the following:

```
2017-07-08 00:12:55,393 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sinkOne: Successfully registered new MBean.
2017-07-08 00:12:55,394 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sinkOne started
2017-07-08 00:12:55,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
2017-07-08 00:12:55,528 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
2017-07-08 00:12:55,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils
```