使用Streamsets將Oracle資料實時同步到MySQL中
相關環境:
Oracle 11g:11.2.0.1.0
MySQL:8.0.22
前期準備:
1、開啟Oracle的logminer
a.在SQL Shell中,以具有DBA的使用者身份登入資料庫:
sqlplus /nolog;
conn / as sysdba;
b.檢查資料庫日誌記錄模式:
select log_mode from v$database;
如果查詢結果是ARCHIVELOG,則以下操作都不用執行,如果命令返回NOARCHIVELOG,請繼續執行以下步驟:
c.關閉資料庫:
shutdown immediate;
d.啟動並掛載資料庫:
startup mount;
f.啟用存檔,開啟資料庫,並使其可寫:
alter database archivelog;
alter database open read write;
這裡要注意,如果是Linux中通過docker容器安裝的Oracle要進入容器中進行修改,外部直接sqlplus連線的話會報錯。
docker exec -it oracle11g bash
2、啟用補充日誌
a.要驗證是否為資料庫啟用了補充日誌記錄,請在SQL Shell中執行以下命令:
SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database;
b.狀態都為no則執行以下指令:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
ALTER SYSTEM SWITCH LOGFILE;
3、建立新Oracle使用者並授權
開啟補充日誌之後為建立新的使用者帳戶用於Oracle CDC客戶端源,併為使用者 streamsets 授權,
CREATE USER streamsets IDENTIFIED BY streamsets; GRANT create session, alter session, execute_catalog_role, select any dictionary, select any transaction, select any table to streamsets; GRANT select on GV_$DATABASE to streamsets; GRANT select on V_$ARCHIVED_LOG to streamsets; GRANT select on V_$LOGMNR_CONTENTS to streamsets; GRANT select on v_$logmnr_parameters to streamsets;
後面測試建立表後,插入資料報錯:
ORA-01950: no privileges on tablespace 'USERS'
因為沒有Resource許可權,所以需要執行:
GRANT Resource to streamsets;
GRANT select on <db>.<table> TO <user name>;
Streamsets管道配置:
1、在Streamsets中建立新的管道
a.通過 192.168.XXX.XXX:18630 開啟Streamsets
預設賬號密碼為admin - admin
b.登入後在左邊建立新的管道
2、編輯新建的管道
因為業務是將Oracle的資料實時同步到MySQL中,所以在Origins中選擇Oracle CDC Client,在目的地Destinations中選擇JDBC Producer。然後將兩者相連:
3、匯入所需的jdbc驅動包
在StreamSets介面中,點選右上角的Package Manager,然後右邊的框拉到最下面選擇External Libraries匯入外部的jar包:
如果匯入報錯時,解決步驟可以看這裡:
4、配置 Oracle CDC Client
注意:
A.這裡的要注意一下 Oracle CDC Client支援處理以下資料庫中的資料:
Oracle 11g,12c,18c和19c
Oracle Real Application Clusters(RAC)12c,18c和19c
B.需要在Oracle終端啟動LogMiner,總結資料庫活動,使用這些日誌來生成記錄。LogMiner要求資料庫是開啟的,可寫的,並且在啟用歸檔的情況下處於ARCHIVELOG模式。
a.首先在Oracle 11g中建立一個學生表
命名為student:
然後插入一些測試資料:
b.編輯Oracle CDC中的內容
Tables 配置為ORACLE DATABASE中的定義的測試表STREAMSETS.student,注意ORACEL大小寫敏感。
Initial Change配置預設是 From Latest Change,實時資料同步只需要捕獲變化資料。
Operation配置為Hbase對應支援的CRUD操作型別,匹配ORACLE Database中實時變化資料的增刪改操作事務操作。
這裡改一下時區。
c.編輯JDBC配置內容
JDBC Connection String中輸入jdbc的連線資訊:
jdbc:oracle:thin:@192.168.105.77:helowinXDB
d.編輯Credentials
填寫資料庫的使用者名稱與密碼:
5、配置 JDBC Producer 元件
a.編輯General Property
Required Fields:必填欄位,是必須存在於記錄中以允許其進入處理階段的欄位。當記錄不包含所有必填欄位時,將根據為管道配置的錯誤處理對其進行處理。您可以為任何處理器,執行程式和大多數目標階段定義必需的欄位。
Preconditions:必須評估為TRUE以允許記錄進入處理階段的條件。單擊"+"以建立其他前提條件。根據為階段配置的錯誤處理,處理不滿足所有前提條件的記錄。
On Record Error:階段的錯誤記錄處理:
- Discard - 丟棄記錄。
- Send to Error - 將記錄傳送到管道以進行錯誤處理。
- Stop Pipeline- 停止管道。
b.編輯JDBC
JDBC Connection String: 用於連線資料庫的連線字串,各資料庫的連線配置使用以下格式
- Mysql: jdbc:mysql://xxxxxxxxxx:3306/資料庫名
- Oracle: jdbc:oracle:thin:@xxxxxxxxxx:1521:服務名
- PostgreSQL- jdbc:postgresql://<host>:<port>/<database_name>
- SQL server: jdbc:sqlserver://xxxxxxxxxx:1433;databaseName=xxx
Schema Name :要使用的可選資料庫或模式名稱。在資料庫需要完全限定的表名時使用。提示: 預設情況下,Oracle會對模式,表和列名稱使用全部大寫。僅當名稱周圍使用引號建立架構,表或列時,名稱可以是低位或大小寫。
Table Name :要使用的資料庫表名稱。使用資料庫所需的表名格式。提示: 預設情況下,Oracle會對模式,表和列名稱使用全部大寫。僅當名稱周圍使用引號建立架構,表或列時,名稱可以是低位或大小寫。
Field to Column Mapping:用於覆蓋預設欄位到列對映。預設情況下,欄位將寫入同名的列。覆蓋對映時,可以定義引數化值,以便在將字元值寫入列之前將SQL函式應用於欄位值。例如,要將欄位值轉換為整數,請為引數化值輸入以下內容:
CAST(? AS INTEGER)
Enclose Object Names:寫入資料庫時,將資料庫或模式名稱,表名稱和列名稱括在引號中。允許使用區分大小寫的名稱或帶有特殊字元的名稱。未啟用時,目標使用的JDBC驅動程式將確定名稱的提交方式。Oracle JDBC驅動程式預設將名稱提交為全部大寫。此外,Oracle預設使用模式,表和列名稱的全部大寫。僅當名稱周圍使用引號建立架構,表或列時,名稱可以是低位或大小寫。
Change Log Format:變更捕獲資料的格式。處理更改捕獲資料時使用。
Default Operation:如果未設定sdc.operation.type記錄標頭屬性,則執行的預設增刪改操作。
Unsupported Operation Handling :不支援在sdc.operation.type記錄標頭屬性中定義的CRUD操作型別時要採取的操作:
- Discard - 丟棄記錄。
- Send to Error - 將記錄傳送到管道以進行錯誤處理。
- Use Default Operation - 使用預設操作將記錄寫入目標系統。
Use Multi-Row Operation:確定階段如何處理記錄。選擇以一次啟用插入和刪除多個記錄。在啟用此選項之前,請驗證資料庫是否支援階段使用的多行SQL語句。預設情況下,該階段執行單行操作。
Max Cache Size Per Batch (Entries):定義多行插入的預準備語句中允許的引數數量。使用-1可禁用引數限制。預設值為-1。
Rollback Batch On Error:當批次中發生錯誤時,回滾整個批次。
Additional JDBC Configuration Propertie:要使用的其他JDBC配置屬性。要新增屬性,請單擊“ + ”並定義JDBC屬性名稱和值,使用JDBC所期望的屬性名稱和值。
c.編輯Credentials
填寫資料庫的使用者名稱與密碼:
d.如果jdbc版本低於4.0,則需要配置Legacy Drivers
JDBC Driver Class Name:JDBC驅動程式的類名。早於版本4.0的JDBC版本必須填寫。
Connection Health Test Query :可選查詢,用於測試連線的執行狀況。僅當JDBC版本低於4.0時建議使用。
e.編輯Advanced
Maximum Pool Size:要建立的最大連線數,預設值為1,建議值為1。
Minimum Idle Connections:要建立和維護的最小連線數。要定義固定連線池,請將其設定為與“Maximum Pool Size”相同的值,預設值為1。
Connection Timeout (Seconds):等待連線的最長時間。在表示式中使用時間常量來定義時間增量,預設值為30秒。
Idle Timeout (Seconds):允許連線空閒的最長時間。在表示式中使用時間常量來定義時間增量,使用0可以避免刪除任何空閒連線,預設值為10分鐘.
Max Connection Lifetime (Seconds):連線的最長壽命。在表示式中使用時間常量來定義時間增量,使用0設定無最大生命週期,預設值為30分鐘
Transaction Isolation:用於連線資料庫的事務隔離級別。預設值是為資料庫設定的預設事務隔離級別。您可以通過將級別設定為以下任何一項來覆蓋資料庫預設值:
- 讀取已提交
- 讀取未提交
- 可重複閱讀
序列化
Init Query:在該元件第一次連線到資料庫之後立即執行的SQL查詢
6、啟動管道流:
問題解決:
報錯一:
啟動時候報錯了 OraclORA-12505, TNS:listener does not currently know of SID given in connect descriptor
jdbc連線資料庫的時候,需要使用資料庫的sid_name,而不是資料庫的services_name.而使用plsql連線資料庫的時候,只需要資料庫的services_name即可,所以修改連線字串中的services_name 為sid_name.
查詢sid_name的語句:
select INSTANCE_NAME from v$instance;
在JDBC中修改即可:
報錯二:
我Oracle資料修改以後,並沒有在MySQL資料庫中同步到資料,然後在錯誤中可以看到錯誤資訊:
具體的報錯內容如下:
JDBC_405 - Error while generating records: java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
java.util.concurrent.ExecutionException: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.addRecordsToQueue(OracleCDCSource.java:1151)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecords(OracleCDCSource.java:742)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$startGeneratorThread$5(OracleCDCSource.java:463)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.streamsets.pipeline.api.StageException: JDBC_54 - Column: 'XM' does not exist in table: 'Schema = 'STREAMSETS', Table = 'STUDENT''. This is likely due to a DDL being performed on this table
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.objectToField(OracleCDCSource.java:1926)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.generateRecord(OracleCDCSource.java:930)
at com.streamsets.pipeline.stage.origin.jdbc.cdc.oracle.OracleCDCSource.lambda$addRecordsToQueue$11(OracleCDCSource.java:1135)
... 4 more
將Oracle與MySQL的資料表中的欄位都改為大寫,則成功執行:
在Oracle中新增一條資料後:
MySQL中實時新增一條資料: