1. 程式人生 > >環境篇:資料同步工具DataX

環境篇:資料同步工具DataX

# 環境篇:資料同步工具DataX ## 1 概述 ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730142446060-1263489256.png) [https://github.com/alibaba/DataX](https://github.com/alibaba/DataX) - DataX是什麼? > DataX 是阿里巴巴集團內被廣泛使用的離線資料同步工具/平臺,實現包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各種異構資料來源之間高效的資料同步功能。 ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730142625809-2121561480.png) - 設計理念 > 為了解決異構資料來源同步問題,DataX將複雜的網狀的同步鏈路變成了星型資料鏈路,DataX作為中間傳輸載體負責連線各種資料來源。當需要接入一個新的資料來源的時候,只需要將此資料來源對接到DataX,便能跟已有的資料來源做到無縫資料同步。 - 當前使用現狀 > DataX在阿里巴巴集團內被廣泛使用,承擔了所有大資料的離線同步業務,並已持續穩定運行了6年之久。目前每天完成同步8w多道作業,每日傳輸資料量超過300TB。 ## 2 支援資料 | 型別 | 資料來源 | Reader(讀) | Writer(寫) | 文件 | | ------------------ | ------------------------------- | ---------- | ---------- | ------------------------------------------------------------ | | RDBMS 關係型資料庫 | MySQL | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | | | Oracle | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | | | SQLServer | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | | | PostgreSQL | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | | | DRDS | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | | | 達夢 | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master) 、[寫](https://github.com/alibaba/DataX/blob/master) | | | 通用RDBMS(支援所有關係型資料庫) | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master) 、[寫](https://github.com/alibaba/DataX/blob/master) | | 阿里雲數倉資料儲存 | ODPS | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/odpsswriter/doc/odpswriter.md) | | | ADS | | √ | [寫](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | | | OSS | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | | | OCS | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/ocsreader/doc/ocsreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | | NoSQL資料儲存 | OTS | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | | | Hbase0.94 | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | | | Hbase1.1 | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | | | MongoDB | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md) | | | Hive | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | | 無結構化資料儲存 | TxtFile | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | | | FTP | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | | | HDFS | √ | √ | [讀](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[寫](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | | | Elasticsearch | | √ | [寫](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | ## 3 架構設計 ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730142856207-551830141.png) > DataX本身作為離線資料同步框架,採用Framework + plugin架構構建。將資料來源讀取和寫入抽象成為Reader/Writer外掛,納入到整個同步框架中。 - Reader:Reader為資料採集模組,負責採集資料來源的資料,將資料傳送給Framework。 - Writer: Writer為資料寫入模組,負責不斷向Framework取資料,並將資料寫入到目的端。 - Framework:Framework用於連線reader和writer,作為兩者的資料傳輸通道,並處理緩衝,流控,併發,資料轉換等核心技術問題。 ## 4 核心架構 > DataX 3.0 開源版本支援單機多執行緒模式完成同步作業執行,本小節按一個DataX作業生命週期的時序圖,從整體架構設計非常簡要說明DataX各個模組相互關係。 ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730143027812-872769378.png) ### 4.1 核心模組介紹: 1. DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個程序來完成整個作業同步過程。DataX Job模組是單個作業的中樞管理節點,承擔了資料清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。 2. DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於併發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分資料的同步工作。 3. 切分多個Task之後,DataX Job會呼叫Scheduler模組,根據配置的併發資料量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的併發執行完畢分配好的所有Task,預設單個任務組的併發數量為5。 4. 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的執行緒來完成任務同步工作。 5. DataX作業執行起來之後, Job監控並等待多個TaskGroup模組任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,程序退出值非0 ### 4.2 DataX排程流程: > 舉例來說,使用者提交了一個DataX作業,並且配置了20個併發,目的是將一個100張分表的mysql資料同步到odps裡面。 DataX的排程決策思路是: 1. DataXJob根據分庫分表切分成了100個Task。 2. 根據20個併發,DataX計算共需要分配4個TaskGroup。 3. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個併發共計執行25個Task。 ## 5 安裝 - 下載地址:[http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz](http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz) ```java #解壓即可 tar -zxvf datax.tar.gz ``` ## 6 案例 ### 6.1 官方案例 ```java ./bin/datax.py job/job.json ``` - 結果檢視 >任務啟動時刻 : 2020-07-30 14:36:00 >任務結束時刻 : 2020-07-30 14:36:10 >任務總計耗時 : 10s >任務平均流量 : 253.91KB/s >記錄寫入速度 : 10000rec/s >讀出記錄總數 : 100000 >讀寫失敗總數 : 0 - 查閱job.json檔案 ```json 1 { 2 "job": { 3 "setting": { 4 "speed": { 5 "byte":10485760 6 }, 7 "errorLimit": { 8 "record": 0, 9 "percentage": 0.02 10 } 11 }, 12 "content": [ 13 { 14 "reader": { 15 "name": "streamreader", 16 "parameter": { 17 "column" : [ 18 { 19 "value": "DataX", 20 "type": "string" 21 }, 22 { 23 "value": 19890604, 24 "type": "long" 25 }, 26 { 27 "value": "1989-06-04 00:00:00", 28 "type": "date" 29 }, 30 { 31 "value": true, 32 "type": "bool" 33 }, 34 { 35 "value": "test", 36 "type": "bytes" 37 } 38 ], 39 "sliceRecordCount": 100000 40 } 41 }, 42 "writer": { 43 "name": "streamwriter", 44 "parameter": { 45 "print": false, 46 "encoding": "UTF-8" 47 } 48 } 49 } 50 ] 51 } 52 } ``` ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730144315309-1137657105.png) ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730144720778-1752634364.png) ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730144924619-864147100.png) ### 6.2 從Stream流讀取資料並列印到控制檯 > 可以通過命令檢視配置模板 ```java python bin/datax.py -r streamreader -w streamwriter ``` > 如上命令會輸出如下資訊,這樣就方便我們去編寫了 ```json DataX (DATAX-OPENSOURCE-3.0), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. Please refer to the streamreader document: https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md Please refer to the streamwriter document: https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md Please save the following configuration as a json file and use python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json to run the job. { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [], "sliceRecordCount": "" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "", "print": true } } } ], "setting": { "speed": { "channel": "" } } } } ``` - 配置模板 > vim job/HelloWorld.json ```json { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "column": [ { "type":"string", "value":"HelloWorld" }, { "type":"long", "value":"2020" } ], "sliceRecordCount": "10" } }, "writer": { "name": "streamwriter", "parameter": { "encoding": "UTF-8", "print": true } } } ], "setting": { "speed": { "channel": "2" } } } } ``` > 上述模板配置了列印資訊為HelloWorld和2020,列印10條,並顯示,格式為UTF-8 > > 注意:這裡的Framework(Speed)為channel,其意思為併發,在流中的意思為啟動了2個流去做這個事情,那麼輸出結果就會變為20條 - 執行 ```java ./bin/datax.py job/HelloWorld.json ``` ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730151839016-881712892.png) ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730151816889-738694771.png) ### 6.3 Mysql資料庫同步任務到本地 | DataX 內部型別 | Mysql 資料型別 | | -------------- | --------------------------------------------------------- | | Long | int, tinyint, smallint, mediumint, int, bigint | | Double | float, double, decimal | | String | varchar, char, tinytext, text, mediumtext, longtext, year | | Date | date, datetime, timestamp, time | | Boolean | bit, bool | | Bytes | tinyblob, mediumblob, blob, longblob, varbinary | - mysql測試表建立 ```java CREATE TABLE `user_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號', `login_name` varchar(200) DEFAULT NULL COMMENT '使用者名稱稱', `nick_name` varchar(200) DEFAULT NULL COMMENT '使用者暱稱', `passwd` varchar(200) DEFAULT NULL COMMENT '使用者密碼', `name` varchar(200) DEFAULT NULL COMMENT '使用者姓名', `phone_num` varchar(200) DEFAULT NULL COMMENT '手機號', `email` varchar(200) DEFAULT NULL COMMENT '郵箱', `head_img` varchar(200) DEFAULT NULL COMMENT '頭像', `user_level` varchar(200) DEFAULT NULL COMMENT '使用者級別', `birthday` date DEFAULT NULL COMMENT '使用者生日', `gender` varchar(1) DEFAULT NULL COMMENT '性別 M男,F女', `create_time` datetime DEFAULT NULL COMMENT '建立時間', `operate_time` datetime DEFAULT NULL COMMENT '修改時間', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=41 DEFAULT CHARSET=utf8 COMMENT='使用者表'; INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (1, 'rz3bor', '香香', NULL, '戴香', '13446155565', '[email protected]', NULL, '1', '1990-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (2, 'n332si1jfe45', '玲芬', NULL, '柏鶯', '13531861983', '[email protected]', NULL, '1', '1976-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (3, 'hflupfwh9acg', '詠詠', NULL, '伍寒伊', '13146668529', '[email protected]', NULL, '2', '1997-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (4, 'o8o8cq4c', '昭昭', NULL, '軒轅婉', '13542317252', '[email protected]', NULL, '3', '1965-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (5, 'lqk2g4', '可可', NULL, '安靄', '13899248493', '[email protected]', NULL, '1', '1978-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (6, '866ym3j4b', '蕊薇', NULL, '司馬蕊薇', '13662131286', '[email protected]', NULL, '2', '2000-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (7, 'u8duuusr', '黛青', NULL, '百里涵予', '13742717589', '[email protected]', NULL, '1', '1996-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (8, 'hg9fkdthkv7', '娣娣', NULL, '戴莎錦', '13953633795', '[email protected]', NULL, '3', '2000-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (9, 'goatp7d', '月月', NULL, '戚珊莎', '13513119569', '[email protected]', NULL, '1', '1981-03-15', 'F', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (10, '6ernv47', '阿謙', NULL, '西門利清', '13176428692', '[email protected]', NULL, '1', '1983-03-15', 'M', '2020-03-15 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (11, 'zpo8f6swym8', '嫻瑾', NULL, '臧詠', '13782895459', '[email protected]', NULL, '1', '1979-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (12, '6nnowpp', '凡凡', NULL, '紀姣婉', '13425112725', '[email protected]', NULL, '2', '1978-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (13, 'hhzxmgoo8no', '聰聰', NULL, '淳于妹霞', '13147486822', '[email protected]', NULL, '1', '1990-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (14, 'l2p366773l', '馨馨', NULL, '葛馨', '13159754795', '[email protected]', NULL, '1', '1995-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (15, '1cyd3eop', '凡凡', NULL, '司馬瑤', '13731934554', '[email protected]', NULL, '1', '1980-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (16, 'cn5wnk4utxom', '蓓蓓', NULL, '卞蓓', '13377379335', '[email protected]', NULL, '1', '1976-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (17, 'r7l1wj', '燕彩', NULL, '謝素雲', '13624975852', '[email protected]', NULL, '1', '1989-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (18, '3e8hts7n9do1', '瑞凡', NULL, '計楓', '13417352247', '[email protected]', NULL, '1', '1995-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (19, 'b34feue0x', '萍萍', NULL, '伍英華', '13628754425', '[email protected]', NULL, '3', '2005-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); INSERT INTO `mall`.`user_info`(`id`, `login_name`, `nick_name`, `passwd`, `name`, `phone_num`, `email`, `head_img`, `user_level`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES (20, 'sdyk2u501qis', '蕊蕊', NULL, '周冰爽', '13161154156', '[email protected]', NULL, '1', '1985-03-16', 'F', '2020-03-16 00:00:00', '2020-03-20 00:00:00'); ``` - 模板檢視 ```java python bin/datax.py -r mysqlreader -w streamwriter ``` - vim job/mysql-local.json ```java { "job": { "setting": { "speed": { "channel":1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "connection": [ { "querySql": [ "select db_id,on_line_flag from db_info where db_id < 10;" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/database" ] } ] } }, "writer": { "name": "streamwriter", "parameter": { "print": true, "encoding": "UTF-8" } } } ] } } ``` - 執行 ```java ./bin/datax.py job/mysql-local.json ``` ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730153957824-814127531.png) ![](https://img2020.cnblogs.com/blog/1235870/202007/1235870-20200730154009877-1231850399.png) >
寫在最後,官網例子相當的全,請參考各資料來源讀