DataX安裝部署-Reader外掛二次開發
DataX
DataX 是阿里巴巴集團內被廣泛使用的離線資料同步工具/平臺,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構資料來源之間高效的資料同步功能。
DataX詳細介紹
DataX安裝部署及小試 1.下載壓縮包: 下載頁面地址:https://github.com/alibaba/DataX 在頁面中【Quick Start】--->【Download DataX下載地址】進行下載。下載後的包名:datax.tar.gz。解壓後{datax}目錄下有{bin conf job lib log log_perf plugin script tmp}幾個目錄。
2.安裝 將下載後的壓縮包直接解壓後可用,前提是對應的java及python環境滿足要求。
System Requirements:
- Linux
- JDK(1.6以上,推薦1.6)
- Python(推薦Python2.6.X)一定要為python2,因為後面執行datax.py的時候,裡面的python的print會執行不了,導致執行不成功,會提示你print語法要加括號,python2中加不加都行 python3中必須要加,否則報語法錯,因為執行過程通過python指令碼執行,所以python3環境報錯無法執行。
- Apache Maven 3.x (Compile DataX)
3.測試 配置測試樣例:下面我們配置一組 從TXT文字到另一個TXT文字。
第一步、建立作業的配置檔案(json格式) 在bin目錄執行 python datax.py -r {your_reader} -w {your_writer}
{your_reader} 為datax\plugin\reader 目錄下的外掛名字,到包名就可以,{your_writer}相同;
如圖:txtfilereaderTest是我二次開發之後的包,在datax\plugin\reader目錄下;
執行python datax.py -r txtfilereaderTest -w txtFileWriter;會生成下面紅框json模板,只需要將其複製出來,在datax\job\目錄下新建一個newJob.josn 然後內容使用該模板。
自己配置的模板如下:
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "Test",
- "parameter": {
- "column": [
- {
- "index": 0,
- "type": "string"
- },
- {
- "index": 1,
- "type": "string"
- }
- ],
- "encoding": "utf-8",
- "fieldDelimiter": ",",
- "skipHeader": "True",
- "path": ["E:/python_project/BigData-Base/etl/test/ADDRESS02.txt"]
- }
- },
- "writer": {
- "name": "txtfilewriter",
- "parameter": {
- "dateFormat": "yyyy-MM-dd",
- "fieldDelimiter": ",",
- "fileName": "ADDRESS.txt",
- "path": "E:/datax/job/",
- "writeMode": "append"
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": "1"
- }
- }
- }
- }
模板具體內容怎麼配置可參考:https://github.com/alibaba/DataX ,官方每個包內都有個doc資料夾,裡面專門有配置引數說明,按照說明配置即可。
啟動:python datax.py {your_file} 如果是Windows則要寫絕對路徑。
python datax.py E:\datax\job\txtjsonTest.json
windows下亂碼修復: 我把這個工具遷移到一臺windows主機上使用時候看到控制檯友好的中文提示居然都變成了亂碼了(話說有中文提示也是我選擇他很重要的理由啊)。還好官方也給出瞭解決方案: 開啟CMD.exe命令列視窗 通過 chcp命令改變內碼表,UTF-8的內碼表為65001 chcp 65001 執行該操作後,內碼表就被變成UTF-8了。但是,在視窗中仍舊不能正確顯示UTF-8字元。 修改視窗屬性,改變字型,在命令列標題欄上點選右鍵,選擇"屬性"->"字型",將字型修改為True Type字型"Lucida Console",然後點選確定將屬性應用到當前視窗。 執行:
型別 | 資料來源 | Reader(讀) | Writer(寫) | 文件 |
---|---|---|---|---|
RDBMS 關係型資料庫 | MySQL | √ | √ | 讀 、寫 |
Oracle | √ | √ | 讀 、寫 | |
SQLServer | √ | √ | 讀 、寫 | |
PostgreSQL | √ | √ | 讀 、寫 | |
DRDS | √ | √ | 讀 、寫 | |
通用RDBMS(支援所有關係型資料庫) | √ | √ | 讀 、寫 | |
阿里雲數倉資料儲存 | ODPS | √ | √ | 讀 、寫 |
ADS | √ | 寫 | ||
OSS | √ | √ | 讀 、寫 | |
OCS | √ | √ | 讀 、寫 | |
NoSQL資料儲存 | OTS | √ | √ | 讀 、寫 |
Hbase0.94 | √ | √ | 讀 、寫 | |
Hbase1.1 | √ | √ | 讀 、寫 | |
Phoenix4.x | √ | √ | 讀 、寫 | |
MongoDB | √ | √ | 讀 、寫 | |
Hive | √ | √ | 讀 、寫 | |
無結構化資料儲存 | TxtFile | √ | √ | 讀 、寫 |
FTP | √ | √ | 讀 、寫 | |
HDFS | √ | √ | 讀 、寫 | |
Elasticsearch | √ | 寫 |
也可以參考其他部落格,測試例子基本相同,實際使用根據自己需求調整配置檔案即可,目前DataX已經迭代到了3.0版本,開源部分目前是單機版本,不過阿里內部已經可以叢集運行了,3.0中也有所體現,未來說不定叢集版本也會開源呢。
列舉一下官方文件:
DataX外掛二次開發
如果官方提供的外掛沒有自己需要用的怎麼辦,就需要自己開發需要的外掛了,可參考官方DataX外掛開發寶典。
我做了一個Reader外掛,讀取TXT文件的時候清除單引號的ETLdemo。
1. 建立一個maven工程。
模板:只需要改artifactId,name其他內容不變。
pom.xml內容,紅色標記為自己檔案
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.alibaba.datax</groupId> <artifactId>datax-all</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>Test</artifactId> <name>Test</name> <description>txtFilereaderTest,並可以根據使用者配置的型別進行型別轉換,建議開發、測試環境使用。</description> <packaging>jar</packaging> <dependencies> <dependency> <groupId>com.alibaba.datax</groupId> <artifactId>datax-common</artifactId> <version>${datax-project-version}</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.alibaba.datax</groupId> <artifactId>plugin-unstructured-storage-util</artifactId> <version>${datax-project-version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>16.0.1</version> </dependency> </dependencies> <build> <plugins> <!-- compiler plugin --> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> <encoding>${project-sourceEncoding}</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptors> <descriptor>src/main/assembly/package.xml</descriptor> </descriptors> <finalName>datax</finalName> </configuration> <executions> <execution> <id>dwzip</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
2. 建立如下檔案樹
package.xml 檔案為最後打包檔案
它會讀取模板配置:plugin.json;plugin_job_template.json
需要自行修改的已標紅,其他目錄級別不可變。
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> <id></id> <formats> <format>dir</format> </formats> <includeBaseDirectory>false</includeBaseDirectory> <fileSets> <fileSet> <directory>src/main/resources</directory> <includes> <include>plugin.json</include> <include>plugin_job_template.json</include> </includes> <outputDirectory>plugin/reader/txtFilereaderTest</outputDirectory> </fileSet> <fileSet> <directory>target/</directory> <includes> <include>txtFilereaderTest-0.0.1-SNAPSHOT.jar</include> </includes> <outputDirectory>plugin/reader/txtFilereaderTest</outputDirectory> </fileSet> </fileSets> <dependencySets> <dependencySet> <useProjectArtifact>false</useProjectArtifact> <outputDirectory>plugin/reader/txtFilereaderTest/libs</outputDirectory> <scope>runtime</scope> </dependencySet> </dependencySets> </assembly>
plugin.json檔案:
class:你自己的class檔案目錄,我的是reader外掛,所以在reader目錄下的txtFilereaderTest包裡的Test類。
{ "name": "Test", "class": "com.alibaba.datax.plugin.reader.txtFilereaderTest.Test", "description": "useScene: test. mechanism: use datax framework to transport data from txt file. warn: The more you know about the data, the less problems you encounter.", "developer": "alibaba" }
plugin_job_template.json檔案:
該檔案用於開始生成模板的檔案,reader外掛與writer外掛各有不同,可參考官方不同包下的doc資料夾裡的說明。
{
"name": "Test",
"parameter": {
"path": [],
"encoding": "",
"column": [],
"fieldDelimiter": ""
}
}
剩下的如何編寫邏輯就要自己實現了,可以先參考官方說明:DataX外掛開發寶典
我這裡簡單實現一下去除文字中單引號,部分程式碼如下:
重寫startRead方法,並且在檔案輸入流中對讀入的文字進行字串替換處理,將“ ‘北京‘ ”,處理為:“北京”;
自己要實現的所有邏輯都在此編寫。
@Override public void startRead(RecordSender recordSender) { LOG.debug("start read source files..."); for (String fileName : this.sourceFiles) { LOG.info(String.format("reading file : [%s]", fileName)); InputStream inputStream; InputStream inputStreamEtl; try { inputStream = new FileInputStream(fileName); try { inputStreamEtl = string_InputStream(inputStream_String(inputStream)); com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.readFromStream(inputStreamEtl, fileName, this.readerSliceConfig, recordSender, this.getTaskPluginCollector()); recordSender.flush(); } catch (Exception e) { e.printStackTrace(); } } catch (FileNotFoundException e) { // warn: sock 檔案無法read,能影響所有檔案的傳輸,需要使用者自己保證 String message = String .format("找不到待讀取的檔案 : [%s]", fileName); LOG.error(message); throw DataXException.asDataXException( TxtFileReaderErrorCode.OPEN_FILE_ERROR, message); } } LOG.debug("end read source files..."); } } /** * inputStream to String * @param in * @return "去除字串中引號" * @throws Exception */ public static String inputStream_String(InputStream in) throws Exception { java.io.ByteArrayOutputStream swapStream = new java.io.ByteArrayOutputStream(); int ch; while ((ch = in.read()) != -1) { swapStream.write(ch); } /** * ETL: 去除字元之間的單引號 * '北京', '大興', * 北京, 大興, */ return swapStream.toString().replace("'",""); } /** * String to inputStream * @param str * @return * @throws Exception */ public static ByteArrayInputStream string_InputStream(String str) throws Exception{ ByteArrayInputStream stream= new ByteArrayInputStream(str.getBytes()); return stream; }
3. 打包部署
編寫好的檔案按照上圖檔案樹放置,然後新增自己外掛到Git clone下的原始碼最外層package.xml和pom.xml 檔案。
package.xml
directory:為你的外掛包名,不是類名
<fileSet>
<directory>txtFilereaderTest/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
pom.xml
同上,新增自己包名,新增至reader樹內,我使用idea開發,所以maven工程可直接打包。
注意:該pom檔案用maven打包會將所有外掛都重新打一次包,如果只想打自己的包,將其他reader和writer包都註釋掉,不然需要好久
<module>txtFileReaderTest</module>
4. 打包成功
應該包含以下目錄及檔案:
libs為所有依賴,另外兩個json檔案上面提過,一個載入class檔案,一個生成配置;
執行:python datax.py -r txtfilereader -w txtFileWriter;
#### 據說功能很強大,還在摸索中