DataX Secondary Development: Supporting writeMode update
阿新 • Published: 2021-10-20
Background
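Many mainstream databases support an upsert mode such as MySQL's ON DUPLICATE KEY UPDATE (when a primary-key conflict occurs, the existing row is updated instead). DataX lets you choose the write mode through the writeMode setting, but its update mode is currently implemented only for MySQL. This post uses PostgreSQL as an example to show how to adapt a database to this on-duplicate-key-update pattern.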
Environment setup
Clone the latest DataX source code from GitHub:
git clone https://github.com/alibaba/DataX.git
Requirements
- When a primary-key conflict occurs while syncing data into PostgreSQL, update the fields of the existing row.
- If no such row exists, insert a new one.
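To pin down these two requirements, here is a minimal in-memory model of the intended upsert behaviour. It is only an illustration, not DataX code: the class UpsertSemantics and its upsert method are hypothetical, and a Map keyed by the primary key stands in for the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of the required upsert semantics (not DataX code).
// The map stands in for a table whose primary key is "id".
class UpsertSemantics {
    // Writes one record: updates the row if the key already exists, otherwise inserts it.
    // Returns true if an existing row was updated, false if a new row was inserted.
    static boolean upsert(Map<Integer, String> table, int id, String name) {
        boolean existed = table.containsKey(id);
        table.put(id, name); // Map.put inserts new keys and overwrites existing ones
        return existed;
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new LinkedHashMap<>();
        System.out.println(upsert(table, 1, "DataX"));   // false: inserted
        System.out.println(upsert(table, 1, "DataX-2")); // true: key conflict, row updated
        System.out.println(table);                       // {1=DataX-2}
    }
}
```

The database version differs only in where the check happens: the conflict is detected by the primary-key or unique index rather than by containsKey.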
SQL syntax
INSERT INTO table_name(column_list) VALUES(value_list) ON CONFLICT target action;
Example (%s is the placeholder that DataX later replaces with the table name):
INSERT INTO %s (id,name,data_time,remark) VALUES ( ?,?,?,? )
ON CONFLICT (id,name)
DO UPDATE SET id=excluded.id,name=excluded.name,data_time=excluded.data_time,remark=excluded.remark
Code
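Analysis of the DataX source shows that update mode is implemented in com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil, which generates an upsert SQL statement that updates a row if it already exists.
A MySQL statement does not need to list the primary-key or unique-key columns, but PostgreSQL requires them to be specified explicitly as the conflict target.
Modify the code accordingly: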
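The difference between the two dialects can be sketched as follows. UpsertClauses is a hypothetical illustration, not DataX's onDuplicateKeyUpdateString; it takes the conflict keys as a separate argument, which is exactly the extra input PostgreSQL needs. VALUES(col) is the classic MySQL form (newer MySQL releases deprecate it in favour of a row alias).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: the upsert clause each database appends after
// "INSERT INTO t (...) VALUES (...)". Not DataX code.
class UpsertClauses {
    // MySQL infers the conflicting key from the table's PRIMARY/UNIQUE keys,
    // so only the columns to overwrite are listed.
    static String mysql(List<String> columns) {
        return " ON DUPLICATE KEY UPDATE " + columns.stream()
                .map(c -> c + "=VALUES(" + c + ")")
                .collect(Collectors.joining(","));
    }

    // PostgreSQL needs the conflict target (columns of a primary key or unique
    // index) spelled out; "excluded" refers to the row that failed to insert.
    static String postgres(List<String> conflictKeys, List<String> columns) {
        return " ON CONFLICT (" + String.join(",", conflictKeys) + ") DO UPDATE SET "
                + columns.stream()
                        .map(c -> c + "=excluded." + c)
                        .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "name");
        System.out.println(mysql(cols));
        System.out.println(postgres(Arrays.asList("id"), cols));
    }
}
```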
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders,
                                      String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
    String mode = writeMode.trim().toLowerCase();
    boolean isWriteModeLegal = mode.startsWith("insert")
            || mode.startsWith("replace")
            || mode.startsWith("update");
    if (!isWriteModeLegal) {
        throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                String.format("The configured writeMode: %s is invalid. DataX currently supports only replace, update or insert. Please check and fix your configuration.", writeMode));
    }

    String columns = StringUtils.join(columnHolders, ",");
    String placeHolders = StringUtils.join(valueHolders, ",");
    String writeDataSqlTemplate;
    if (forceUseUpdate || mode.startsWith("update")) {
        if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {
            // MySQL/TDDL: INSERT ... ON DUPLICATE KEY UPDATE
            writeDataSqlTemplate = "INSERT INTO %s (" + columns + ") VALUES(" + placeHolders + ")"
                    + onDuplicateKeyUpdateString(columnHolders);
        } else if (dataBaseType == DataBaseType.PostgreSQL) {
            // Adapted for PostgreSQL: INSERT ... ON CONFLICT (keys) DO UPDATE
            writeDataSqlTemplate = "INSERT INTO %s (" + columns + ") VALUES ( " + placeHolders + " )"
                    + doPostgresqlUpdate(writeMode, columnHolders);
        } else if (dataBaseType == DataBaseType.Oracle) {
            // Oracle: doOracleUpdate (not shown in this post) is prepended to the INSERT part
            writeDataSqlTemplate = doOracleUpdate(writeMode, columnHolders)
                    + "INSERT (" + columns + ") VALUES(" + placeHolders + ")";
        } else {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                    String.format("The current database does not support writeMode: %s.", writeMode));
        }
    } else {
        // Safeguard: if update is used incorrectly elsewhere, fall back to replace
        if (mode.startsWith("update")) {
            writeMode = "replace";
        }
        writeDataSqlTemplate = writeMode + " INTO %s (" + columns + ") VALUES(" + placeHolders + ")";
    }
    return writeDataSqlTemplate;
}
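To see the template the PostgreSQL branch produces, here is a stripped-down standalone sketch without the DataX classes (DataBaseType, DataXException, StringUtils). It assumes every value placeholder is a plain ?, and, as in DataX's CommonRdbmsWriter, that the table name is filled into %s later with String.format. PgTemplateDemo and template are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical standalone sketch of the PostgreSQL update branch; not the real WriterUtil.
class PgTemplateDemo {
    // Builds "INSERT INTO %s (...) VALUES ( ?,... ) ON CONFLICT <target> DO UPDATE SET ..."
    static String template(List<String> columns, String conflictTarget) {
        StringBuilder sb = new StringBuilder("INSERT INTO %s (")
                .append(String.join(",", columns))
                .append(") VALUES ( ")
                .append(String.join(",", Collections.nCopies(columns.size(), "?")))
                .append(" ) ON CONFLICT ")
                .append(conflictTarget)
                .append(" DO UPDATE SET ");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(columns.get(i)).append("=excluded.").append(columns.get(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "name", "data_time", "remark");
        // The table name is substituted into %s once per target table.
        System.out.println(String.format(template(cols, "(id,name)"), "test"));
    }
}
```

With the columns from the SQL example earlier, this prints the same statement with %s replaced by test.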
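/**
 * INSERT INTO table_name(column_list) VALUES(value_list)
 * ON CONFLICT target action;
 *
 * PostgreSQL: if a row with the same primary key or unique key exists, update it.
 *
 * @param writeMode     e.g. "update(id,name)"; the text after "update" becomes the conflict target
 * @param columnHolders the columns being written
 * @return the " ON CONFLICT ... DO ..." suffix of the INSERT statement
 */
private static String doPostgresqlUpdate(String writeMode, List<String> columnHolders) {
    // Strip only the leading "update", case-insensitively. writeMode.replace("update", "")
    // would miss "UPDATE(id)" and would also mangle keys such as "update_time".
    String mode = writeMode.trim();
    String conflict = mode.regionMatches(true, 0, "update", 0, "update".length())
            ? mode.substring("update".length()).trim()
            : "";
    StringBuilder sb = new StringBuilder();
    sb.append(" ON CONFLICT ");
    sb.append(conflict);
    sb.append(" DO ");
    if (columnHolders == null || columnHolders.isEmpty()) {
        sb.append("NOTHING");
        return sb.toString();
    }
    sb.append("UPDATE SET ");
    boolean first = true;
    for (String column : columnHolders) {
        if (!first) {
            sb.append(",");
        } else {
            first = false;
        }
        sb.append(column).append("=excluded.").append(column);
    }
    return sb.toString();
}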
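Because the conflict target is copied verbatim from writeMode, a typo in a key name only shows up as a SQL error at write time. A hypothetical helper like the following (ConflictKeys is not DataX code) could parse update(key1,key2,...) and check the keys against the configured columns when the job starts. It assumes the keys are among the written columns, which is the usual case but not something PostgreSQL itself requires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts and validates conflict keys from writeMode.
class ConflictKeys {
    // "update", optional spaces, then a parenthesised key list; case-insensitive.
    private static final Pattern UPDATE_MODE =
            Pattern.compile("update\\s*\\((.+)\\)", Pattern.CASE_INSENSITIVE);

    static List<String> parse(String writeMode, List<String> columns) {
        Matcher m = UPDATE_MODE.matcher(writeMode.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("expected update(key1,...), got: " + writeMode);
        }
        List<String> keys = new ArrayList<>();
        for (String raw : m.group(1).split(",")) {
            String key = raw.trim();
            // Case-insensitive, since PostgreSQL folds unquoted identifiers to lower case.
            boolean known = columns.stream().anyMatch(c -> c.equalsIgnoreCase(key));
            if (!known) {
                throw new IllegalArgumentException("conflict key not in column list: '" + key + "'");
            }
            keys.add(key);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "name");
        System.out.println(parse("update(id, name)", cols)); // [id, name]
        System.out.println(parse("UPDATE (id)", cols));      // [id]
    }
}
```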
In PostgresqlWriter, relax the writeMode check so that the insert and update modes are accepted:
public void init() {
    this.originalConfig = super.getPluginJobConf();
    String writeMode = this.originalConfig.getString(Key.WRITE_MODE);
    if (null != writeMode) {
        // Compare case-insensitively, consistent with WriterUtil.getWriteTemplate
        String mode = writeMode.trim().toLowerCase();
        if (!"insert".equals(mode) && !mode.startsWith("update")) {
            throw DataXException.asDataXException(
                    DBUtilErrorCode.CONF_ERROR,
                    String.format("Invalid writeMode configuration. PostgreSQL supports only the insert and update modes;" +
                            " %s is not supported.",
                            writeMode));
        }
    }
    this.commonRdbmsWriterMaster = new CommonRdbmsWriter.Job(DATABASE_TYPE);
    this.commonRdbmsWriterMaster.init(this.originalConfig);
}
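The check in init() still lets a bare update through, which would generate ON CONFLICT with no target; PostgreSQL rejects DO UPDATE without a conflict target. A stricter, case-insensitive variant is sketched below as a hypothetical standalone helper (PgWriteModeCheck is not DataX code); in the writer, its logic would replace the if condition in init().

```java
import java.util.Locale;

// Hypothetical stricter writeMode check for the PostgreSQL writer (not DataX code).
class PgWriteModeCheck {
    // "update", optional spaces, "(", a non-blank key list, ")"
    private static final String UPDATE_WITH_KEYS = "update\\s*\\(\\s*[^)\\s][^)]*\\)";

    static boolean isSupported(String writeMode) {
        if (writeMode == null) {
            return true; // not configured: treated as plain insert in this sketch
        }
        String mode = writeMode.trim().toLowerCase(Locale.ROOT);
        return mode.equals("insert") || mode.matches(UPDATE_WITH_KEYS);
    }

    public static void main(String[] args) {
        for (String m : new String[] {"insert", "UPDATE(id,name)", "update", "update()", "replace"}) {
            System.out.println(m + " -> " + isSupported(m));
        }
    }
}
```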
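With this, PostgreSQL supports update mode; rebuild and repackage DataX to use it. Other data sources follow essentially the same steps: the principle is simply to generate the corresponding SQL.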
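Usage
In job.json, set writeMode to update(key1,key2,key3), where the keys are the primary-key or unique-key columns used as the conflict target. For example: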
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": 19880808,
                "type": "long"
              },
              {
                "value": "DataX",
                "type": "string"
              }
            ],
            "sliceRecordCount": 1000
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "username": "xx",
            "password": "xx",
            "writeMode": "update(id,name)",
            "column": [
              "id",
              "name"
            ],
            "preSql": [
              "delete from test"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://127.0.0.1:3002/datax",
                "table": [
                  "test"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}
Source code
GitHub source code: DataX secondary development