Connecting Flume to Oracle to push data to Kafka in real time

Versions:

RedHat 6.5, JDK 1.8, flume-1.6.0, kafka_2.11-0.8.2.1

Prerequisite: Flume is installed.

Prerequisite: Kafka is installed.

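1. Download flume-ng-sql-source-1.4.3.jar

flume-ng-sql-source-1.4.3.jar is the jar that Flume relies on to connect to a database.

2. Put flume-ng-sql-source-1.4.3.jar in Flume's lib directory

3. Put the Oracle driver jar in Flume's lib directory (this example uses an Oracle database)

The Oracle JDBC driver jar ships with the Oracle installation, at: D:\app\product\11.2.0\dbhome_1\jdbc\lib

Copy ojdbc5.jar from there into Flume's lib directory.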
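
Steps 2 and 3 both come down to copying a jar into Flume's lib directory. A minimal shell sketch of that, assuming a POSIX shell; the function name install_flume_jars is made up for illustration and is not part of Flume:

```shell
#!/bin/sh
# Copy one or more jar files into the lib directory of a Flume installation.
# Usage: install_flume_jars FLUME_HOME JAR [JAR...]
# (install_flume_jars is a hypothetical helper, not a Flume command.)
install_flume_jars() {
    flume_home="$1"
    shift
    # Refuse to run if the target does not look like a Flume installation.
    if [ ! -d "$flume_home/lib" ]; then
        echo "no lib directory under $flume_home" >&2
        return 1
    fi
    for jar in "$@"; do
        # Stop at the first jar that cannot be found.
        if [ ! -f "$jar" ]; then
            echo "jar not found: $jar" >&2
            return 1
        fi
        cp "$jar" "$flume_home/lib/" || return 1
        echo "installed $(basename "$jar")"
    done
}
```

With the paths used in this article, the call would look like `install_flume_jars /usr/local/flume/apache-flume-1.6.0-bin flume-ng-sql-source-1.4.3.jar ojdbc5.jar`, run from the directory that holds the two jars.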

4. Create flume-sql.conf

Create flume-sql.conf in the conf directory:
  touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
  sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
Enter the following in flume-sql.conf:
  agentOne.channels = channelOne
  agentOne.sources = sourceOne
  agentOne.sinks = sinkOne

  ########### sql source #################
  # For each source, define the type
  agentOne.sources.sourceOne.type = org.keedio.flume.source.SQLSource
  # Hibernate database connection properties
  agentOne.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
  agentOne.sources.sourceOne.hibernate.connection.user = flume
  agentOne.sources.sourceOne.hibernate.connection.password = 1234
  agentOne.sources.sourceOne.hibernate.connection.autocommit = true
  agentOne.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
  agentOne.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
  # Delay between query runs, in milliseconds
  agentOne.sources.sourceOne.run.query.delay = 10000
  # Status file in which the source records its position
  agentOne.sources.sourceOne.status.file.path = /tmp
  agentOne.sources.sourceOne.status.file.name = sqlSource.status
  # Custom query
  agentOne.sources.sourceOne.start.from = 0
  agentOne.sources.sourceOne.custom.query = select sysdate from dual
  agentOne.sources.sourceOne.batch.size = 1000
  agentOne.sources.sourceOne.max.rows = 1000
  # c3p0 connection pool
  agentOne.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
  agentOne.sources.sourceOne.hibernate.c3p0.min_size = 1
  agentOne.sources.sourceOne.hibernate.c3p0.max_size = 10

  ########### memory channel #############
  agentOne.channels.channelOne.type = memory
  agentOne.channels.channelOne.capacity = 10000
  agentOne.channels.channelOne.transactionCapacity = 10000
  agentOne.channels.channelOne.byteCapacityBufferPercentage = 20
  agentOne.channels.channelOne.byteCapacity = 800000

  ########### kafka sink #################
  agentOne.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
  agentOne.sinks.sinkOne.topic = test
  agentOne.sinks.sinkOne.brokerList = 192.168.168.200:9092
  agentOne.sinks.sinkOne.requiredAcks = 1
  agentOne.sinks.sinkOne.batchSize = 20

  # Bind the source and the sink to the channel
  agentOne.sinks.sinkOne.channel = channelOne
  agentOne.sources.sourceOne.channels = channelOne
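
Before starting the agent it can help to confirm that the file is wired up: the agent declares its sources, channels and sinks, every source has a channels entry, and every sink has a channel entry. The sketch below is one way to check that from the shell, assuming awk is available. conf_value and check_flume_conf are hypothetical helper names, not Flume commands, and the check only looks at whether the keys are present, not whether their values are correct.

```shell
#!/bin/sh
# Print the value of one key from a Java-style properties file.
# Usage: conf_value FILE KEY   (hypothetical helper, not part of Flume)
conf_value() {
    awk -F= -v key="$2" '
        {
            k = $1
            gsub(/^[ \t]+|[ \t]+$/, "", k)        # trim the key
        }
        k == key {
            v = substr($0, index($0, "=") + 1)    # everything after the first "="
            gsub(/^[ \t]+|[ \t]+$/, "", v)        # trim the value
            print v
            exit
        }
    ' "$1"
}

# Check that an agent's sources and sinks are bound to a channel.
# Usage: check_flume_conf FILE AGENT_NAME   (returns non-zero on a missing key)
check_flume_conf() {
    conf="$1"
    agent="$2"
    rc=0
    # The agent must list its components.
    for kind in sources channels sinks; do
        if [ -z "$(conf_value "$conf" "$agent.$kind")" ]; then
            echo "missing: $agent.$kind" >&2
            rc=1
        fi
    done
    # Each source needs a "channels" entry.
    for src in $(conf_value "$conf" "$agent.sources"); do
        if [ -z "$(conf_value "$conf" "$agent.sources.$src.channels")" ]; then
            echo "missing: $agent.sources.$src.channels" >&2
            rc=1
        fi
    done
    # Each sink needs a "channel" entry.
    for sink in $(conf_value "$conf" "$agent.sinks"); do
        if [ -z "$(conf_value "$conf" "$agent.sinks.$sink.channel")" ]; then
            echo "missing: $agent.sinks.$sink.channel" >&2
            rc=1
        fi
    done
    return $rc
}
```

For the file in this article the call would be `check_flume_conf /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf agentOne`; it prints nothing and returns 0 when all the keys it looks for are present.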

5. Start the agent with flume-ng using flume-sql.conf, and test

  cd /usr/local/flume/apache-flume-1.6.0-bin
  bin/flume-ng agent --conf conf --conf-file conf/flume-sql.conf --name agentOne -Dflume.root.logger=INFO,console

A successful start produces log output like the following:

  2017-07-08 00:12:55,393 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sinkOne: Successfully registered new MBean.
  2017-07-08 00:12:55,394 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sinkOne started
  2017-07-08 00:12:55,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
  2017-07-08 00:12:55,528 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
  2017-07-08 00:12:55,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils