開源 ETL 工具 DataX 實踐,從mysql到mysql的全量同步和批量更新
阿新 • • 發佈:2021-01-09
開源 ETL 工具 DataX 實踐,從mysql 到不同結構的另一個mysql的全量同步和批量更新
連結:
實踐步驟:
參照官方文件,採用方法一部署
如果點選下載沒反應,手動複製地址,把http換成https
下載解壓完成,執行自檢指令碼
File “datax.py”, line 114 print readerRef 。因為我電腦安裝的是python3 ,腳本里是python2語法
修改下 datax.py 中 114行後面的print print xx 改為 print(xx)
try except Exception, e: 改為 try except Exception as e
修改完datax.py後再次執行
成功!
參考官方文件,編寫一個從一個mysql到另一個mysql的全量同步
先準備資料庫
1、建兩張表 datax_src和datax_target 表 ,就兩個欄位,其中第二個欄位名稱不一樣 ,案列演示 從datax_src讀取資料,寫入到datax_target中
-- datax_src 表 欄位 id、src_name CREATE TABLE `datax_src` ( `id` bigint NOT NULL, `src_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; -- datax_target 表 欄位 id、target_name CREATE TABLE `datax_target` ( `id` bigint NOT NULL, `target_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
2、建立一個函式,向datax_src插入5000條資料
CREATE DEFINER=`root`@`%` FUNCTION `AUTO_INSERT`() RETURNS int DETERMINISTIC BEGIN DECLARE index_num int DEFAULT 0; WHILE index_num < 5000 DO SET index_num = index_num + 1; INSERT INTO datax_src VALUES (index_num,CONCAT('name',index_num)); END WHILE; RETURN 0; END
編寫job json檔案
按照 文件中的 mysqlreader 、mysqlwriter 的示例修改
1、修改mysqlreader的示例 從datax_src同步抽取資料到本地:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"table": [
"datax_src"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
將json檔案放在job資料夾中,執行
拋異常了:
DataX無法連線對應的資料庫,可能原因是:1) 配置的ip/port/database/jdbc錯誤,無法連線。2) 配置的username/password錯誤,鑑權失敗。請和DBA確認該資料庫的連線資訊是否正確
可是密碼明明是正確的。然後想著修改json檔案換了一個數據庫,再執行,成功了!
發現區別就是mysql的版本不一樣 mysql5.0.27的執行成功了, 失敗這個庫是mysql8.0.22。
所以看了下datax目錄結構,發現聯結器位置如下圖。 找了下聯結器jar包,把reader和writer資料夾中的mysql-connector5換成了8(這裡只截圖了reader)
再次執行json檔案job:成功執行
2、修改mysqlwriter的示例 ,結合mysqlreader ,把讀寫合併,
完整的 json檔案如下 ,具體引數的含義。官方文件中有詳細介紹
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"table": [
"datax_src"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "1131310577",
"column": [
"id",
"target_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from target_name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"datax_target"
]
}
]
}
}
}
]
}
}
執行!成功!!
檢視 datax_target 表: 資料已成功寫入
以上是全表資料同步
參考官方文件,再編寫一個從一個mysql到另一個mysql的批量更新
下面通過自定義querySql實現一下部分更新的操作
1、更新datax_src表的部分資料,更新id小於10的9條資料
datax 的 job 的 json檔案如下: 加了一個 querySql 引數,並把 writer 中的 writeMode 換成 update
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "1131310577",
"column": [
"id",
"src_name"
],
"splitPk": "id",
"connection": [
{
"querySql": [
"select id,src_name from datax_src where id < 10;"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3307/datax"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "update",
"username": "root",
"password": "1131310577",
"column": [
"id",
"target_name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from datax_target"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3307/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"datax_target"
]
}
]
}
}
}
]
}
}
執行,成功
檢視下datax_target中的資料:也已被成功更新