資料收集之DataX
DataX
DataX是阿里開源的離線資料同步工具,可以實現包括 MySQL、Oracle、MongoDB、Hive、HDFS、HBase、Elasticsearch等各種異構資料來源之間的高效同步。
DataX原理
設計理念
為了解決異構資料來源同步問題,DataX將複雜的網狀同步鏈路變成星型鏈路,DataX作為中間傳輸載體負責連線各種資料來源。當需要接入一個新的資料來源的時候,只需將此資料來源對接到DataX,便能跟已有資料來源做到無縫資料同步。
框架設計
採用Framework + plugin架構構建。將資料來源讀取和寫入抽象成為Reader/Writer外掛,納入到整個同步框架中。
- Reader:資料採集模組,負責採集資料來源的資料,將資料傳送給Framework。
- Writer: 資料寫入模組,負責不斷從Framework取資料,並將資料寫入到目的端。
- Framework:Framework用於連線Reader和Writer,作為兩者的資料傳輸通道,並處理緩衝,流控,併發,資料轉換等核心技術問題。
核心架構
- 核心模組介紹
- DataX完成單個數據同步的作業,我們稱之為Job,DataX接受到一個Job之後,將啟動一個程序來完成整個作業同步過程。DataX Job模組是單個作業的中樞管理節點,承擔了資料清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。
- DataXJob啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於併發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分資料的同步工作。
- 切分多個Task之後,DataX Job會呼叫Scheduler模組,根據配置的併發資料量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的併發執行完畢分配好的所有Task,預設單個任務組的併發數量為5。
- 每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader—>Channel—>Writer的執行緒來完成任務同步工作。
- DataX作業執行起來之後, Job監控並等待多個TaskGroup模組任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出,程序退出值非0。
- 排程流程
舉例來說,使用者提交了一個DataX作業,並且配置了20個併發,目的是將一個100張分表的mysql資料同步到odps裡面。 DataX的排程決策思路是:
- DataXJob根據分庫分表切分成了100個Task。
- 根據20個併發,DataX計算共需要分配4個TaskGroup。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責以5個併發共計執行25個Task。
DataX 部署
工具部署
```shell
[[email protected] /data/software]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[[email protected] /data/software]# tar -zxvf datax.tar.gz
[[email protected] /data/software/datax]# python bin/datax.py job/job.json #自檢
```
目錄結構
[[email protected] /data/software/datax]# tree -L 3 -I '*jar*'
.
├── bin 啟動指令碼
│ ├── datax.py
│ ├── dxprof.py
│ └── perftrace.py
├── conf 核心配置
│ ├── core.json
│ └── logback.xml
├── job job目錄
│ └── job.json
├── lib 核心類庫
├── log
├── log_perf
├── plugin 插卡目錄
│ ├── reader
│ │ ├── drdsreader
│ │ ├── ftpreader
│ │ ├── hbase094xreader
│ │ ├── hbase11xreader
│ │ ├── hdfsreader
│ │ ├── mongodbreader
│ │ ├── mysqlreader
│ │ ├── odpsreader
│ │ ├── oraclereader
│ │ ├── ossreader
│ │ ├── otsreader
│ │ ├── otsstreamreader
│ │ ├── postgresqlreader
│ │ ├── rdbmsreader
│ │ ├── sqlserverreader
│ │ ├── streamreader
│ │ └── txtfilereader
│ └── writer
│ ├── adswriter
│ ├── drdswriter
│ ├── ftpwriter
│ ├── hbase094xwriter
│ ├── hbase11xsqlwriter
│ ├── hbase11xwriter
│ ├── hdfswriter
│ ├── mongodbwriter
│ ├── mysqlwriter
│ ├── ocswriter
│ ├── odpswriter
│ ├── oraclewriter
│ ├── osswriter
│ ├── otswriter
│ ├── postgresqlwriter
│ ├── rdbmswriter
│ ├── sqlserverwriter
│ ├── streamwriter
│ └── txtfilewriter
├── script
│ └── Readme.md
└── tmp
└── readme.txt
DataX全量同步 Mysql-HDFS
DataX配置
{
"job":{
"setting":{
"errorLimit":{
"record":1,
"percentage":0.2
},
"speed": {
"channel":1
}
},
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"username":"root",
"password":"111",
"column":["id","log_type","event_time","uid"],
"where":"event_time>='2018-08-10 01:01:01' and event_time<='2018-08-10 01:10:01'",
"connection":[
{
"table":[
"log_0",
"log_1",
"log_2"
],
"jdbcUrl":[
"jdbc:mysql://localhost:3306/test"
]
}
]
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"defaultFS":"hdfs://node1:8020",
"fileType":"text",
"path":"/test/access_log",
"fileName":"log_",
"fieldDelimiter":"\t",
"writeMode": "append",
"column":[
{
"name": "id",
"type": "bigint"
},
{
"name": "log_type",
"type": "string"
},
{
"name": "event_time",
"type": "string"
},
{
"name": "uid",
"type": "string"
}
]
}
}
}
]
}
}
DataX執行結果
[[email protected] /data/software/datax]# python bin/datax.py -j"-Xms125m -Xmx125m" job/mysql_hdfs.json
任務啟動時刻 : 2018-08-10 23:51:58
任務結束時刻 : 2018-08-10 23:52:24
任務總計耗時 : 25s
任務平均流量 : 5B/s
記錄寫入速度 : 0rec/s
讀出記錄總數 : 3
讀寫失敗總數 : 0
[[email protected] /data/software]# hdfs dfs -cat /test/access_log/*
1 appError 2018-08-10 01:01:01 2
2 appError 2018-08-10 01:10:01 2
3 appError 2018-08-10 01:01:01 2
DataX增量同步 Mysql-HDFS
思路
增量模板+shell+crontab。
1.定義datax任務執行模板檔案 如"where":"event_time>='${start_time}' and event_time<='${end_time}'"。
2.每次定時任務啟動時根據上次偏移量替換模板檔案中增量變數。
3.執行完後記錄此次偏移量。
注意
1.時間增量,需記錄每次時間偏移量。
2.自增id增量,需記錄每次自增id偏移量。
3.不論是時間增量還是自增id增量,記得給資料同步留下足夠的時間。如從庫同步延遲。
4.實際中,要結合資料庫索引情況合理設定增量條件,提高每次查詢速度。
5.若沒有任何增量規律,datax只能做全量同步。
相關推薦
資料收集之DataX
DataX DataX是阿里開源的離線資料同步工具,可以實現包括 MySQL、Oracle、MongoDB、Hive、HDFS、HBase、Elasticsearch等各種異構資料來源之間的高效同步。 DataX原理 設計理念 為了解決異
資料收集之Filebeat
Filebeat採用Go語言開發,也可用於日誌收集,相較於的Logstash,更輕量,資源佔用更少。一般部署在日誌收集的最前端。 本文基於Filebeat 6.3.2總結。 設計要點 主要元件 Filebeat主要由兩大元件組成:
資料收集之Flume
Flume最初由Cloudera開發,於2011年6月貢獻給Apache,於2012成為頂級專案。在孵化這一年,基於老版本的Flume(Flume OG:Flume Original Generation 即Flume 0.9.x版本)進行重構,摒棄了Zooke
資料收集之Logstash
Logstash 之前用的Logstash快忘了,好記性不如爛筆頭,好好總結一下。 Logstash由Java(Core)+Ruby(Plugin)語言編寫,是一個開源的日誌收集、處理、轉發工具。 Input產生事件,Filter修改事件,Output將事件傳
資料收集之binlog同步----Maxwell
簡介 Maxwell是由Java語言編寫,Zendesk開源的binlog解析同步工具。可通過簡單配置,將binlog解析並以json的格式同步到如file,kafka,redis,RabbitMQ等
Asp.netCore之安裝centos7 資料收集
虛擬機器的安裝和centos的安裝看博友的文章:https://www.cnblogs.com/zhaopei/p/netcore.html https://www.centos.org/ centos安裝netcore 步驟 https://dotnet.microsoft.com/l
Hadoop-No.15之Flume基於事件的資料收集和處理
Flume是一種分散式的可靠開源系統,用於流資料的高效收集,聚集和移動.Flume通常用於移動日誌資料.但是也能移動大量事件資料.如社交媒體訂閱,訊息佇列事件或者網路流量資料. Flume架構
JVM高級特性-三、垃圾收集之判斷對象存活算法
地方法 size none ava 裏的 結束 靜態屬性 概述 span 一、概述 運行時數據區中,程序計數器、虛擬機棧、本地方法棧都是隨線程而生隨線程而滅的 因此,他們的內存分配和回收是確定的,在方法或線程結束時就回收。而Java堆和方 法區則是不確定的
WordPress資料收集,以後整理
tiny log word res utm nbsp dao 指定 html WordPress主題開發:實現分頁功能 http://www.cnblogs.com/tinyphp/p/6361901.html WordPress如何調取顯示指定文章 https://w
信息收集之DNS信息收集 -- dnsenum
域名信息收集 dnsenum 滲透思路dnsenum 由perl編寫的一款多線程的、可指定DNS服務器、支持域名爆破、支持不同網速情況下的工具調優、結果可導入到其他工具中使用的一款DNS信息收集工具。(網上大佬們都說可以用來查不連續的IP段,這是在說什麽呢?現在還沒有相關的認知,求解答)語法: dnse
Android_連接數據庫_資料收集
解析 註意 target 實用 學習筆記 microsoft family 服務 ims 1、http://blog.csdn.net/conowen/article/details/7435231/ (Android學習筆記(21)————利用JDBC連接服務器數據庫)
QT學習資料收集
++ baidu 實現 收集 hello tornado 學習之路 share 學習指南 幾個專欄 Qt學習之路(3):Hello, world!(續) - 豆子空間 - 51CTO技術博客 http://devbean.blog.51cto.com/448512/
軟件測試_資料收集篇
info 再看 測試 .com 層次 span 收集 經典 平臺 JMETER jmeter的擴展性實在是太強大了,涉及到各種數據庫,各種服務器,各種類型的接口,甚至是大數據平臺。想要吃透真的不是一兩年時間能做到的。 Selenium selenium的經典文檔不多,但是
【資料收集】PCA降維
post hive ron str AD span clas htm logs 重點整理: PCA(Principal Components Analysis)即主成分分析,是圖像處理中經常用到的降維方法 1、原始數據: 假定數據是二維的 x=[2.5, 0.5, 2.2,
信息收集之域名、IP互查
域名轉IP目的 Linux下通過shell終端查詢某域名的IP地址、通過IP地址查詢綁定的域名。並 整理返回結果,創建python工具。 環境 linux + 命令行 工具 1. ping 2. host 3. dig 4. nslookup 工具一:PING --- 簡單粗暴 使用ping命令發送一
信息收集之DNS信息收集 -- fierce
fierce dns信息收集 目錄 工具描述 參數解釋 爆破子域名 自定義字典爆破子域名 反查指定範圍的IP段 反查指定域名附近的IP段 反查指定域名所在的C段IP 掃描優化:自定義超時時間、多線程 工具描述 Fierce是一款IP、域名互查的DNS工具,可進行域傳送漏洞檢測、字典爆破子域名、反查
網絡收音機資料收集
缺點 才有 流媒體 temp 返回 過程 group getchild 當前 最近打算利用業余時間,編寫一個Android的網絡收音機。因為我自己偶爾也喜歡聽聽廣播,所以打算用業余時間編寫一個網絡版收音機。說起收音機,其實在工作中已經編寫過一個,不過那個收音機是需要硬
幽門螺桿菌資料收集
出現 家庭 如果 十二指腸 發展 超過 經濟 沒有 世界 幽門螺桿菌資料收集 它會引起胃黏膜輕微的慢性發炎,甚或導致胃及十二指腸潰瘍與胃癌。 發現胃幽門螺旋桿菌在人群中的感染率與社會經濟、家庭衛生環境、教育水準或者個人衛生習慣有極大的關系。 超過 80% 的帶原者沒有表象
敏捷開發資料收集
一點 敏捷軟件開發 應對 nta 方向 div 計劃 是你 answer 1、Manifesto for Agile Software Development Individuals and interactions over processes and tools
信息收集之端口掃描 -- Amap
... rpc 發現 使用 print tar protocol 一個 創建 man amap amap [Mode] [Options] <target> <port/portrange> [<port> ...] 掃描準確度優化 1.