1. 程式人生 > 資料庫 >開源 ETL 工具 DataX 實踐,從mysql到mysql的全量同步和批量更新

開源 ETL 工具 DataX 實踐,從mysql到mysql的全量同步和批量更新

開源 ETL 工具 DataX 實踐,從mysql 到不同結構的另一個mysql的全量同步和批量更新

連結:

實踐步驟:

參照官方文件,採用方法一部署

在這裡插入圖片描述

如果點選下載沒反應,手動複製地址,把http換成https
在這裡插入圖片描述

下載解壓完成,執行自檢指令碼

在這裡插入圖片描述

File “datax.py”, line 114 print readerRef 。因為我電腦安裝的是python3 ,腳本里是python2語法

修改下 datax.py 中 114行後面的print print xx 改為 print(xx)

try except Exception, e: 改為 try except Exception as e

修改完datax.py後再次執行

在這裡插入圖片描述

成功!

參考官方文件,編寫一個從一個mysql到另一個mysql的全量同步
先準備資料庫

1、建兩張表 datax_src和datax_target 表 ,就兩個欄位,其中第二個欄位名稱不一樣 ,案列演示 從datax_src讀取資料,寫入到datax_target中

-- datax_src 表  欄位 id、src_name
CREATE TABLE `datax_src` (
  `id` bigint NOT NULL,
  `src_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- datax_target 表  欄位 id、target_name
CREATE TABLE `datax_target` (
  `id` bigint NOT NULL,
  `target_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

2、建立一個函式,向datax_src插入5000條資料

CREATE DEFINER=`root`@`%` FUNCTION `AUTO_INSERT`() RETURNS int
    DETERMINISTIC
BEGIN
	DECLARE index_num int DEFAULT 0;
		WHILE index_num < 5000 
			DO
				SET index_num = index_num + 1;
				INSERT INTO datax_src VALUES (index_num,CONCAT('name',index_num));
		END WHILE;
	RETURN 0;
END
編寫job json檔案

按照 文件中的 mysqlreader 、mysqlwriter 的示例修改

在這裡插入圖片描述

在這裡插入圖片描述

1、修改mysqlreader的示例 從datax_src同步抽取資料到本地:
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "datax_src"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

將json檔案放在job資料夾中,執行

在這裡插入圖片描述

在這裡插入圖片描述

拋異常了:

DataX無法連線對應的資料庫,可能原因是:1) 配置的ip/port/database/jdbc錯誤,無法連線。2) 配置的username/password錯誤,鑑權失敗。請和DBA確認該資料庫的連線資訊是否正確

可是密碼明明是正確的。然後想著修改json檔案換了一個數據庫,再執行,成功了!
發現區別就是mysql的版本不一樣 mysql5.0.27的執行成功了, 失敗這個庫是mysql8.0.22。
所以看了下datax目錄結構,發現聯結器位置如下圖。 找了下聯結器jar包,把reader和writer資料夾中的mysql-connector5換成了8(這裡只截圖了reader)

在這裡插入圖片描述

再次執行json檔案job:成功執行

在這裡插入圖片描述

2、修改mysqlwriter的示例 ,結合mysqlreader ,把讀寫合併,
完整的 json檔案如下 ,具體引數的含義。官方文件中有詳細介紹
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "datax_src"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "target_name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from target_name"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "datax_target"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

執行!成功!!

在這裡插入圖片描述

檢視 datax_target 表: 資料已成功寫入

在這裡插入圖片描述

以上是全表資料同步

參考官方文件,再編寫一個從一個mysql到另一個mysql的批量更新

下面通過自定義querySql實現一下部分更新的操作

1、更新datax_src表的部分資料,更新id小於10的9條資料

在這裡插入圖片描述

datax 的 job 的 json檔案如下: 加了一個 querySql 引數,並把 writer 中的 writeMode 換成 update

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "src_name"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
								 "querySql": [
                                    "select id,src_name from datax_src where id < 10;"
                                ],
                                "jdbcUrl": [
									"jdbc:mysql://127.0.0.1:3307/datax"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "root",
                        "password": "1131310577",
                        "column": [
                            "id",
                            "target_name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from datax_target"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "datax_target"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

執行,成功

在這裡插入圖片描述

檢視下datax_target中的資料:也已被成功更新

在這裡插入圖片描述