1. 程式人生 > >資料收集之DataX

資料收集之DataX

DataX

DataX是阿里開源的離線資料同步工具,可以實現包括 MySQL、Oracle、MongoDB、Hive、HDFS、HBase、Elasticsearch等各種異構資料來源之間的高效同步。

DataX原理

設計理念

datax_star
為了解決異構資料來源同步問題,DataX將複雜的網狀同步鏈路變成星型鏈路,DataX作為中間傳輸載體負責連線各種資料來源。當需要接入一個新的資料來源的時候,只需將此資料來源對接到DataX,便能跟已有資料來源做到無縫資料同步。

框架設計

datax_frame

採用Framework + plugin架構構建。將資料來源讀取和寫入抽象成為Reader/Writer外掛,納入到整個同步框架中。

  • Reader:資料採集模組,負責採集資料來源的資料,將資料傳送給Framework。
  • Writer: 資料寫入模組,負責不斷從Framework取資料,並將資料寫入到目的端。
  • Framework:Framework用於連線Reader和Writer,作為兩者的資料傳輸通道,並處理緩衝,流控,併發,資料轉換等核心技術問題。

核心架構

datax_point_struct

  • 核心模組介紹
    1. DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個程序來完成整個作業同步過程。DataX Job模組是單個作業的中樞管理節點,承擔了資料清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
    2. DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於併發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分資料的同步工作。
    3. 切分多個Task之後,DataX Job會呼叫Scheduler模組,根據配置的併發資料量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的併發執行完畢分配好的所有Task,預設單個任務組的併發數量為5。
    4. 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的執行緒來完成任務同步工作。
    5. DataX作業執行起來之後, Job監控並等待多個TaskGroup模組任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,程序退出值非0。
  • 排程流程
    舉例來說,使用者提交了一個DataX作業,並且配置了20個併發,目的是將一個100張分表的mysql資料同步到odps裡面。 DataX的排程決策思路是:
    1. DataXJob根據分庫分表切分成了100個Task。
    2. 根據20個併發,DataX計算共需要分配4個TaskGroup。
    3. 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個併發共計執行25個Task。

DataX 部署

工具部署

```shell
[[email protected] /data/software]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[[email protected] /data/software]# tar -zxvf datax.tar.gz
[[email protected] /data/software/datax]# python bin/datax.py job/job.json #自檢
```

目錄結構

 [[email protected] /data/software/datax]# tree -L 3 -I '*jar*'
    .
    ├── bin 啟動指令碼
    │   ├── datax.py
    │   ├── dxprof.py
    │   └── perftrace.py
    ├── conf 核心配置
    │   ├── core.json
    │   └── logback.xml
    ├── job  job目錄
    │   └── job.json
    ├── lib 核心類庫
    ├── log
    ├── log_perf
    ├── plugin 插卡目錄
    │   ├── reader 
    │   │   ├── drdsreader
    │   │   ├── ftpreader
    │   │   ├── hbase094xreader
    │   │   ├── hbase11xreader
    │   │   ├── hdfsreader
    │   │   ├── mongodbreader
    │   │   ├── mysqlreader
    │   │   ├── odpsreader
    │   │   ├── oraclereader
    │   │   ├── ossreader
    │   │   ├── otsreader
    │   │   ├── otsstreamreader
    │   │   ├── postgresqlreader
    │   │   ├── rdbmsreader
    │   │   ├── sqlserverreader
    │   │   ├── streamreader
    │   │   └── txtfilereader
    │   └── writer
    │       ├── adswriter
    │       ├── drdswriter
    │       ├── ftpwriter
    │       ├── hbase094xwriter
    │       ├── hbase11xsqlwriter
    │       ├── hbase11xwriter
    │       ├── hdfswriter
    │       ├── mongodbwriter
    │       ├── mysqlwriter
    │       ├── ocswriter
    │       ├── odpswriter
    │       ├── oraclewriter
    │       ├── osswriter
    │       ├── otswriter
    │       ├── postgresqlwriter
    │       ├── rdbmswriter
    │       ├── sqlserverwriter
    │       ├── streamwriter
    │       └── txtfilewriter
    ├── script
    │   └── Readme.md
    └── tmp
        └── readme.txt

DataX全量同步 Mysql-HDFS

DataX配置

 {
    "job":{
        "setting":{
            "errorLimit":{
                "record":1,
                "percentage":0.2
            },
            "speed": {
                 "channel":1
            }
        },
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "username":"root",
                        "password":"111",
                        "column":["id","log_type","event_time","uid"],
            "where":"event_time>='2018-08-10 01:01:01' and event_time<='2018-08-10 01:10:01'",
            "connection":[
                            {
                                "table":[
                                    "log_0",
                                    "log_1",
                                    "log_2"
                                ],
                                "jdbcUrl":[
                                    "jdbc:mysql://localhost:3306/test"
                                ]
                            }
                        ]
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "defaultFS":"hdfs://node1:8020",
                        "fileType":"text",
                        "path":"/test/access_log",
                        "fileName":"log_",
                        "fieldDelimiter":"\t",
                        "writeMode": "append",
                        "column":[
                           {
                            "name": "id",
                            "type": "bigint"
                            },
                            {
                             "name": "log_type",
                             "type": "string"
                            },
                            {
                             "name": "event_time",
                             "type": "string"
                            },
                            {
                             "name": "uid",
                             "type": "string"
                            }
                        ]
                    }
                }
            }
        ]
    }
}

DataX執行結果

 [[email protected] /data/software/datax]# python bin/datax.py -j"-Xms125m -Xmx125m" job/mysql_hdfs.json

任務啟動時刻                    : 2018-08-10 23:51:58
任務結束時刻                    : 2018-08-10 23:52:24
任務總計耗時                    :                 25s
任務平均流量                    :                5B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數                    :                   3
讀寫失敗總數                    :                   0

[[email protected] /data/software]# hdfs dfs -cat /test/access_log/*
1   appError    2018-08-10 01:01:01 2
2   appError    2018-08-10 01:10:01 2
3   appError    2018-08-10 01:01:01 2

DataX增量同步 Mysql-HDFS

思路

增量模板+shell+crontab。
1.定義datax任務執行模板檔案 如"where":"event_time>='${start_time}' and event_time<='${end_time}'"。
2.每次定時任務啟動時根據上次偏移量替換模板檔案中增量變數。
3.執行完後記錄此次偏移量。

注意

1.時間增量,需記錄每次時間偏移量。
2.自增id增量,需記錄每次自增id偏移量。
3.不論是時間增量還是自增id增量,記得給資料同步留下足夠的時間。如從庫同步延遲。
4.實際中,要結合資料庫索引情況合理設定增量條件,提高每次查詢速度。
5.若沒有任何增量規律,datax只能做全量同步。

相關推薦

資料收集DataX

DataX DataX是阿里開源的離線資料同步工具,可以實現包括 MySQL、Oracle、MongoDB、Hive、HDFS、HBase、Elasticsearch等各種異構資料來源之間的高效同步。 DataX原理 設計理念 為了解決異

資料收集Filebeat

  Filebeat採用Go語言開發,也可用於日誌收集,相較於的Logstash,更輕量,資源佔用更少。一般部署在日誌收集的最前端。   本文基於Filebeat 6.3.2總結。 設計要點 主要元件   Filebeat主要由兩大元件組成:

資料收集Flume

Flume最初由Cloudera開發,於2011年6月貢獻給Apache,於2012成為頂級專案。在孵化這一年,基於老版本的Flume(Flume OG:Flume Original Generation 即Flume 0.9.x版本)進行重構,摒棄了Zooke

資料收集Logstash

Logstash 之前用的Logstash快忘了,好記性不如爛筆頭,好好總結一下。 Logstash由Java(Core)+Ruby(Plugin)語言編寫,是一個開源的日誌收集、處理、轉發工具。 Input產生事件,Filter修改事件,Output將事件傳

資料收集binlog同步----Maxwell

簡介 Maxwell是由Java語言編寫,Zendesk開源的binlog解析同步工具。可通過簡單配置,將binlog解析並以json的格式同步到如file,kafka,redis,RabbitMQ等

Asp.netCore安裝centos7 資料收集

虛擬機器的安裝和centos的安裝看博友的文章:https://www.cnblogs.com/zhaopei/p/netcore.html   https://www.centos.org/ centos安裝netcore 步驟 https://dotnet.microsoft.com/l

Hadoop-No.15Flume基於事件的資料收集和處理

Flume是一種分散式的可靠開源系統,用於流資料的高效收集,聚集和移動.Flume通常用於移動日誌資料.但是也能移動大量事件資料.如社交媒體訂閱,訊息佇列事件或者網路流量資料. Flume架構

JVM高級特性-三、垃圾收集判斷對象存活算法

地方法 size none ava 裏的 結束 靜態屬性 概述 span 一、概述   運行時數據區中,程序計數器、虛擬機棧、本地方法棧都是隨線程而生隨線程而滅的   因此,他們的內存分配和回收是確定的,在方法或線程結束時就回收。而Java堆和方   法區則是不確定的

WordPress資料收集,以後整理

tiny log word res utm nbsp dao 指定 html WordPress主題開發:實現分頁功能 http://www.cnblogs.com/tinyphp/p/6361901.html WordPress如何調取顯示指定文章 https://w

信息收集DNS信息收集 -- dnsenum

域名信息收集 dnsenum 滲透思路dnsenum 由perl編寫的一款多線程的、可指定DNS服務器、支持域名爆破、支持不同網速情況下的工具調優、結果可導入到其他工具中使用的一款DNS信息收集工具。(網上大佬們都說可以用來查不連續的IP段,這是在說什麽呢?現在還沒有相關的認知,求解答)語法: dnse

Android_連接數據庫_資料收集

解析 註意 target 實用 學習筆記 microsoft family 服務 ims 1、http://blog.csdn.net/conowen/article/details/7435231/  (Android學習筆記(21)————利用JDBC連接服務器數據庫)

QT學習資料收集

++ baidu 實現 收集 hello tornado 學習之路 share 學習指南 幾個專欄 Qt學習之路(3):Hello, world!(續) - 豆子空間 - 51CTO技術博客 http://devbean.blog.51cto.com/448512/

軟件測試_資料收集

info 再看 測試 .com 層次 span 收集 經典 平臺 JMETER jmeter的擴展性實在是太強大了,涉及到各種數據庫,各種服務器,各種類型的接口,甚至是大數據平臺。想要吃透真的不是一兩年時間能做到的。 Selenium selenium的經典文檔不多,但是

資料收集】PCA降維

post hive ron str AD span clas htm logs 重點整理: PCA(Principal Components Analysis)即主成分分析,是圖像處理中經常用到的降維方法 1、原始數據: 假定數據是二維的 x=[2.5, 0.5, 2.2,

信息收集域名、IP互查

域名轉IP目的 Linux下通過shell終端查詢某域名的IP地址、通過IP地址查詢綁定的域名。並 整理返回結果,創建python工具。 環境 linux + 命令行 工具 1. ping 2. host 3. dig 4. nslookup 工具一:PING --- 簡單粗暴 使用ping命令發送一

信息收集DNS信息收集 -- fierce

fierce dns信息收集 目錄 工具描述 參數解釋 爆破子域名 自定義字典爆破子域名 反查指定範圍的IP段 反查指定域名附近的IP段 反查指定域名所在的C段IP 掃描優化:自定義超時時間、多線程 工具描述 Fierce是一款IP、域名互查的DNS工具,可進行域傳送漏洞檢測、字典爆破子域名、反查

網絡收音機資料收集

缺點 才有 流媒體 temp 返回 過程 group getchild 當前 最近打算利用業余時間,編寫一個Android的網絡收音機。因為我自己偶爾也喜歡聽聽廣播,所以打算用業余時間編寫一個網絡版收音機。說起收音機,其實在工作中已經編寫過一個,不過那個收音機是需要硬

幽門螺桿菌資料收集

出現 家庭 如果 十二指腸 發展 超過 經濟 沒有 世界 幽門螺桿菌資料收集 它會引起胃黏膜輕微的慢性發炎,甚或導致胃及十二指腸潰瘍與胃癌。 發現胃幽門螺旋桿菌在人群中的感染率與社會經濟、家庭衛生環境、教育水準或者個人衛生習慣有極大的關系。 超過 80% 的帶原者沒有表象

敏捷開發資料收集

一點 敏捷軟件開發 應對 nta 方向 div 計劃 是你 answer 1、Manifesto for Agile Software Development Individuals and interactions over processes and tools

信息收集端口掃描 -- Amap

... rpc 發現 使用 print tar protocol 一個 創建 man amap amap [Mode] [Options] <target> <port/portrange> [<port> ...] 掃描準確度優化 1.