1. 程式人生 > 其它 >DataX全量、增量、已刪除資料同步方案與實際運用

DataX全量、增量、已刪除資料同步方案與實際運用

目錄

本文目的

DataX 是一款可以實現異構資料庫間離線資料同步的工具,本文重點將使用DataX做一個oracle到mysql的資料同步,其中會藉助datax-web進行視覺化配置。
使用場景簡單講下:客戶提供了oracle前置庫,我們系統每天需要定時從前置庫將資料同步到我們自己的mysql資料庫。
本文不再細說如何使用datax和data-web,直接從問題的角度觸發,主要記錄下問題解決思路和辦法,歡迎大家指正和探討!!!

1.datax-web部分

1.1 Oracle資料來源測試成功,但是任務構建提示表不存在

任務構建時,資料來源選擇oracle,提示表不存在、Schema沒有任何選項、資料庫表沒有顯示等類似的問題都可能與datax-web有關。
根據瀏覽器的請求分析下datax-web的原始碼

找到對應介面的查詢如下:

    @Override
    public String getSQLQueryTables(String... tableSchema) {
        return "select table_name from dba_tables where owner='" + tableSchema[0] + "'";
    }

    @Override
    public String getSQLQueryTableSchema(String... args) {
        return "select username from sys.dba_users";
    }

根據sql應該可以分析出來問題了,授予對應的許可權應該就可以了。或者直接修改原始碼(有點麻煩不建議)

2.DataX部分

2.1同步資料亂碼

jdbcUrl加上指定編碼配置即可

2.2資料更新問題(主鍵衝突):writeMode

預設writeMode為insert,此情況下只能新增資料,有主鍵衝突就會報錯,此時需要設定為寫入模式為更新模式(replace)。原始碼mysqlwriter.md中有解釋如下:

* writeMode
* 描述:控制寫入資料到目標表採用 `insert into` 或者 `replace into` 或者 `ON DUPLICATE KEY UPDATE` 語句<br />
* 必選:是 <br />
* 所有選項:insert/replace/update <br />
* 預設值:insert<br />

mysql比較特殊的寫入模式配置為"writeMode": "update",其他資料庫需要酌情配置為"writeMode": "replace"

2.3增量同步(根據日期)

按日期進行同步,在reader.parameter增加“where”引數,裡面就是需要過濾的資料,例子是隻同步30天以內的資料

"where": "CREATE_TIME > TO_CHAR(TO_DATE(SYSDATE - 30),'yyyy-MM-dd HH24:mi:ss')"
#資料庫是oracle,其他資料庫可能需要定製。此處的是隻同步建立時間在30天內的資料。

可以根據實際業務需要定製where引數來實現資料篩選

2.4刪除資料同步

datax只有新增和更新兩種資料會同步,當源資料庫有資料刪除時是無法同步的,就會造成源資料庫已經刪除了,但目標資料庫還存在這些資料。目前想到以下兩種方案:

2.4.1清空表完全走新增邏輯

在前置sql中配置清空標的sql即可。唯一的問題就是清空表到資料同步完成期間表是資料確實的,可能對業務影響比較大。
在writer.parameter引數中新增preSql配置即可

"preSql": ["truncate table 表名;"],

2.4.2利用已刪除資料不會同步的邏輯

總體思路:
1、需要同步的目標資料庫表增加一個SYNC_STATUS欄位
2、每次同步時,用前置sql更新SYNC_STATUS=0
3、每次同步資料時將一個常量1同步到SYNC_STATUS,達到SYNC_STATUS=1的目的
4、後置sql執行刪除操作,將SYNC_STATUS=0的資料全部刪除(源表此資料已經物理刪除,目標表此資料不會有更新,所以前置sql更新的SYNC_STATUS=0不會變,可以認定為是已刪除資料)
這樣目前只能全量同步,需要增量的同步資料(含刪除)還需要在進行改造,示例如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "username": "xxx",
            "password": "xxx",
            "column": [
              "\"ID\"",
              "1"
            ],
            "splitPk": "ID",
            "connection": [
              {
                "table": [
                  "xxx"
                ],
                "jdbcUrl": [
                  "jdbc:oracle:thin:@//xxx:1521/orcl"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "update",
            "username": "xxx",
            "password": "xxx",
            "column": [
              "`ID`",
              "`SYNC_STATUS`"
            ],
            "preSql": [
              "UPDATE xxx SET SYNC_STATUS = '0';"
            ],
            "postSql": [
              "DELETE FROM xxx WHERE SYNC_STATUS = '0';"
            ],
            "connection": [
              {
                "table": [
                  "xxx"
                ],
                "jdbcUrl": "jdbc:mysql://xxx:3306/xxx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true"
              }
            ]
          }
        }
      }
    ]
  }
}