1. 程式人生 > 資料庫 >使用Streamsets將Oracle資料實時同步到MySQL中

使用Streamsets將Oracle資料實時同步到MySQL中

相關環境:

Oracle 11g:11.2.0.1.0 

MySQL:8.0.22

前期準備:

1、開啟Oracle的logminer

a.在SQL Shell中,以具有DBA的使用者身份登入資料庫:

sqlplus /nolog;

conn / as sysdba;

b.檢查資料庫日誌記錄模式:

select log_mode from v$database;

如果查詢結果是ARCHIVELOG,則以下操作都不用執行,如果命令返回NOARCHIVELOG,請繼續執行以下步驟:

c.關閉資料庫:

shutdown immediate;

d.啟動並掛載資料庫:

startup mount;

f.啟用存檔,開啟資料庫,並使其可寫:

alter database archivelog;
alter database open read write;

這裡要注意,如果是Linux中通過docker容器安裝的Oracle要進入容器中進行修改,外部直接sqlplus連線的話會報錯。

docker exec -it oracle11g bash

2、啟用補充日誌

a.要驗證是否為資料庫啟用了補充日誌記錄,請在SQL Shell中執行以下命令:

SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database;

b.狀態都為no則執行以下指令:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER SYSTEM SWITCH LOGFILE;

3、建立新Oracle使用者並授權

開啟補充日誌之後為建立新的使用者帳戶用於Oracle CDC客戶端源,併為使用者 streamsets 授權,

CREATE USER streamsets IDENTIFIED BY streamsets;
GRANT create session, alter session, execute_catalog_role, select any dictionary, select any transaction, select any table to streamsets;
GRANT select on GV_$DATABASE to streamsets;
GRANT select on V_$ARCHIVED_LOG to streamsets;
GRANT select on V_$LOGMNR_CONTENTS to streamsets;
GRANT select on v_$logmnr_parameters to streamsets;

後面測試建立表後,插入資料報錯:

ORA-01950: no privileges on tablespace 'USERS'

因為沒有Resource許可權,所以需要執行:

GRANT Resource to streamsets;

GRANT select on <db>.<table> TO <user name>;

Streamsets管道配置:

1、在Streamsets中建立新的管道

a.通過 192.168.XXX.XXX:18630 開啟Streamsets

預設賬號密碼為admin - admin

b.登入後在左邊建立新的管道

2、編輯新建的管道

因為業務是將Oracle的資料實時同步到MySQL中,所以在Origins中選擇Oracle CDC Client,在目的地Destinations中選擇JDBC Producer。然後將兩者相連:

3、匯入所需的jdbc驅動包

在StreamSets介面中,點選右上角的Package Manager,然後右邊的框拉到最下面選擇External Libraries匯入外部的jar包:

如果匯入報錯時,解決步驟可以看這裡:

4、配置 Oracle CDC Client

注意:
A.這裡的要注意一下 Oracle CDC Client支援處理以下資料庫中的資料:
    Oracle 11g,12c,18c和19c
    Oracle Real Application Clusters(RAC)12c,18c和19c
B.需要在Oracle終端啟動LogMiner,總結資料庫活動,使用這些日誌來生成記錄。LogMiner要求資料庫是開啟的,可寫的,並且在啟用歸檔的情況下處於ARCHIVELOG模式。

a.首先在Oracle 11g中建立一個學生表

命名為student:

然後插入一些測試資料:

b.編輯Oracle CDC中的內容

Tables 配置為ORACLE DATABASE中的定義的測試表STREAMSETS.student,注意ORACEL大小寫敏感。

Initial Change配置預設是 From Latest Change,實時資料同步只需要捕獲變化資料。

Operation配置為Hbase對應支援的CRUD操作型別,匹配ORACLE Database中實時變化資料的增刪改操作事務操作。

這裡改一下時區。

c.編輯JDBC配置內容

JDBC Connection String中輸入jdbc的連線資訊:

jdbc:oracle:thin:@192.168.105.77:helowinXDB

d.編輯Credentials

填寫資料庫的使用者名稱與密碼:

5、配置 JDBC Producer 元件

a.編輯General Property

Required Fields:必填欄位,是必須存在於記錄中以允許其進入處理階段的欄位。當記錄不包含所有必填欄位時,將根據為管道配置的錯誤處理對其進行處理。您可以為任何處理器,執行程式和大多數目標階段定義必需的欄位。

Preconditions:必須評估為TRUE以允許記錄進入處理階段的條件。單擊"+"以建立其他前提條件。根據為階段配置的錯誤處理,處理不滿足所有前提條件的記錄。

On Record Error:階段的錯誤記錄處理:

  •     Discard - 丟棄記錄。
  •     Send to Error - 將記錄傳送到管道以進行錯誤處理。
  •     Stop Pipeline- 停止管道。

b.編輯JDBC

JDBC Connection String: 用於連線資料庫的連線字串,各資料庫的連線配置使用以下格式

  •       Mysql: jdbc:mysql://xxxxxxxxxx:3306/資料庫名
  •       Oracle: jdbc:oracle:thin:@xxxxxxxxxx:1521:服務名
  •       PostgreSQL- jdbc:postgresql://<host>:<port>/<database_name>
  •       SQL server: jdbc:sqlserver://xxxxxxxxxx:1433;databaseName=xxx

Schema Name :要使用的可選資料庫或模式名稱。在資料庫需要完全限定的表名時使用。提示: 預設情況下,Oracle會對模式,表和列名稱使用全部大寫。僅當名稱周圍使用引號建立架構,表或列時,名稱可以是低位或大小寫。

Table Name :要使用的資料庫表名稱。使用資料庫所需的表名格式。提示: 預設情況下,Oracle會對模式,表和列名稱使用全部大寫。僅當名稱周圍使用引號建立架構,表或列時,名稱可以是低位或大小寫。

Field to Column Mapping:用於覆蓋預設欄位到列對映。預設情況下,欄位將寫入同名的列。覆蓋對映時,可以定義引數化值,以便在將字元值寫入列之前將SQL函式應用於欄位值。例如,要將欄位值轉換為整數,請為引數化值輸入以下內容:

CAST(? AS INTEGER)

Enclose Object Names:寫入資料庫時​​,將資料庫或模式名稱,表名稱和列名稱括在引號中。允許使用區分大小寫的名稱或帶有特殊字元的名稱。未啟用時,目標使用的JDBC驅動程式將確定名稱的提交方式。Oracle JDBC驅動程式預設將名稱提交為全部大寫。此外,Oracle預設使用模式,表和列名稱的全部大寫。僅當名稱周圍使用引號建立架構,表或列時,名稱可以是低位或大小寫。

Change Log Format:變更捕獲資料的格式。處理更改捕獲資料時使用。

Default Operation:如果未設定sdc.operation.type記錄標頭屬性,則執行的預設增刪改操作。

Unsupported Operation Handling :不支援在sdc.operation.type記錄標頭屬性中定義的CRUD操作型別時要採取的操作:

  • Discard - 丟棄記錄。
  • Send to Error - 將記錄傳送到管道以進行錯誤處理。
  • Use Default Operation - 使用預設操作將記錄寫入目標系統。

Use Multi-Row Operation:確定階段如何處理記錄。選擇以一次啟用插入和刪除多個記錄。在啟用此選項之前,請驗證資料庫是否支援階段使用的多行SQL語句。預設情況下,該階段執行單行操作。

Max Cache Size Per Batch (Entries):定義多行插入的預準備語句中允許的引數數量。使用-1可禁用引數限制。預設值為-1。

Rollback Batch On Error:當批次中發生錯誤時,回滾整個批次。

Additional JDBC Configuration Propertie:要使用的其他JDBC配置屬性。要新增屬性,請單擊“ + ”並定義JDBC屬性名稱和值,使用JDBC所期望的屬性名稱和值。

c.編輯Credentials

填寫資料庫的使用者名稱與密碼:


d.如果jdbc版本低於4.0,則需要配置Legacy Drivers

JDBC Driver Class Name:JDBC驅動程式的類名。早於版本4.0的JDBC版本必須填寫。
Connection Health Test Query :可選查詢,用於測試連線的執行狀況。僅當JDBC版本低於4.0時建議使用。

e.編輯Advanced

Maximum Pool Size:要建立的最大連線數,預設值為1,建議值為1。

Minimum Idle Connections:要建立和維護的最小連線數。要定義固定連線池,請將其設定為與“Maximum Pool Size”相同的值,預設值為1。

Connection Timeout (Seconds):等待連線的最長時間。在表示式中使用時間常量來定義時間增量,預設值為30秒。

Idle Timeout (Seconds):允許連線空閒的最長時間。在表示式中使用時間常量來定義時間增量,使用0可以避免刪除任何空閒連線,預設值為10分鐘.

Max Connection Lifetime (Seconds):連線的最長壽命。在表示式中使用時間常量來定義時間增量,使用0設定無最大生命週期,預設值為30分鐘

Transaction Isolation:用於連線資料庫的事務隔離級別。預設值是為資料庫設定的預設事務隔離級別。您可以通過將級別設定為以下任何一項來覆蓋資料庫預設值:

  • 讀取已提交
  • 讀取未提交
  • 可重複閱讀

序列化
Init Query:在該元件第一次連線到資料庫之後立即執行的SQL查詢

6、啟動管道流:

問題解決:

報錯一:

啟動時候報錯了 OraclORA-12505, TNS:listener does not currently know of SID given in connect descriptor

jdbc連線資料庫的時候,需要使用資料庫的sid_name,而不是資料庫的services_name.而使用plsql連線資料庫的時候,只需要資料庫的services_name即可,所以修改連線字串中的services_name 為sid_name.
查詢sid_name的語句:

select INSTANCE_NAME from v$instance;

在JDBC中修改即可:

報錯二:

我Oracle資料修改以後,並沒有在MySQL資料庫中同步到資料,然後在錯誤中可以看到錯誤資訊:

具體的報錯內容如下:

JDBC_405 - Error while generating records: java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.addRecordsToQueue(OracleCDCSource.java:1151)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecords(OracleCDCSource.java:742)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$startGeneratorThread$5(OracleCDCSource.java:463)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.objectToField(OracleCDCSource.java:1926)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecord(OracleCDCSource.java:930)
    at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$addRecordsToQueue$11(OracleCDCSource.java:1135)
    ... 4 more

將Oracle與MySQL的資料表中的欄位都改為大寫,則成功執行:

在Oracle中新增一條資料後:

MySQL中實時新增一條資料: