
DataX Secondary Development: Adapting a Custom Data Source

Background

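DataX already has a fairly comprehensive plugin system: mainstream RDBMS databases, NoSQL stores, and big-data computing systems are all supported. This post shows how to extend DataX with a custom generic RDBMS data source and register it as a plugin.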

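  • Requirement: add reader and writer plugins for the SAP HANA data source (any other data source works the same way) so that it can take part in synchronization between heterogeneous data sources.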

Environment setup


Pull the latest DataX source code from GitHub


git clone https://github.com/alibaba/DataX.git 

After importing the project into IDEA, create two new modules: hanareader and hanawriter

Following the plugin development guide provided by DataX, create the directory structure below.
See: DataX Plugin Development Guide


├─hanareader
│  │  pom.xml
│  │
│  ├─doc
│  │      hanareader.md
│  │
│  └─src
│      └─main
│          ├─assembly
│          │      package.xml
│          │
│          ├─java
│          │  └─com
│          │      └─alibaba
│          │          └─datax
│          │              └─plugin
│          │                  └─reader
│          │                      └─hanareader
│          │                              Constant.java
│          │                              HanaReader.java
│          │                              HanaReaderErrorCode.java
│          │
│          ├─libs
│          │      ngdbc-2.8.12.jar
│          │
│          └─resources
│                  plugin.json
│                  plugin_job_template.json
│
├─hanawriter
│  │  pom.xml
│  │
│  ├─doc
│  │      hanawriter.md
│  │
│  └─src
│      └─main
│          ├─assembly
│          │      package.xml
│          │
│          ├─java
│          │  └─com
│          │      └─alibaba
│          │          └─datax
│          │              └─plugin
│          │                  └─writer
│          │                      └─hanawriter
│          │                              HanaWriter.java
│          │
│          ├─libs
│          │      ngdbc-2.8.12.jar
│          │
│          └─resources
│                  plugin.json
│                  plugin_job_template.json
│ 

Configure the corresponding plugin.json for each plugin

{
    "name": "hanawriter",
    "class": "com.alibaba.datax.plugin.writer.hanawriter.HanaWriter",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute insert sql. warn: The more you know about the database, the less problems you encounter.",
    "developer": "xmzpc"
}


{
  "name": "hanareader",
  "class": "com.alibaba.datax.plugin.reader.hanareader.HanaReader",
  "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
  "developer": "xmzpc"
}




Place the HANA JDBC jar in both hanareader/src/main/libs and hanawriter/src/main/libs

ngdbc-2.8.12.jar
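
Before wiring the plugin into a job, it can help to confirm that the driver class from that jar is actually visible at runtime. The helper below is only a sketch: DriverCheck is a hypothetical class name, and com.sap.db.jdbc.Driver is the driver class shipped in the ngdbc jar as far as I know, so verify it against the jar you use.

```java
// Hypothetical helper, not part of DataX: checks whether a JDBC driver class
// can be found by the class loader that loaded this helper.
final class DriverCheck {

    private DriverCheck() {
    }

    // Returns true when the named class can be located; the class is not initialized.
    static boolean isDriverOnClasspath(String driverClassName) {
        try {
            Class.forName(driverClassName, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

For example, DriverCheck.isDriverOnClasspath("com.sap.db.jdbc.Driver") should return true only when the ngdbc jar is on the classpath.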


Writing the reader and writer plugins


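According to DataX's plugin loading mechanism, the name configured in plugin.json is the plugin's name. It must be globally unique because it is stored as the key in jarLoaderCenter, whose value is the plugin's ClassLoader. The class field is the fully qualified name of the plugin's implementation class and is used for class loading.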
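
To make the role of name and class concrete, here is a simplified sketch of such a registry. It is not DataX's actual loader code: PluginLoaderRegistry and its methods are hypothetical, and the real key format and loader type may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for DataX's loader cache; all names are illustrative.
final class PluginLoaderRegistry {

    // key: plugin name from plugin.json; value: the ClassLoader dedicated to that plugin
    private final Map<String, ClassLoader> jarLoaderCenter = new ConcurrentHashMap<>();

    // Registers a loader under a plugin name. A duplicate name is rejected,
    // which is why the name has to be globally unique.
    void register(String pluginName, ClassLoader loader) {
        ClassLoader previous = jarLoaderCenter.putIfAbsent(pluginName, loader);
        if (previous != null) {
            throw new IllegalStateException("duplicate plugin name: " + pluginName);
        }
    }

    // Loads the class named by the "class" field through the plugin's own loader.
    Class<?> loadPluginClass(String pluginName, String className) {
        ClassLoader loader = jarLoaderCenter.get(pluginName);
        if (loader == null) {
            throw new IllegalArgumentException("unknown plugin: " + pluginName);
        }
        try {
            return Class.forName(className, true, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("cannot load " + className, e);
        }
    }
}
```

Keeping one ClassLoader per plugin is what lets two plugins bundle different versions of the same JDBC driver without clashing.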

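So create the class com.alibaba.datax.plugin.reader.hanareader.HanaReader and have it extend com.alibaba.datax.common.spi.Reader. Unless you have special requirements, write it the same way as the generic RDBMS reader plugins.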
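
The directory layout above lists Constant.java and HanaReaderErrorCode.java, but the post does not show them. A minimal sketch of what they might contain follows. The default fetch size of 1024, the code string, and the description are assumptions for illustration, and the ErrorCode interface here is only a local stand-in for com.alibaba.datax.common.spi.ErrorCode so that the sketch compiles by itself.

```java
// Local stand-in for com.alibaba.datax.common.spi.ErrorCode (assumption: the
// real module would implement the DataX interface instead).
interface ErrorCode {
    String getCode();

    String getDescription();
}

// Sketch of Constant.java: the default used when the job does not set fetchSize.
final class Constant {
    static final int DEFAULT_FETCH_SIZE = 1024; // assumed value, not from the post

    private Constant() {
    }
}

// Sketch of HanaReaderErrorCode.java with the single code that HanaReader references.
enum HanaReaderErrorCode implements ErrorCode {
    HINT_ERROR("HanaReader-00", "The hint configuration is invalid."); // illustrative values

    private final String code;
    private final String description;

    HanaReaderErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return code;
    }

    @Override
    public String getDescription() {
        return description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s].", code, description);
    }
}
```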


HanaReader.java


package com.alibaba.datax.plugin.reader.hanareader;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.reader.util.HintUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class HanaReader extends Reader {

    private static final DataBaseType DATABASE_TYPE = DataBaseType.HANA;

    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration originalConfig = null;
        private CommonRdbmsReader.Job commonRdbmsReaderJob;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();

            Integer fetchSize = this.originalConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, Constant.DEFAULT_FETCH_SIZE);

            if (fetchSize < 1) {
                throw DataXException
                        .asDataXException(
                                DBUtilErrorCode.REQUIRED_VALUE,
                                String.format(
                                        "The configured fetchSize is invalid. By DataX's design, fetchSize : [%d] must not be less than 1.",
                                        fetchSize));
            }

            LOG.info("Custom parameter [ fetchSize ] = {}", fetchSize);

            this.originalConfig.set(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE, fetchSize);
            this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
            this.commonRdbmsReaderJob.init(this.originalConfig);

            // Note: this must run after this.commonRdbmsReaderJob.init(this.originalConfig), so that querySql mode can be detected directly
            dealHint(this.originalConfig);
        }

        @Override
        public void preCheck() {
            init();
        }

        @Override
        public void destroy() {

        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
        }


        private void dealHint(Configuration originalConfig) {
            String hint = originalConfig.getString(Key.HINT);
            if (StringUtils.isNotBlank(hint)) {
                boolean isTableMode = originalConfig.getBool(com.alibaba.datax.plugin.rdbms.reader.Constant.IS_TABLE_MODE);
                if(!isTableMode){
                    throw DataXException.asDataXException(HanaReaderErrorCode.HINT_ERROR, "HINT can be configured only when reading HANA in non-querySql (table) mode.");
                }
                HintUtil.initHintConf(DATABASE_TYPE, originalConfig);
            }
        }
    }

    public static class Task extends Reader.Task {
        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderTask;

        @Override
        public void init() {
            this.readerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId());
            this.commonRdbmsReaderTask.init(this.readerSliceConfig);

        }

        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(com.alibaba.datax.plugin.rdbms.reader.Constant.FETCH_SIZE);

            this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
                    super.getTaskPluginCollector(), fetchSize);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderTask.post(this.readerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
        }
    }
}
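
Note that HanaReader references DataBaseType.HANA. As far as I can tell, upstream DataX did not ship a HANA entry in com.alibaba.datax.plugin.rdbms.util.DataBaseType at the time, so one has to be added in the plugin-rdbms-util module. The standalone enum below only sketches the idea: DataBaseTypeSketch is a hypothetical name, and com.sap.db.jdbc.Driver is assumed to be the driver class in the ngdbc jar. Methods in the real enum that switch over the database type may need a HANA branch as well.

```java
// Simplified, hypothetical stand-in for DataX's DataBaseType enum.
enum DataBaseTypeSketch {
    MySql("mysql", "com.mysql.jdbc.Driver"),
    Oracle("oracle", "oracle.jdbc.OracleDriver"),
    HANA("hana", "com.sap.db.jdbc.Driver"); // assumed new entry for this plugin

    private final String typeName;
    private final String driverClassName;

    DataBaseTypeSketch(String typeName, String driverClassName) {
        this.typeName = typeName;
        this.driverClassName = driverClassName;
    }

    // Short name of the database type.
    String getTypeName() {
        return typeName;
    }

    // JDBC driver class that would be loaded before opening a connection.
    String getDriverClassName() {
        return driverClassName;
    }
}
```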

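The writer plugin is written the same way as the reader plugin. Note, however, that DataX currently implements the writeMode option only for MySQL; a later post will rework this.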

HanaWriter.java


package com.alibaba.datax.plugin.writer.hanawriter;

import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Key;

import java.util.List;

public class HanaWriter extends Writer {
    private static final DataBaseType DATABASE_TYPE = DataBaseType.HANA;

    public static class Job extends Writer.Job {
        private Configuration originalConfig = null;
        private CommonRdbmsWriter.Job commonRdbmsWriterJob;

        @Override
        public void preCheck() {
            this.init();
            this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);
        }

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();

            // warn: unlike MySQL, HANA supports only insert mode here, so writeMode must not be configured
            String writeMode = this.originalConfig.getString(Key.WRITE_MODE);
            if (null != writeMode) {
                throw DataXException
                        .asDataXException(
                                DBUtilErrorCode.CONF_ERROR,
                                String.format(
                                        "Invalid write mode (writeMode) configuration: Hana does not support the writeMode option: %s. Hana can only insert data with insert sql. Please check and correct your configuration.",
                                        writeMode));
            }
            this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
            this.commonRdbmsWriterJob.init(this.originalConfig);
        }

        // Generally, pre should be deferred to the task (the single-table case is an exception)
        @Override
        public void prepare() {
            this.commonRdbmsWriterJob.prepare(this.originalConfig);
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);
        }

        // Generally, post should be deferred to the task (the single-table case is an exception)
        @Override
        public void post() {
            this.commonRdbmsWriterJob.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterJob.destroy(this.originalConfig);
        }

    }

    public static class Task extends Writer.Task {
        private Configuration writerSliceConfig;
        private CommonRdbmsWriter.Task commonRdbmsWriterTask;

        @Override
        public void init() {
            this.writerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);
            this.commonRdbmsWriterTask.init(this.writerSliceConfig);
        }

        @Override
        public void prepare() {
            this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);
        }


        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
                    super.getTaskPluginCollector());
        }

        @Override
        public void post() {
            this.commonRdbmsWriterTask.post(this.writerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);
        }



        // Job.init() rejects any configured writeMode, so this currently always returns false for HANA.
        @Override
        public boolean supportFailOver() {
            String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);
            return "replace".equalsIgnoreCase(writeMode);
        }

    }

}
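
Since the writer only supports plain inserts, the SQL it ends up executing has the shape of a parameterized INSERT. The sketch below shows how such a template can be built from a table name and column list. InsertTemplate is a hypothetical helper for illustration and not the code DataX uses internally.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper: builds a parameterized INSERT statement for a JDBC PreparedStatement.
final class InsertTemplate {

    private InsertTemplate() {
    }

    // Produces "INSERT INTO <table> (<c1>,<c2>,...) VALUES (?,?,...)" with one placeholder per column.
    static String build(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(",", columns);
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";
    }
}
```

Supporting another writeMode would mean producing a different statement here, which is the part the post leaves for later.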

Configuration file changes


Reader plugin


  • hanareader/src/main/assembly/package.xml

This file mainly configures where files are placed when packaging; just adapt it from another plugin's package.xml.


<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/hanareader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>hanareader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/hanareader</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>src/main/libs</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>plugin/reader/hanareader/libs</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/hanareader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>


  • hanareader/pom.xml

The pom file mainly adds support for the assembly plugin


 <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- assembly plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Writer plugin

The writer plugin is configured like the reader plugin; only the paths change, so the details are not repeated here.


package.xml in the root directory

Here, configure the output paths for the packaged hanareader and hanawriter files.


        <fileSet>
            <directory>hanawriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>hanareader/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>

Packaging

Before packaging, fix a few pitfalls first:

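  • The oscarwriter module does not ship oscarJDBC.jar, so the build fails; remove the oscarwriter dependency from the root pom file and package.xml.
  • Also remove the dependencies on some Alibaba-internal data source modules.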

Package with Maven:

   $ cd  {DataX_source_code_home}
   $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true

When packaging succeeds, the log looks like this:


    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  09:45 min
    [INFO] Finished at: 2021-10-20T15:37:01+08:00
    [INFO] ------------------------------------------------------------------------

After a successful build, the output directory looks like this:

    $ cd  {DataX_source_code_home}
    $ ls ./target/datax/datax/
    bin		conf		job		lib		log		log_perf	plugin		script		tmp


In addition, plugin/reader contains a hanareader directory and plugin/writer contains a hanawriter directory, which means the custom plugins are complete.
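
This final check can also be scripted. The sketch below tests whether a plugin's plugin.json ended up under the packaged DataX home. PluginLayoutCheck is a hypothetical helper, and the plugin/<kind>/<name>/plugin.json layout is taken from the package.xml output directories shown earlier.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: verifies that a plugin descriptor exists in the packaged output.
final class PluginLayoutCheck {

    private PluginLayoutCheck() {
    }

    // Returns true when <dataxHome>/plugin/<kind>/<name>/plugin.json is a regular file.
    // kind is "reader" or "writer"; name is the plugin name, e.g. "hanareader".
    static boolean isInstalled(Path dataxHome, String kind, String name) {
        Path descriptor = dataxHome.resolve("plugin")
                .resolve(kind)
                .resolve(name)
                .resolve("plugin.json");
        return Files.isRegularFile(descriptor);
    }
}
```

For the build above, the calls would be isInstalled(Paths.get("target/datax/datax"), "reader", "hanareader") and the same with "writer" and "hanawriter".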

Source code

GitHub source: DataX secondary development