使用ogg實現oracle到kafka的增量資料實時同步
阿新 • • 發佈:2022-04-11
使用ogg實現oracle到kafka的增量資料實時同步
彬彬
2022.04.07
一、OGG概述
OGG全稱為Oracle GoldenGate,是由Oracle官方提供的用於解決異構資料環境中資料複製的一個商業工具。相比於其它遷移工具OGG的優勢在於可以直接解析源端Oracle的redolog,因此能夠實現在不需要對原表結構做太多調整的前提下完成資料增量部分的同步。基於Oracle OGG,介紹一種將Oracle資料庫的資料實時同步到Kafka訊息佇列的方法。
1、OGG邏輯架構
2、 OGG概念
Manager程序:需要源端跟目標端同時執行,主要作用是監控管理其它程序,報告錯誤,分配及清理資料儲存空間,釋出閾值報告等。 Extract程序:執行在資料庫源端,主要用於捕獲資料的變化,負責全量、增量資料的抽取。 Data Pump程序:執行在資料庫源端,屬於Extract程序的一個輔助程序,,從本地Trail檔案中讀取資料,並通過網路將資料傳送到目標端OGG。 Collector程序:資料接收程式執行在目標端機器,用於接收Data Pump傳送過來的Trail日誌,並將資料寫入到本地Trail檔案。 Replicat程序:資料複製(Replicat):資料複製執行在目標端機器,從Trail檔案讀取資料變更,並將變更資料應用到目標端資料儲存系統。本案例中,資料複製將資料推送到kafka訊息佇列。*** Trails檔案:臨時存放在磁碟上的資料檔案。
3、OGG檢查點
作為一個複製軟體,首要是考察是它的可靠性,確保事務的完整性,在複製的過程中,源端和目標端的一致性。在日常運維可能會發生各種故障:程序故障、trail檔案故障、網路故障、伺服器故障等等。然後OGG各種故障的解決辦法:一是靠程序的自動重啟機制,二是靠checkpoint機制,保證在各種故障情況下不丟資料。
OGG檢查點:記錄程序的讀、寫的位置,在恢復時需要使用,保證事務的完整性。
OGG兩種儲存方式:
1)存放在dirchk下的檔案中
2)存放在指定的checkpoint table
對比: 1)nodbcheckpoint:效能較高 2)checkpointtable:檢查點資訊儲存在資料庫表中,和實際事務作為一個事務提交,可以從資料表中找到更多的資訊
檢查點分為Startup檢查點資訊、Recovery檢查點、Current檢查點
1)Startup檢查點資訊:程序啟動時,會建立startup檢查點
2)Recovery檢查點:程序恢復時,需要從哪個點開始恢復
3)Current檢查點:程序當前(最近的)檢查點資訊
檢視指令:info ext1, showch
3.1 檢查點-extract程序
1、讀檢查點:讀到哪個日誌檔案及相對位移值
1)有startup、recovery、current checkpoint
2)一般是修改current checkpoint來調整日誌檔案讀的位置alter extract ext1, [thread n,] extseqno , extrba 0
2、寫檢查點:正在寫到的trail檔案編號及相對位移值
1)有current checkpoint
2)修改寫檢查點:重啟程序或者etrollover
alter extract ext1, etrollover
(執行完成之後注意需要手工設定pmp程序的讀檢查點位置:
info ext1, showch確認新的寫檢查點的trail檔案編號為N,然後
alter pmp1, extseqno N, extrba 0)
3.2 檢查點-pump程序
1、讀檢查點:讀到的trail檔案的編號和具體位元組位置
1)有startup、current checkpoint
2)通過alter pmp1, extseqno N, extrba 0來修改
2、寫檢查點:正在寫的remote trail檔案編號及相對位移值
1)有current checkpoint
2)通過alter pmp1, etrollover來修改但是修改後同樣需要手工調整replicate程序讀的位置:alter rep1, extseqno N, extrba 0 (N為新的pmp1程序寫檢查點remote trial檔案編號)
3.3 檢查點-replicat程序
1、讀檢查點:讀到的trail檔案的編號和具體位元組位置
1)有startup、current checkpoint
2)修改當前讀檢查點位置
alter rep1, extseqno N, extrba 0
2、寫檢查點:無
二、OGG配置
1、環境資訊
**元件 ** | 版本 | **IP地址 ** | 描述 |
---|---|---|---|
源端Oracle | 11.2.0.4.0 | 192.168.152.101 | 源端Oracle資料庫 |
源端OGG | 12.3.2.1.1 | 192.168.152.101 | 源端OGG,用於抽取源端Oracle資料變更,並將變更日誌傳送到目標端 |
目標端OGG | 12.3.2.1.1 | 192.168.152.101 | 目標端OGG,接受源端傳送的Oracle事務變更日誌,並將變更推送到kafka訊息佇列 |
目標端Kafka | 2.11-0.11.0.0 | 192.168.152.101、102、103 | 訊息佇列,接收目標端OGG推送過來的資料 |
2、源端OGG配置
源端OGG 管理程序(mgr)配置:
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
說明
PORT 即mgr的預設監聽埠;
DYNAMICPORTLIST 動態埠列表,當指定的mgr埠不可用時,會在這個埠列表中選擇一個,最大指定範圍為256個;
AUTORESTART 重啟引數設定表示重啟所有EXTRACT程序,最多5次,每次間隔3分鐘;
PURGEOLDEXTRACTS 即TRAIL檔案的定期清理
源端OGG 抽取程序(extract)配置
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;
說明
第一行指定extract程序名稱;
dynamicresolution 動態解析;
SETENV 設定環境變數,這裡分別設定了Oracle資料庫以及字符集;
userid ogg,password ogg 即OGG連線Oracle資料庫的帳號密碼
exttrail 定義trail檔案的儲存位置以及檔名,注意這裡檔名只能是2個字母,其餘部分OGG會補齊;
table 即複製表的表名,支援*通配,必須以;結尾
源端OGG 傳輸程序(pump)配置
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 192.168.152.131 mgrport 7909
rmttrail /home/gg/ogg/dirdat/go
table test_ogg.test_ogg;
說明
第一行指定extract程序名稱;
Passthru 即禁止OGG與Oracle互動,我們這裡使用pump邏輯傳輸,故禁止即可;
Dynamicresolution 動態解析;
userid ogg,password ogg 即OGG連線Oracle資料庫的帳號密碼
rmthost和mgrhost 即目標端(kafka)OGG的mgr服務的地址以及監聽埠;
rmttrail 即目標端trail檔案儲存位置以及名稱。
3、目標端OGG配置
目標端OGG管理程序(mgr)配置:
PORT 7909
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
說明
PORT 即mgr的預設監聽埠;
DYNAMICPORTLIST 動態埠列表,當指定的mgr埠不可用時,會在這個埠列表中選擇一個,最大指定範圍為256個;
AUTORESTART 重啟引數設定表示重啟所有EXTRACT程序,最多5次,每次間隔3分鐘;
PURGEOLDEXTRACTS 即TRAIL檔案的定期清理
目標端OGG複製程序(replicat)配置:
REPLICAT rekafka
sourcedefs /home/gg/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
說明
REPLICATE rekafka 定義rep程序名稱;
Sourcedefs 即在源伺服器上做的表對映檔案;
TARGETDB LIBFILE 即定義kafka一些適配性的庫檔案以及配置檔案,配置檔案位於OGG主目錄下的dirprm/kafka.props;
REPORTCOUNT 即複製任務的報告生成頻率;
GROUPTRANSOPS 為以事務傳輸時,事務合併的單位,減少IO操作;
MAP 即源端與目標端的對映關係
三、資料測試
啟動所有程序
在源端和目標端的OGG命令列下使用start [程序名]的形式啟動所有程序。
啟動順序按照源mgr——目標mgr——源extract——源pump——目標replicate來完成。
全部需要在ogg目錄下執行ggsci目錄進入ogg命令列。
源端依次是
start mgr
start extkafka
start pukafka
目標端依次是
start mgr
start rekafka
可以通過info all 或者info [程序名] 檢視狀態,所有的程序都為RUNNING才算成功
如果有不是RUNNING可通過檢視日誌的方法檢查解決問題
ogg命令列,以rekafka程序為例
view report rekafka
源端執行sql語句
insert into test_ogg(id,name) values('1','test');
commit;
update test_ogg set name=‘zhangsan ' where id='1';
commit;
delete test_ogg where id='1';
commit;
檢視源端trail檔案狀態
ls -l /opt/ogg/dirdat/to*
檢視目標端trail檔案狀態
ls -l /home/gg/ogg/dirdat/go*
通過消費者看是否有同步訊息
bin/kafka-console-consumer.sh --bootstrap-server bigdata02:9092 --topic test_ogg
本文由部落格一文多發平臺 OpenWrite 釋出!