1. 程式人生 > 實用技巧 >基於OGG實現ORACLE同步至KAFKA實施方案

基於OGG實現ORACLE同步至KAFKA實施方案

一、背景

本文基於Oracle OGG,介紹一種將Oracle資料庫的資料實時同步到Kafka訊息佇列的方法。Kafka是一種高效的訊息佇列實現,通過訂閱kafka的訊息佇列,下游系統可以實時獲取線上Oracle系統的資料變更情況,實現從OLTP系統中獲取資料變更,實時同步到下游業務系統。

二、環境介紹

1、元件版本

元件

版本

作業系統

IP地址

描述

源端Oracle

11.2.0.4.0

Red Hat 6.8

192.168.140.186

源端Oracle資料庫

源端OGG

12.1.2.1.0

Red Hat 6.8

192.168.140.186

源端OGG,用於抽取源端Oracle資料變更,並將變更日誌傳送到目標端

目標端OGG

12.3.2.1.1

Red Hat 7.5

192.168.83.227、228、229

目標端OGG,接受源端傳送的Oracle事務變更日誌,並將變更推送到kafka訊息佇列

目標端kafka

2.12-1.1.0

Red Hat 7.5

192.168.83.227、228、229

訊息佇列,接收目標端OGG推送過來的資料

2、整體架構圖

3、名詞解釋

1) OGG Manager:OGG Manager用於配置和管理其它OGG元件,配置資料抽取、資料推送、資料複製,啟動和停止相關元件,檢視相關元件的執行情況。

2) 資料抽取(Extract):抽取源端資料庫的變更(DML, DDL)。資料抽取主要分如下2種類型:

a.本地抽取:從本地資料庫捕獲增量變更資料,寫入到本地Trail檔案

b.初始資料抽取:從資料庫表中匯出全量資料,用於初次資料載入

3) 資料推送(Data Pump):Data Pump是一種特殊的資料抽取(Extract)型別,從本地Trail檔案中讀取資料,並通過網路將資料傳送到目標端OGG

4) 4.Trail檔案:資料抽取從源端資料庫抓取到的事物變更資訊會寫入到Trail檔案。

5) 資料接收(Collector):資料接收程式執行在目標端機器,用於接收Data Pump傳送過來的Trail日誌,並將資料寫入到本地Trail檔案。

6) 資料複製(Replicat):資料複製執行在目標端機器,從Trail檔案讀取資料變更,並將變更資料應用到目標端資料儲存系統。本案例中,資料複製將資料推送到kafka訊息佇列。

7) 檢查點(Checkpoint):檢查點用於記錄資料庫事物變更。

二、操作步驟

1、源端安裝配置(192.168.140.186)

(1)源端安裝了ORACLE 版本11.2.0.4

alter system set enable_goldengate_replication=TRUE;

Alter database add supplemental log data;

alter database force logging;

Select supplemental_log_data_min from v$database;

Create user ogg identified by "Qwer!234" Default tablespace users temporary tablespace temp profile DEFAULT;

Grant connect to ogg;

grant resource to ogg;

grant dba to ogg;

grant alter session to ogg;

grant create session to ogg;

grant select any dictionary to ogg;

grant select any table to ogg;

grant insert any table to ogg;

grant delete any table to ogg;

grant update any table to ogg;

grant alter any table to ogg;

grant create table to ogg;

grant lock any table to ogg;

grant flashback any table to ogg;

Grant unlimited tablespace to ogg;

(2)源端安裝了OGG版本12.1.2.1.0

--建立使用者

$useradd -g oinstall gg

--設定gg環境變數

將oracle的環境變數拷貝至gg使用者

--建立目錄

Mkdir /oracle/gg

Chown oracle:oinstall /oracle/gg

--解壓安裝

Unzip 121210_fbo_ggs_Linux_x64_shiphome.zip

vi /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp

INSTALL_OPTION=ORA11g

SOFTWARE_LOCATION=/oracle/gg

START_MANAGER=false

cd /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1

./runInstaller -responseFile /oracle/gg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp -silent -ignoreSysPrereqs -ignorePrereq –local

--建立子目錄

./ggsci

create SUBDIRS

start mgr

(3)源端配置OGG

/*網路擷取資料

GGSCI (zwjfdb3) 7> view param EZWJFBOR

EXTRACT EZWJFBOR

SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

SETENV (ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")

SETENV (ORACLE_SID = "zwjfdb3")

--捕獲 truncate 操作

gettruncates

--定義discardfile檔案位置,如果處理中有記錄出錯會寫入到此檔案中

DISCARDFILE ./dirrpt/ezwjfbor.dsc, APPEND, MEGABYTES 1024

--動態解析表名

DYNAMICRESOLUTION

--獲取更新之前資料

GETUPDATEBEFORES

--當抽取程序遇到一個沒有使用的欄位時只生成一個警告,程序會繼續執行而不會被異常終止(abend)

DBOPTIONS ALLOWUNUSEDCOLUMN

--每隔30分鐘報告一次從程式開始到現在的抽取程序或者複製程序的事物記錄數,並彙報程序的統計資訊

REPORTCOUNT EVERY 30 MINUTES, RATE

--每隔3分鐘檢查一下大事務,超過2小時還沒結束的進行報告

WARNLONGTRANS 2h,CHECKINTERVAL 3m

--不會從閃回日誌中獲取資料

FETCHOPTIONS NOUSESNAPSHOT

USERID xxxxxx,PASSWORD xxxxxx

EXTTRAIL ./dirdat/zb

TABLE xx.xx;

TABLE xx.xx;

#新增抽取程序

GGSCI (zwjfdb3) 11> add extract EZWJFBOR,TRANLOG, BEGIN NOW

EXTRACT added.

#定義trail檔案

GGSCI (zwjfdb3) 12> ADD EXTTRAIL ./dirdat/zb,EXTRACT EZWJFBOR, MEGABYTES 200

EXTTRAIL added.

#pump extract程序

GGSCI (zwjfdb3) 8> view param PZWJFBOR

EXTRACT PZWJFBOR

SETENV (NLS_LANG = "AMERICAN_AMERICA.AL32UTF8")

PASSTHRU

DYNAMICRESOLUTION

RMTHOST xx.xx.xx.xx,MGRPOT 7809

RMTTRAIL ./dirdat/zb

TABLE xx.xx;

TABLE xx.xx;

#新增pump捕獲組

GGSCI (zwjfdb3) 23> ADD EXTRACT PZWJFBOR,EXTTRAILSOURCE ./dirdat/zb

EXTRACT added.

#定義pump trail檔案

GGSCI (zwjfdb3) 25> ADD RMTTRAIL ./dirdat/zb,EXTRACT PZWJFBOR, MEGABYTES 200

RMTTRAIL added.

*/

1、ORACLE建立測試表

create tablespace TBSDATA datafile size 1G;

create user linq identified by "Qwer!234" default tablespace TBSDATA;

grant connect to linq;

grant resource to linq;

create table linq.test_ogg(id number ,name varchar2(200),primary key(id));

2、配置MGR

>edit params mgr

PORT 7809

PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 2

 3、配置捕獲程序
 #一定要記得同步之前要開啟表的全補充日誌
#alter table linq.test_ogg add supplemental log data (all) columns;

>dblogin userid ogg@ORCL,password Qwer!234

>add extract E_LINQ,tranlog,begin now

>ADD EXTTRAIL ./dirdat/lq, EXTRACT E_LINQ

>add trandata linq.test_ogg

>info trandata linq.test_ogg

>edit params E_LINQ

extract E_LINQ

--動態解析表名

dynamicresolution

SETENV (ORACLE_SID = "newdb")

SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")

--SETENV (NLS_LANG = "american_america.AL32UTF8")

userid ogg@ORCL,password Qwer!234

--獲取更新之前全部欄位資料(如果註釋,before顯示空,需與UPDATERECORDFORMAT COMPACT一起配置)

GETUPDATEBEFORES

--獲取刪除前非主鍵欄位(如果註釋,before只會顯示主鍵)

NOCOMPRESSDELETES

--獲取更新前非主鍵欄位(目前沒發現用途)

NOCOMPRESSUPDATES

--獲取更新的前後映象資訊(如果註釋,before顯示空,需與GETUPDATEBEFORES一起配置)

UPDATERECORDFORMAT COMPACT

--定義discardfile檔案位置,如果處理中有記錄出錯會寫入到此檔案中

discardfile ./dirrpt/E_LINQ.dsc,purge,megabytes 20000

warnlongtrans 2h,checkinterval 3m

exttrail ./dirdat/lq

PURGEOLDEXTRACTS ./dirdat/lq*,usecheckpoints, minkeepdays 3

TRANLOGOPTIONS DBLOGREADER

numfiles 3000

allocfiles 200

table linq.test_ogg;

4、配置傳輸程序

>add extract P_LINQ,exttrailsource ./dirdat/lq

> ADD EXTTRAIL ./dirdat/lq, EXTRACT P_LINQ

>add rmttrail ./dirdat/lq,extract P_LINQ

>edit params P_LINQ

extract P_LINQ

SETENV (NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK")

passthru

dynamicresolution

rmthost 192.168.73.227,mgrport 7809 ,compress

rmttrail ./dirdat/lq

numfiles 3000

table linq.test_ogg;

5、配置define定義檔案

> edit param test_ogg

defsfile ./dirdef/linq.test_ogg

userid ogg@ORCL,password Qwer!234

table linq.test_ogg;

在OGG主目錄下執行:

./defgen paramfile dirprm/test_ogg.prm

 
注:目標端安裝後,將生成的./dirdef/linq.test_ogg傳送的目標端ogg目錄下的dirdef裡

2、目標端安裝配置(192.168.73.227)

(1)目標端安裝kafka,版本2.12-1.1.0,已安裝
 
(2)目標端安裝OGG,版本12.3.2.1.1
 groupadd gg
 useradd -g gg -G gg gg
 passwd gg
su - gg
unzip
unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar xf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /data/gg/
 
vi /home/gg/.bash_profile
export OGG_HOME=/data/gg 
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib 
export PATH=$OGG_HOME:$PATH
. .bash_profile

測試ggsci



建立子目錄create subdirs


(3)定義檔案拷貝
從源端192.168.140.128的定義檔案/oracle/gg/dirdef/linq.test_ogg拷貝
到目標端/data/gg/dirdef/linq.test_ogg
scp /oracle/gg/dirdef/linq.test_ogg [email protected]:/data/gg/dirdef/
(4)kafka建立主題
kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 3 --partitions 3 --topic testogg2
kafka-topics.sh --describe --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --topic testogg2
(5)開啟kafka程序
--83.227開啟
 kafka-server-start.sh -daemon /data/kfdata/kafka/config/server.properties
(6)配置管理器mgr
>edit param mgr 
PORT 7809 
DYNAMICPORTLIST 7810-7909 
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

(7)配置checkpoint
>edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
(8)配置replicate程序
>edit param rekafka
REPLICAT rekafka
sourcedefs /data/gg/dirdef/linq.test_ogg 
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props 
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000 
MAP linq.test_ogg, TARGET linq.test_ogg;
說明:REPLICATE rekafka定義rep程序名稱;sourcedefs即在4.6中在源伺服器上做的表對映檔案;TARGETDB LIBFILE即定義kafka一些適配性的庫檔案以及配置檔案,配置檔案位於OGG主目錄下的dirprm/kafka.props;REPORTCOUNT即複製任務的報告生成頻率;GROUPTRANSOPS為以事務傳輸時,事務合併的單位,減少IO操作;MAP即源端與目標端的對映關係。
(9)配置kafka.props(備註不能配進去)
cd /data/gg/dirprm/
vi kafka.props
/*
gg.handlerlist=kafkahandler //handler型別 
gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關配置 
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名稱,可先手動建立好,預設建立的話partition數只有1
gg.handler.kafkahandler.format=json //傳輸檔案的格式,支援json,xml等 
gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主鍵
gg.handler.kafkahandler.mode=op //OGG for Big Data中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務傳輸一次 
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*
*/
無備註版:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka 
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true 
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kfdata/kafka/libs/*:/data/gg/:/data/gg/lib/*

vi custom_kafka_producer.properties
/*
bootstrap.servers=192.168.83.227:9092 //kafkabroker的地址 
acks=1 
compression.type=gzip //壓縮型別 
reconnect.backoff.ms=1000 //重連延時 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
batch.size=102400 
linger.ms=10000
*/
無備註版:
bootstrap.servers=192.168.83.227:9092
acks=1 
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
batch.size=102400 
linger.ms=10000
(10)新增trail檔案到replicate程序
add replicat rekafka exttrail /data/gg/dirdat/lq,checkpointtable test_ogg.checkpoint
(11)開啟源端與目標端ogg
 
(12)測試
1、源端資料入庫
conn linq/Qwer!234
insert into test_ogg values(2,'go');
commit;
2、檢視目標端kafka主題是否建立
kafka-topics.sh --list --zookeeper localhost:2181


3、進行消費測試
kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
(用kafka埠9092,consumer的資訊將會存放在kafka之中,推薦)
或:
kafka-console-consumer.sh --zookeeper 192.168.83.227:2181 --from-beginning --topic test_ogg
(用zookeeper埠2181,consumer的資訊將會存放在zk之中)
測試結果如下(中文也是支援的):

[root@kafka3 data]# kafka-console-consumer.sh --bootstrap-server 192.168.83.227:9092 --from-beginning --topic test_ogg
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:12:03.648091","current_ts":"2020-03-06T17:12:10.737000","pos":"00000000000000001432","primary_keys":["ID"],"after":{"ID":2,"NAME":"go"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-06 17:21:23.652647","current_ts":"2020-03-06T17:21:31.368000","pos":"00000000000000001569","primary_keys":["ID"],"after":{"ID":3,"NAME":"ok"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-06 17:22:10.653057","current_ts":"2020-03-06T17:22:17.411000","pos":"00000000000000001701","primary_keys":["ID"],"before":{"ID":3,"NAME":"ok"},"after":{"ID":3,"NAME":"ok3"}}
{"table":"LINQ.TEST_OGG","op_type":"D","op_ts":"2020-03-06 17:22:58.653488","current_ts":"2020-03-06T17:23:05.454000","pos":"00000000000000001858","primary_keys":["ID"],"before":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-07 19:21:07.411700","current_ts":"2020-03-07T19:21:12.465000","pos":"00000000000000001994","primary_keys":["ID"],"after":{"ID":1,"NAME":"begin"}}
{"table":"LINQ.TEST_OGG","op_type":"I","op_ts":"2020-03-10 15:23:20.354624","current_ts":"2020-03-10T15:23:27.371000","pos":"00000000000000002130","primary_keys":["ID"],"after":{"ID":4,"NAME":"linq"}}
{"table":"LINQ.TEST_OGG","op_type":"U","op_ts":"2020-03-10 15:30:19.357882","current_ts":"2020-03-10T15:30:25.697000","pos":"00000000000000002266","primary_keys":["ID"],"before":{"ID":4,"NAME":"linq"},"after":{"ID":4,"NAME":"林勤"}}