
Syncing data to Kafka with OGG

2017-06-20 14:55:31

To better support the implementation of our company's natural-person project, I built a test environment on virtual machines that uses OGG to replicate data from some production tables to Kafka.

1 Test environment
1) Goal
When a DML operation hits table t1 under the hr user in the source database, the change data is written to the Kafka cluster and displayed.
2) Hosts
192.168.60.88 tdb1
192.168.60.89 reps
192.168.60.91 kafka01
192.168.60.92 kafka02
192.168.60.93 kafka03

tdb1 is the source, an Oracle database, version 10.2.0.4, SID tdb
reps is the interface server: it runs OGG for Big Data, receives the data sent by the source-side OGG pump process, and writes it into the Kafka cluster
kafka01-03 form a Kafka cluster
All of these servers run RHEL 6.8
tdb1 and kafka01-03 use JDK 1.7.0_79
reps uses JDK 1.8.0_121, because OGG for Big Data 12.2.x requires JDK 1.8 or later; JDK 1.7 fails with an insufficient-version error
Turn off the firewall and disable SELinux on all five servers

2 Installing the Kafka cluster

1) Download the software
ZooKeeper, version 3.4.10, file zookeeper-3.4.10.tar.gz, from http://zookeeper.apache.org/releases.html
Kafka, version 2.10-0.10.2.0, file kafka_2.10-0.10.2.0.tgz, from http://kafka.apache.org/downloads
2) Preparation
Adjust /etc/hosts on kafka01-03:

  1. [root@kafka01 ~]# cd /etc
  2. [root@kafka01 etc]# cat hosts
  3. 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
  4. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
  5. 192.168.60.91 kafka01
  6. 192.168.60.92 kafka02
  7. 192.168.60.93 kafka03

Create a user named kafka on each of kafka01-03, and set up SSH mutual trust among the three kafka users (the standard ssh-keygen / authorized_keys procedure).
Example of the kafka user on kafka01:

  1. [kafka@kafka01 ~]$ pwd
  2. /home/kafka
  3. [kafka@kafka01 ~]$ id
  4. uid=500(kafka) gid=501(kafka) groups=501(kafka)
  5. [kafka@kafka01 ~]$
  6. [kafka@kafka01 ~]$ cat .bash_profile
  7. # .bash_profile
  8. # Get the aliases and functions
  9. if [ -f ~/.bashrc ]; then
  10. . ~/.bashrc
  11. fi
  12. # User specific environment and startup programs
  13. PATH=$PATH:$HOME/bin
  14. export PATH
  15. export JAVA_HOME=/usr/java/jdk1.7.0_79
  16. export JRE_HOME=/usr/java/jdk1.7.0_79/jre
  17. export CLASSPATH=.:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
  18. export ZOOKEEPER_HOME=/home/kafka/zookeeper
  19. export PATH=$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$PATH
  20. [kafka@kafka01 ~]$

3) Install and configure ZooKeeper
Do the following on all of kafka01-03; kafka01 is shown as the example.
a. Unpack
Unpack zookeeper-3.4.10.tar.gz, rename the directory to zookeeper, and move it under /home/kafka/, so that:

  1. [kafka@kafka01 zookeeper]$ pwd
  2. /home/kafka/zookeeper
  3. [kafka@kafka01 zookeeper]$ ls
  4. bin conf data docs ivy.xml LICENSE.txt README_packaging.txt recipes zookeeper-3.4.10.jar zookeeper-3.4.10.jar.md5 zookeeper.out
  5. build.xml contrib dist-maven ivysettings.xml lib NOTICE.txt README.txt src zookeeper-3.4.10.jar.asc zookeeper-3.4.10.jar.sha1
  6. [kafka@kafka01 zookeeper]$

b. Configure zoo.cfg

  1. cd /home/kafka/zookeeper/conf
  2. cp zoo_sample.cfg zoo.cfg

Edit zoo.cfg as follows:

  1. [kafka@kafka01 conf]$ pwd
  2. /home/kafka/zookeeper/conf
  3. Set the following parameters in zoo.cfg:
  4. dataDir=/home/kafka/zookeeper/data
  5. clientPort=2181
  6. server.1=kafka01:2888:3888
  7. server.2=kafka02:2888:3888
  8. server.3=kafka03:2888:3888

c. Set the node identifier

  1. cd /home/kafka/zookeeper
  2. mkdir data
  3. cd data
  4. vi myid
  5. enter 1
  6. [kafka@kafka01 data]$ pwd
  7. /home/kafka/zookeeper/data
  8. [kafka@kafka01 data]$ cat myid
  9. 1
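The per-node pieces above follow a simple pattern: every node shares the same server.N lines in zoo.cfg, and each node writes only its own number into data/myid. A minimal sketch of how they line up, using the hostnames from this setup:

```python
# Derive the zoo.cfg server entries and each node's myid value
# for the three-host ensemble used in this setup.
hosts = ["kafka01", "kafka02", "kafka03"]

# server.N lines are identical in every node's zoo.cfg
server_lines = [
    "server.%d=%s:2888:3888" % (i, h)
    for i, h in enumerate(hosts, start=1)
]

# each node writes only its own number into /home/kafka/zookeeper/data/myid
myid = {h: str(i) for i, h in enumerate(hosts, start=1)}

print("\n".join(server_lines))
print(myid)
```

The N in server.N must match that host's myid, or the ensemble members will not recognize each other.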

d. Start ZooKeeper

  1. [kafka@kafka01 bin]$ pwd
  2. /home/kafka/zookeeper/bin
  3. ./zkServer.sh start
  4. Check the status:
  5. [kafka@kafka01 bin]$ ./zkServer.sh status
  6. ZooKeeper JMX enabled by default
  7. Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
  8. Mode: follower
  9. One of the three nodes shows Mode: leader; the other two show Mode: follower
  10. Troubleshooting:
  11. If it did not start, launch it with ./zkServer.sh start-foreground; log messages print to the screen and show where the problem is.

4) Install and configure Kafka
Do the following on all of kafka01-03; kafka01 is shown as the example.
a. Unpack
Unpack kafka_2.10-0.10.2.0.tgz, rename the directory to kafka, and move it under /home/kafka/, so that:

  1. [kafka@kafka02 kafka]$ pwd
  2. /home/kafka/kafka
  3. [kafka@kafka02 kafka]$ ls
  4. bin config libs LICENSE logs NOTICE site-docs

b. Edit the Kafka server configuration file

  1. cd /home/kafka/kafka/config
  2. vi server.properties
  3. [kafka@kafka01 config]$ cat server.properties --note: entries that need no change are omitted
  4. broker.id=1 #1 on kafka01, 2 on kafka02, 3 on kafka03
  5. host.name=kafka01 #adjust to the local hostname
  6. listeners=PLAINTEXT://kafka01:9092 #adjust to the local hostname
  7. advertised.listeners=PLAINTEXT://kafka01:9092 #adjust to the local hostname
  8. log.dirs=/home/kafka/kafka/logs
  9. num.partitions=4

c. Start Kafka in the background
On each of the three cluster nodes, run:

  1. cd /home/kafka/kafka/bin
  2. ./kafka-server-start.sh /home/kafka/kafka/config/server.properties &

d. Test
Create a topic named oggtest with 3 partitions and a replication factor of 3.
Run on any node:

  1. ./kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 3 --partitions 3 --topic oggtest

Check the created topic, running from any node:

  1. ./kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --topic oggtest
  2. List all topics:
  3. ./kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181
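Kafka rejects a topic whose replication factor exceeds the number of brokers, so the parameters are worth sanity-checking before running kafka-topics.sh. A small sketch (the helper is mine, not a Kafka API):

```python
# Sanity-check topic parameters before creating the topic:
# the replication factor cannot exceed the broker count, and the
# cluster will host partitions * replication_factor replicas in total.
def check_topic(partitions, replication_factor, num_brokers):
    if replication_factor > num_brokers:
        raise ValueError("replication factor %d exceeds %d brokers"
                         % (replication_factor, num_brokers))
    return partitions * replication_factor  # total replica count

# oggtest: 3 partitions, replication factor 3, on the 3-broker cluster
print(check_topic(3, 3, 3))
```

With replication factor 3 on three brokers, every broker carries a copy of every partition, which is what the --describe output should show.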

You can verify publishing and consuming messages with the bundled bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts.
In one terminal, start a producer writing to the oggtest topic created above:

  1. ./kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic oggtest

In another terminal, start a consumer subscribed to the messages produced to that topic:

  1. ./kafka-console-consumer.sh --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --from-beginning --topic oggtest

If the Kafka cluster is configured correctly, anything typed in the producer window (followed by Enter) appears in the consumer window.

3 OGG installation and configuration on the source (tdb1)
1) Preparation
a. OGG software
File name: Oracle GoldenGate V11.2.1.0.3 for Oracle 11g on Linux x86-64.zip, downloadable from https://edelivery.oracle.com
b. The source database must be in archivelog mode, set to force logging, and have supplemental logging enabled

  1. [oracle@tdb1 ~]$ sqlplus / as sysdba
  2. SQL> archive log list
  3. Database log mode Archive Mode
  4. Automatic archival Enabled
  5. Archive destination /oracle/arc
  6. Oldest online log sequence 9
  7. Next log sequence to archive 11
  8. Current log sequence 11
  9. SQL>
  10. SQL> Select
  11. 2 SUPPLEMENTAL_LOG_DATA_MIN
  12. 3 ,SUPPLEMENTAL_LOG_DATA_PK
  13. 4 ,SUPPLEMENTAL_LOG_DATA_UI
  14. 5 ,SUPPLEMENTAL_LOG_DATA_ALL
  15. 6 , FORCE_LOGGING from v$database;
  16. SUPPLEME SUP SUP SUP FOR
  17. -------- --- --- --- ---
  18. YES NO NO NO YES
  19. SQL>

c. It is recommended to add a dedicated ogg OS user, to avoid affecting the oracle user; put it in the oracle user's primary group, and create a filesystem for it sized to the volume of data extract will handle.
The ogg user should end up looking like this:

  1. [root@tdb1 ~]# su - ogg
  2. [ogg@tdb1 ~]$ id
  3. uid=501(ogg) gid=500(dba) groups=500(dba)
  4. [ogg@tdb1 ~]$
  5. [ogg@tdb1 ~]$ cat .bash_profile
  6. # .bash_profile
  7. # Get the aliases and functions
  8. if [ -f ~/.bashrc ]; then
  9. . ~/.bashrc
  10. fi
  11. # User specific environment and startup programs
  12. PATH=$PATH:$HOME/bin
  13. export PATH
  14. umask 022
  15. export ORACLE_BASE=/oracle/app/oracle
  16. export ORACLE_HOME=$ORACLE_BASE/product/10.2.0
  17. export ORACLE_SID=tdb
  18. export PATH=$ORACLE_HOME/bin:$PATH:.
  19. export NLS_LANG=AMERICAN_AMERICA.ZHS16GBK
  20. export NLS_DATE_FORMAT=YYYYMMDDHH24MISS
  21. export DISPLAY=192.168.60.1:0.0
  22. #ogg
  23. export GG_HOME=/ogg
  24. #export PATH=$PATH:$GG_HOME
  25. export LD_LIBRARY_PATH=/ogg:$ORACLE_HOME/lib
  26. [ogg@tdb1 ~]$
  27. [ogg@tdb1 ~]$ sqlplus / as sysdba
  28. SQL*Plus: Release 10.2.0.5.0 - Production on Thu Apr 13 11:10:59 2017
  29. Copyright (c) 1982, 2010, Oracle. All Rights Reserved.
  30. Connected to:
  31. Oracle Database 10g Enterprise Edition Release 10.2.0.5.0 - 64bit Production
  32. With the Partitioning, Data Mining and Real Application Testing options
  33. SQL>

d. Create an ogg user inside the database and grant it privileges

  1. create tablespace ogg datafile '/oracle/oradata/tdb/ogg.dbf' size 1G;
  2. create user ogg identified by gg_888 default tablespace ogg;
  3. grant connect,resource to ogg;
  4. grant dba to ogg; --dba is not needed if you do not use a DDL trigger
  5. GRANT CREATE SESSION TO ogg;
  6. GRANT ALTER SESSION TO ogg;
  7. GRANT SELECT ANY DICTIONARY TO ogg;
  8. GRANT SELECT ANY TABLE TO ogg;
  9. GRANT ALTER ANY TABLE TO ogg; --used to configure table-level supplemental logging
  10. GRANT FLASHBACK ANY TABLE TO ogg;
  11. GRANT EXECUTE ON dbms_flashback TO ogg;
  13. GRANT EXECUTE ON utl_file TO ogg;
  14. execute DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('OGG');
  15. grant execute on sys.dbms_lob to ogg;
  16. --the PL/SQL block below is for Oracle 11g and later; it is not needed on 10g
  17. BEGIN
  18. DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(
  19. Grantee => 'OGG',
  20. privilege_type => 'CAPTURE',
  21. grant_select_privileges => TRUE,
  22. do_grants => TRUE);
  23. END;
  24. /

e. For testing, I created an hr user and, under it, a table t1

  1. -- Create table
  2. create table T1
  3. (
  4. id NUMBER not null,
  5. name VARCHAR2(100)
  6. )
  7. tablespace USERS;
  8. -- Create/Recreate primary, unique and foreign key constraints
  9. alter table T1
  10. add constraint PK_T1_ID primary key (ID)
  11. using index
  12. tablespace USERS;

2) Configure OGG
a. Unpack the OGG software under $GG_HOME
The result looks like:

  1. [ogg@tdb1 ogg]$ ls -l gg*
  2. -rwxr-x--- 1 ogg dba 6577392 Aug 24 2012 ggcmd
  3. -rw-r----- 1 ogg dba 1841 Apr 12 15:58 gglog-defgen.dmp
  4. -rw-r----- 1 ogg dba 1239 Apr 12 16:40 gglog-DPE_TEST-43680.dmp
  5. -rw-r----- 1 ogg dba 962 Apr 12 16:49 gglog-DPE_TEST-43782.dmp
  6. -rw-r----- 1 ogg dba 0 Apr 12 16:40 gglog-DPE_TEST.dmp
  7. -rw-r----- 1 ogg dba 1280336 Aug 24 2012 ggMessage.dat
  8. -rwxr-x--- 1 ogg dba 13899588 Aug 24 2012 ggsci
  9. -rw-rw-rw- 1 ogg dba 21819 Apr 13 08:47 ggserr.log
  10. [ogg@tdb1 ogg]$

b. Create the OGG subdirectories

  1. [ogg@tdb1 ogg]$ pwd
  2. /ogg
  3. [ogg@tdb1 ogg]$ ./ggsci
  4. GGSCI>create subdirs

Handling the ggsci error:

  1. [ogg@tdb1 ogg]$ ggsci
  2. ggsci: error while loading shared libraries: libnnz11.so: cannot open shared object file: No such file or directory
  3. [ogg@tdb1 ogg]$ ldd ggsci
  4. linux-vdso.so.1 => (0x00007ffd3db73000)
  5. libdl.so.2 => /lib64/libdl.so.2 (0x00000035bbc00000)
  6. libgglog.so => /ogg/./libgglog.so (0x00007ff824130000)
  7. libggrepo.so => /ogg/./libggrepo.so (0x00007ff823fdc000)
  8. libdb-5.2.so => /ogg/./libdb-5.2.so (0x00007ff823d3b000)
  9. libicui18n.so.38 => /ogg/./libicui18n.so.38 (0x00007ff8239da000)
  10. libicuuc.so.38 => /ogg/./libicuuc.so.38 (0x00007ff8236a1000)
  11. libicudata.so.38 => /ogg/./libicudata.so.38 (0x00007ff8226c5000)
  12. libpthread.so.0 => /lib64/libpthread.so.0 (0x00000035bc400000)
  13. libxerces-c.so.28 => /ogg/./libxerces-c.so.28 (0x00007ff8221ad000)
  14. libantlr3c.so => /ogg/./libantlr3c.so (0x00007ff822097000)
  15. libnnz11.so => not found
  16. libclntsh.so.11.1 => not found
  17. libstdc++.so.6 => /usr/lib64/libstdc++.so.6 (0x00000035c7400000)
  18. libm.so.6 => /lib64/libm.so.6 (0x00000035bcc00000)
  19. libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00000035c7000000)
  20. libc.so.6 => /lib64/libc.so.6 (0x00000035bc000000)
  21. /lib64/ld-linux-x86-64.so.2 (0x00000035bb800000)
  22. [oracle@tdb1 ~]$ cd $ORACLE_HOME/lib
  23. [oracle@tdb1 lib]$ ln -s libnnz10.so libnnz11.so
  24. [oracle@tdb1 lib]$ ln -s libclntsh.so libclntsh.so.11.1
  25. [oracle@tdb1 lib]$ ls -l libclntsh.so.11.1
  26. lrwxrwxrwx 1 oracle dba 12 Apr 11 22:33 libclntsh.so.11.1 -> libclntsh.so
  27. [oracle@tdb1 lib]$ ls -l libnnz11.so
  28. lrwxrwxrwx 1 oracle dba 11 Apr 11 22:31 libnnz11.so -> libnnz10.so

c. Enable table-level supplemental logging for hr.t1

  1. [ogg@tdb1 ogg]$ ./ggsci
  2. GGSCI>Dblogin userid ogg, password gg_888
  3. Add trandata hr.t1

d. Configure the OGG manager

  1. [ogg@tdb1 ogg]$ ./ggsci
  2. GGSCI>edit params mgr
  3. Enter the following and save:
  4. PORT 7809
  5. DYNAMICPORTLIST 7810-7860
  6. AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
  7. PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
  8. LAGREPORTHOURS 1
  9. LAGINFOMINUTES 30
  10. LAGCRITICALMINUTES 45
  11. Start the OGG manager:
  12. GGSCI>start mgr
  13. Check the manager process status; it should look like:
  14. GGSCI (tdb1) 1> info mgr
  15. Manager is running (IP port tdb1.7809).

e. Create the extract

  1. GGSCI>edit params ext_test
  2. Enter the following and save:
  3. EXTRACT ext_test
  4. Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
  5. USERID ogg, PASSWORD gg_888
  6. gettruncates
  7. DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 1024
  8. DBOPTIONS ALLOWUNUSEDCOLUMN
  9. REPORTCOUNT EVERY 1 MINUTES, RATE
  10. WARNLONGTRANS 2h,CHECKINTERVAL 3m
  11. FETCHOPTIONS NOUSESNAPSHOT
  12. TRANLOGOPTIONS CONVERTUCS2CLOBS
  13. EXTTRAIL ./dirdat/te
  14. WILDCARDRESOLVE DYNAMIC
  15. GETUPDATEBEFORES
  16. NOCOMPRESSUPDATES
  17. NOCOMPRESSDELETES
  18. dynamicresolution
  19. table hr.t1;
  20. Add the extract group:
  21. GGSCI>add extract ext_test, TRANLOG, BEGIN NOW
  22. Define the trail file:
  23. GGSCI> ADD EXTTRAIL ./dirdat/te, EXTRACT ext_test, MEGABYTES 200

f. The pump extract process

  1. GGSCI>edit param dpe_test
  2. Enter the following and save:
  3. EXTRACT dpe_test
  4. PASSTHRU
  5. RMTHOST 192.168.60.89, MGRPORT 7809
  6. RMTTRAIL ./dirdat/te
  7. DYNAMICRESOLUTION
  8. TABLE hr.t1;
  9. Add the pump group:
  10. GGSCI> ADD EXTRACT dpe_test, EXTTRAILSOURCE ./dirdat/te
  11. Define the pump trail file:
  12. GGSCI> ADD RMTTRAIL ./dirdat/te, EXTRACT dpe_test, MEGABYTES 200

g. Start the capture processes

  1. GGSCI> start extract ext_test;
  2. GGSCI> start extract dpe_test;
  3. Check the status; output like the following is correct:
  4. GGSCI> info all
  5. Program Status Group Lag at Chkpt Time Since Chkpt
  6. MANAGER RUNNING
  7. EXTRACT RUNNING DPE_TEST 00:00:00 00:00:03
  8. EXTRACT RUNNING EXT_TEST 00:00:00 00:00:01

4 Installing and configuring the interface host reps
1) Install OGG for Big Data
a. As on the source side, unpack the OGG for Big Data software under $GG_HOME
b. The /etc/hosts file

  1. [root@reps etc]# cd /etc
  2. [root@reps etc]# cat hosts
  3. 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
  4. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
  5. 192.168.60.89 reps
  6. 192.168.60.91 kafka01
  7. 192.168.60.92 kafka02
  8. 192.168.60.93 kafka03
  9. [root@reps etc]#

c. Install JDK 1.8 or later
OGG for Big Data 12.2.x requires JDK 1.8 or later; I used 1.8.0_121

  1. [ogg@reps ogg]$ java -version
  2. java version "1.8.0_121"
  3. Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
  4. Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

d. Create the ogg user, configure its environment variables, and install the Kafka software

  1. [root@reps etc]# su - ogg
  2. [ogg@reps ~]$ id
  3. uid=500(ogg) gid=501(ogg) groups=501(ogg)
  4. [ogg@reps ~]$ cat .bash_profile
  5. # .bash_profile
  6. # Get the aliases and functions
  7. if [ -f ~/.bashrc ]; then
  8. . ~/.bashrc
  9. fi
  10. # User specific environment and startup programs
  11. PATH=$PATH:$HOME/bin
  12. export PATH
  13. export OGG_HOME=/ogg
  14. export PATH=$PATH:$OGG_HOME
  15. export LD_LIBRARY_PATH=$OGG_HOME:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64
  16. [ogg@reps ~]$
  17. [ogg@reps ~]$ ls -l
  18. total 8
  19. drwxrwxr-x 2 ogg ogg 4096 Apr 11 22:56 install
  20. drwxr-xr-x 6 ogg ogg 4096 Feb 15 01:28 kafka --unpack the kafka package here, or copy this directory from one of the kafka hosts
  21. [ogg@reps ~]$
  22. [ogg@reps ~]$ cd /ogg
  23. [ogg@reps ogg]$ ls -l ggsci
  24. -rwxr-x--- 1 ogg ogg 39120528 Oct 20 07:05 ggsci
  25. [ogg@reps ogg]$

2) Configure OGG for Kafka
a. Start ggsci and create the subdirectories

  1. ./ggsci
  2. GGSCI>create subdirs

b. Copy the examples

  1. cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/

c. Configure the manager

  1. GGSCI>edit params mgr
  2. Contents:
  3. PORT 7809
  4. DYNAMICPORTLIST 7810-7860
  5. AUTORESTART REPLICAT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
  6. PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
  7. LAGREPORTHOURS 1
  8. LAGINFOMINUTES 30
  9. LAGCRITICALMINUTES 45

d. Configure kafka.props
Contents:

  1. [ogg@reps dirprm]$ cat kafka.props
  2. gg.handlerlist = kafkahandler
  3. gg.handler.kafkahandler.type = kafka
  4. gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
  5. gg.handler.kafkahandler.TopicName =oggtest
  6. #gg.handler.kafkahandler.format=avro_op
  7. gg.handler.kafkahandler.format=delimitedtext
  8. gg.handler.kafkahandler.format.fieldDelimiter=|
  9. gg.handler.kafkahandler.SchemaTopicName=myoggtest
  10. gg.handler.kafkahandler.BlockingSend =false
  11. gg.handler.kafkahandler.includeTokens=false
  12. gg.handler.kafkahandler.mode =op
  13. #gg.handler.kafkahandler.maxGroupSize =100, 1Mb
  14. #gg.handler.kafkahandler.minGroupSize =50, 500Kb
  15. goldengate.userexit.timestamp=utc
  16. goldengate.userexit.writers=javawriter
  17. javawriter.stats.display=TRUE
  18. javawriter.stats.full=TRUE
  19. gg.log=log4j
  20. gg.log.level=INFO
  21. gg.report.time=30sec
  22. #Sample gg.classpath for Apache Kafka
  23. gg.classpath=dirprm/:/home/ogg/kafka/libs/*
  24. #Sample gg.classpath for HDP
  25. #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
  26. javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

Notes:
gg.handler.kafkahandler.TopicName must name the topic defined on the Kafka side.
gg.handler.kafkahandler.format is set to delimited text with "|" as the field separator; this is the format in which Kafka ultimately receives the messages.
gg.classpath must point to the corresponding Kafka client libraries.
e. Configure custom_kafka_producer.properties
Contents:

  1. bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092
  2. acks=1
  3. compression.type=gzip
  4. reconnect.backoff.ms=1000
  5. value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
  6. key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
  7. # 100KB per partition
  8. batch.size=102400
  9. linger.ms=10000

3) Propagate the table definitions
On the source side:

  1. [ogg@tdb1 ogg]$ ./ggsci
  2. GGSCI> edit param defgen
  3. Contents:
  4. DEFSFILE dirdef/source.def, PURGE
  5. USERID ogg, PASSWORD gg_888
  6. TABLE hr.t1 ;
  7. [ogg@tdb1 ogg]$ ./defgen paramfile dirprm/defgen.prm --run from the OS shell

Copy the generated dirdef/source.def to /ogg/dirdef/ on the interface host (reps).
4) Define the replicat
a. Define the parameters

  1. ./ggsci
  2. GGSCI>edit params rep_test
  3. Enter the following:
  4. REPLICAT rep_test
  5. TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
  6. SOURCEDEFS dirdef/source.def
  7. REPORTCOUNT EVERY 1 MINUTES, RATE
  8. GROUPTRANSOPS 10000
  9. MAP hr.*, TARGET hr.*;

b. Specify the trail file

  1. GGSCI> add replicat rep_test, exttrail ./dirdat/te

c. Start the replicat process and check its status

  1. GGSCI> start replicat rep_test
  2. Check the status; output like the following is normal:
  3. GGSCI (reps) 1> info all
  4. Program Status Group Lag at Chkpt Time Since Chkpt
  5. MANAGER RUNNING
  6. REPLICAT RUNNING REP_TEST 00:00:00 00:00:05

Note: if the replicat process will not start, run it as follows to make the problem easier to diagnose:

  1. cd $OGG_HOME
  2. ./replicat paramfile dirprm/rep_test.prm

5 Testing and verification
1) Start a Kafka console consumer
On any Kafka node:

  1. ./kafka-console-consumer.sh --zookeeper kafka01:2181 --topic oggtest --from-beginning

2) Insert a row into the test table on the source

  1. sqlplus hr/hr
  2. SQL> insert into t1 values(5,'shihaifeng');
  3. 1 row created.
  4. SQL> commit;
  5. Commit complete.

3) Check whether the Kafka consumer console received the row
Mine shows:

  1. I|HR.T1|2017-04-13 03:31:03.835147|2017-04-13T11:31:08.973000|00000000000000001478|5|shihaifeng
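With format=delimitedtext and fieldDelimiter=|, each message is a flat pipe-separated record. A minimal sketch of pulling the sample message apart; the field labels are my own, inferred from this output, not official OGG names:

```python
# Split an OGG delimitedtext message as it appears in the consumer console.
# Field labels below are inferred from the sample, not OGG-defined names.
msg = ("I|HR.T1|2017-04-13 03:31:03.835147|"
       "2017-04-13T11:31:08.973000|00000000000000001478|5|shihaifeng")

parts = msg.split("|")
record = {
    "op_type": parts[0],     # I = insert, U = update, D = delete
    "table": parts[1],       # source schema.table
    "op_ts": parts[2],       # operation timestamp in the source database
    "current_ts": parts[3],  # timestamp when OGG processed the change
    "pos": parts[4],         # position/sequence information
    "columns": parts[5:],    # column values of hr.t1 (id, name)
}
print(record["table"], record["columns"])
```

A real consumer would apply this split to every message from the oggtest topic and dispatch on op_type and table.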