DataX全量、增量、已刪除資料同步方案與實際運用
本文目的
DataX 是一款可以實現異構資料庫間離線資料同步的工具,本文重點將使用DataX做一個oracle到mysql的資料同步,其中會藉助datax-web進行視覺化配置。
使用場景簡單講下:客戶提供了oracle前置庫,我們系統每天需要定時從前置庫將資料同步到我們自己的mysql資料庫。
本文不再細說如何使用datax和data-web,直接從問題的角度觸發,主要記錄下問題解決思路和辦法,歡迎大家指正和探討!!!
1.datax-web部分
1.1 Oracle資料來源測試成功,但是任務構建提示表不存在
任務構建時,資料來源選擇oracle,提示表不存在、Schema沒有任何選項、資料庫表沒有顯示等類似的問題都可能與datax-web有關。
根據瀏覽器的請求分析下datax-web的原始碼
找到對應介面的查詢如下:
@Override public String getSQLQueryTables(String... tableSchema) { return "select table_name from dba_tables where owner='" + tableSchema[0] + "'"; } @Override public String getSQLQueryTableSchema(String... args) { return "select username from sys.dba_users"; }
根據sql應該可以分析出來問題了,授予對應的許可權應該就可以了。或者直接修改原始碼(有點麻煩不建議)
2.DataX部分
2.1同步資料亂碼
jdbcUrl加上指定編碼配置即可
2.2資料更新問題(主鍵衝突):writeMode
預設writeMode為insert,此情況下只能新增資料,有主鍵衝突就會報錯,此時需要設定為寫入模式為更新模式(replace)。原始碼mysqlwriter.md中有解釋如下:
* writeMode * 描述:控制寫入資料到目標表採用 `insert into` 或者 `replace into` 或者 `ON DUPLICATE KEY UPDATE` 語句<br /> * 必選:是 <br /> * 所有選項:insert/replace/update <br /> * 預設值:insert<br />
mysql比較特殊的寫入模式配置為"writeMode": "update"
,其他資料庫需要酌情配置為"writeMode": "replace"
2.3增量同步(根據日期)
按日期進行同步,在reader.parameter增加“where”引數,裡面就是需要過濾的資料,例子是隻同步30天以內的資料
"where": "CREATE_TIME > TO_CHAR(TO_DATE(SYSDATE - 30),'yyyy-MM-dd HH24:mi:ss')"
#資料庫是oracle,其他資料庫可能需要定製。此處的是隻同步建立時間在30天內的資料。
可以根據實際業務需要定製where引數來實現資料篩選
2.4刪除資料同步
datax只有新增和更新兩種資料會同步,當源資料庫有資料刪除時是無法同步的,就會造成源資料庫已經刪除了,但目標資料庫還存在這些資料。目前想到以下兩種方案:
2.4.1清空表完全走新增邏輯
在前置sql中配置清空標的sql即可。唯一的問題就是清空表到資料同步完成期間表是資料確實的,可能對業務影響比較大。
在writer.parameter引數中新增preSql配置即可
"preSql": ["truncate table 表名;"],
2.4.2利用已刪除資料不會同步的邏輯
總體思路:
1、需要同步的目標資料庫表增加一個SYNC_STATUS欄位
2、每次同步時,用前置sql更新SYNC_STATUS=0
3、每次同步資料時將一個常量1同步到SYNC_STATUS,達到SYNC_STATUS=1的目的
4、後置sql執行刪除操作,將SYNC_STATUS=0的資料全部刪除(源表此資料已經物理刪除,目標表此資料不會有更新,所以前置sql更新的SYNC_STATUS=0不會變,可以認定為是已刪除資料)
這樣目前只能全量同步,需要增量的同步資料(含刪除)還需要在進行改造,示例如下:
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "xxx",
"password": "xxx",
"column": [
"\"ID\"",
"1"
],
"splitPk": "ID",
"connection": [
{
"table": [
"xxx"
],
"jdbcUrl": [
"jdbc:oracle:thin:@//xxx:1521/orcl"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "update",
"username": "xxx",
"password": "xxx",
"column": [
"`ID`",
"`SYNC_STATUS`"
],
"preSql": [
"UPDATE xxx SET SYNC_STATUS = '0';"
],
"postSql": [
"DELETE FROM xxx WHERE SYNC_STATUS = '0';"
],
"connection": [
{
"table": [
"xxx"
],
"jdbcUrl": "jdbc:mysql://xxx:3306/xxx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true"
}
]
}
}
}
]
}
}