程式人生 > DataX學習筆記-Writer外掛開發

DataX學習筆記-Writer外掛開發

本文介紹如何開發一個將資料寫入ElasticSearch的DataX Writer外掛。

1、檢出DataX原始碼(svn checkout http://code.taobao.org/svn/datax/trunk)

2、在com.taobao.datax.plugins.writer包下建立eswriter包,並在其中新建ESWriter.java和ParamKey.java兩個檔案:

package com.taobao.datax.plugins.writer.eswriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

import org.apache.log4j.Logger;

import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.taobao.datax.plugins.writer.eswriter.ParamKey;

/**
 * DataX writer that indexes each incoming line into ElasticSearch by running a
 * configurable curl command through {@code /bin/sh -c}.
 *
 * <p>Field 0 of every line is the document id, substituted for {@code {id}} in the
 * command template. Fields 1..n become string properties of a JSON document named
 * after the configured column names (in order), substituted for {@code {data}}.
 *
 * <p>Substituted values are escaped so record contents cannot break out of the command.
 * The id is percent-encoded. Inside the JSON, quotes, backslashes and control
 * characters are escaped, and apostrophes are written as a JSON unicode escape.
 * NOTE(review): this assumes the template wraps the URL and {@code {data}} in single
 * quotes, as the default template does; confirm for custom templates.
 */
public class ESWriter extends Writer {

    // Shell command template; every "{id}" and "{data}" is substituted per line.
    // It is treated as read-only after init() so that each line starts from the same template.
    private String singleCurl = "";

    // Text written in place of a null field value.
    private String nullString = "";

    // Unsplit column-name list as read from the job configuration.
    private String columnNameString = "";

    // Separator for columnNameString; String.split treats it as a regular expression.
    private String columnNameSplit = "";

    // JSON property names for fields 1..n of each line (field 0 is the document id).
    private String[] columnNames = null;

    private Logger logger = Logger.getLogger(ESWriter.class.getCanonicalName());

    /**
     * Reads the plugin parameters, falling back to the built-in defaults.
     *
     * @return {@code PluginStatus.SUCCESS.value()}
     */
    @Override
    public int init() {
        this.singleCurl = param.getValue(ParamKey.singleCurl,
                "curl -XPOST 'http://192.168.0.108:9200/user/student/{id}' -d '{data}'");
        this.nullString = param.getValue(ParamKey.nullChar, this.nullString);
        this.columnNameString = param.getValue(ParamKey.columnNames, "userid,name,phone");
        this.columnNameSplit = param.getValue(ParamKey.columnNameSplit, ",");
        this.columnNames = columnNameString.split(columnNameSplit);
        return PluginStatus.SUCCESS.value();
    }

    /**
     * No connection is held; every line spawns its own curl process.
     *
     * @return {@code PluginStatus.SUCCESS.value()}
     */
    @Override
    public int connect() {
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Builds the shell command that indexes one line.
     *
     * @param line the record to index
     * @return the command, or {@code null} when the line is null or has no fields
     * @throws IllegalArgumentException if the id field is null, or the line has more
     *         data fields than configured column names
     */
    private String makeCurl(Line line) {
        if (line == null || line.getFieldNum() == 0) {
            return null;
        }
        int fieldCount = line.getFieldNum();
        if (fieldCount - 1 > columnNames.length) {
            throw new IllegalArgumentException("line has " + (fieldCount - 1)
                    + " data fields but only " + columnNames.length
                    + " column names are configured in " + ParamKey.columnNames);
        }
        String id = line.getField(0);
        if (id == null) {
            throw new IllegalArgumentException("document id (field 0) is null");
        }

        StringBuilder json = new StringBuilder("{");
        for (int i = 1; i < fieldCount; i++) {
            if (i > 1) {
                json.append(',');
            }
            String value = line.getField(i);
            json.append('"').append(escapeJson(columnNames[i - 1])).append("\":\"")
                    .append(escapeJson(value == null ? nullString : value)).append('"');
        }
        json.append('}');

        // Substitute into a copy. The template must keep its placeholders for the next line.
        return singleCurl.replace("{id}", encodeId(id)).replace("{data}", json.toString());
    }

    /**
     * Escapes a value for use inside a JSON string literal.
     * Apostrophes are also escaped so the value cannot terminate a single-quoted shell
     * argument.
     */
    private static String escapeJson(String value) {
        StringBuilder out = new StringBuilder(value.length() + 16);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                case '\'':
                    out.append("\\u0027");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /** Percent-encodes a document id so it is safe both as a URL path segment and inside shell quotes. */
    private static String encodeId(String id) {
        try {
            // URLEncoder produces form encoding; a path segment needs %20 rather than '+'.
            return URLEncoder.encode(id, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            // Every JVM is required to support UTF-8.
            throw new IllegalStateException(e);
        }
    }

    /**
     * Runs one curl command to completion, logging a non-zero exit status together
     * with the command's combined stdout/stderr.
     * Waiting for each process keeps the number of child processes and open pipes bounded.
     */
    private void runCommand(String command) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder("/bin/sh", "-c", command);
        builder.redirectErrorStream(true);
        Process process = builder.start();
        try {
            // The default template passes the payload inline with -d, so stdin is not needed.
            process.getOutputStream().close();
            String output = readFully(process.getInputStream());
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                logger.error("curl command failed with exit code " + exitCode + ": " + output);
            } else if (logger.isDebugEnabled()) {
                logger.debug(output);
            }
        } finally {
            process.destroy();
        }
    }

    /** Reads a stream to EOF, closes it, and returns the content decoded as UTF-8. */
    private static String readFully(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        try {
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } finally {
            in.close();
        }
        return out.toString("UTF-8");
    }

    /**
     * Indexes every line delivered by the reader, one curl invocation per line.
     * Lines without fields are skipped.
     *
     * @param receiver source of lines from the reader side
     * @return {@code PluginStatus.SUCCESS.value()} once the reader is exhausted
     * @throws DataExchangeException wrapping any failure (the original exception is kept as cause)
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Line line;
        try {
            while ((line = receiver.getFromReader()) != null) {
                String curlCommand = makeCurl(line);
                if (curlCommand == null) {
                    continue;
                }
                runCommand(curlCommand);
            }
            return PluginStatus.SUCCESS.value();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(ExceptionTracker.trace(e));
            throw new DataExchangeException(e);
        } catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            throw new DataExchangeException(e);
        }
    }

    /** Nothing is buffered; every line was sent during startWrite. */
    @Override
    public int commit() {
        return 0;
    }

    /** No resources to release. */
    @Override
    public int finish() {
        return 0;
    }

}
package com.taobao.datax.plugins.writer.eswriter;

// Names of the job parameters understood by ESWriter.
// The tagged comment blocks (@name, @description, ...) are kept in their original format.
// NOTE(review): this file is copied into the plugin directory (step 6), presumably so that
// bin/datax.py -e can read these tags when generating job templates; keep the format.
// Each @default below is the value ESWriter.init() actually falls back to.
public final class ParamKey {

    /*
     * @name: singleCurl
     * @description: shell command run per line; {id} is replaced by field 0 and {data} by the JSON document, default curl -XPOST 'http://192.168.0.108:9200/user/student/{id}' -d '{data}'
     * @range:
     * @mandatory: false
     * @default:
     */
    public final static String singleCurl = "single_curl";

    /*
     * @name: nullChar
     * @description: text written in place of a null field value
     * @range:
     * @mandatory: false
     * @default:
     */
    public final static String nullChar = "null_char";

    /*
     * @name: columnNames
     * @description: JSON property names for fields 1..n (field 0 is the document id)
     * @range:
     * @mandatory: false
     * @default: userid,name,phone
     */
    public final static String columnNames = "column_names";

    /*
     * @name: columnNameSplit
     * @description: separator (regular expression) used to split column names
     * @range:
     * @mandatory: false
     * @default: ,
     */
    public final static String columnNameSplit = "column_name_split";

    /*
     * @name: concurrency
     * @description: concurrency of the job
     * @range: 1
     * @mandatory: false
     * @default: 1
     */
    public final static String concurrency = "concurrency";

    // Constants holder; not meant to be instantiated.
    private ParamKey() {
    }

}

3、在DataX安裝目錄下的conf/plugins.xml檔案中新增以下內容:

<plugin>
    <!-- the version of this plugin -->
    <version>1</version>
    <!-- the name of this plugin, it must be unique in this file -->
    <name>eswriter</name>
    <!-- reader | writer | spliter -->
    <type>writer</type>
    <!-- mysql | oracle | hdfs | feitian -->
    <target>es</target>
    <!-- the jar's filename -->
    <jar>eswriter-1.0.0.jar</jar>
    <!-- plugin's class name, must include package path -->
    <class>com.taobao.datax.plugins.writer.eswriter.ESWriter</class>
    <!-- max concurrency the plugin can support -->
    <maxthreadnum>10</maxthreadnum>
</plugin>

4、修改build.xml檔案中的部分內容:

<target name="plugindist" depends="clean,compile">
    <foreach target="eachplugindist" param="var">
        <path id="plugins">
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/hdfswriter/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/oraclewriter/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/mysqlwriter/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/hdfsreader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/oraclereader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/mysqlreader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/httpreader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/hbasewriter/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/hbasereader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/streamwriter/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/streamreader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/fakereader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/reader/sqlserverreader/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/eswriter/1.0.0" />
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/esbulkwriter/1.0.0" />
        </path>
    </foreach>
</target>

<target name="eswriter" depends="clean,compile">
    <foreach target="eachplugindist" param="var">
        <path id="plugins">
            <pathelement path="${classes.dir}/com/taobao/datax/plugins/writer/eswriter/1.0.0" />
        </path>
    </foreach>
</target>

5、在專案目錄下執行ant命令。

6、在DataX安裝目錄的plugins/writer目錄下新建eswriter目錄,並將build/plugins目錄下的eswriter-1.0.0.jar、plugins-common-1.0.0.jar和ParamKey.java放到新建的eswriter目錄下。

7、在DataX安裝目錄下執行bin/datax.py -e,按步驟選擇,產生mysqlreader_to_eswriter_1464169417703.xml。

8、編輯DataX安裝目錄下的jobs/mysqlreader_to_eswriter_1464169417703.xml檔案,補充欠缺的內容。

9、在DataX安裝目錄下執行bin/datax.py jobs/mysqlreader_to_eswriter_1464169417703.xml。

以上步驟是透過curl呼叫ElasticSearch的REST介面寫入資料;下面介紹透過ElasticSearch Java API批次寫入的實作方式。

程式碼如下:

package com.taobao.datax.plugins.writer.esbulkwriter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.google.gson.Gson;
import com.ningmeng.es.commons.ESUtils;
import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.umeng.es.config.EsConfig;
import com.umeng.es.config.EsServerAddress;

/**
 * DataX writer that converts each line into a {@link Student} JSON document and
 * sends the documents to ElasticSearch through {@code ESUtils.bulkSaveOrUpdate}.
 *
 * <p>Documents are sent in batches of {@code BATCH_SIZE} during startWrite, and the
 * remainder is sent in commit. Memory therefore stays bounded. Because ES has no
 * transactions, a failure part-way leaves the earlier batches written.
 *
 * <p>Optional job parameters; the defaults reproduce the previously hard-coded values:
 * cluster_name ("youmeng"), servers ("192.168.0.1:9300,192.168.0.2:9300", comma-separated
 * host[:port]), index ("user") and type ("student").
 */
public class ESBulkWriter extends Writer {

    // Documents buffered before a bulk request is sent.
    private static final int BATCH_SIZE = 1000;

    // Port used for a "servers" entry without a ":port" suffix.
    private static final int DEFAULT_TRANSPORT_PORT = 9300;

    private static final String KEY_CLUSTER_NAME = "cluster_name";

    private static final String KEY_SERVERS = "servers";

    private static final String KEY_INDEX = "index";

    private static final String KEY_TYPE = "type";

    private ESUtils esUtils = null;

    // Serialized documents not yet sent.
    private List<String> pojos = null;

    private String index = "user";

    private String type = "student";

    private Logger logger = Logger.getLogger(ESBulkWriter.class.getCanonicalName());

    /**
     * Prepares the document buffer and reads the target index/type.
     *
     * @return {@code PluginStatus.SUCCESS.value()}
     */
    @Override
    public int init() {
        this.pojos = new ArrayList<String>(BATCH_SIZE);
        this.index = param.getValue(KEY_INDEX, "user");
        this.type = param.getValue(KEY_TYPE, "student");
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Creates the ESUtils client from the configured cluster name and server list.
     *
     * @return {@code PluginStatus.SUCCESS.value()}
     * @throws IllegalArgumentException if the server list is empty or a port is not a number
     */
    @Override
    public int connect() {
        String clusterName = param.getValue(KEY_CLUSTER_NAME, "youmeng");
        String servers = param.getValue(KEY_SERVERS, "192.168.0.1:9300,192.168.0.2:9300");
        List<EsServerAddress> serverAddress = parseServers(servers);
        if (serverAddress.isEmpty()) {
            throw new IllegalArgumentException("no ElasticSearch servers configured in " + KEY_SERVERS);
        }
        this.esUtils = new ESUtils(new EsConfig(clusterName, serverAddress));
        return PluginStatus.SUCCESS.value();
    }

    /** Parses a comma-separated host[:port] list, skipping blank entries. */
    private static List<EsServerAddress> parseServers(String servers) {
        List<EsServerAddress> addresses = new ArrayList<EsServerAddress>();
        for (String entry : servers.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.length() == 0) {
                continue;
            }
            int colon = hostPort.lastIndexOf(':');
            if (colon < 0) {
                addresses.add(new EsServerAddress(hostPort, DEFAULT_TRANSPORT_PORT));
            } else {
                addresses.add(new EsServerAddress(hostPort.substring(0, colon).trim(),
                        Integer.parseInt(hostPort.substring(colon + 1).trim())));
            }
        }
        return addresses;
    }

    /**
     * Converts every line into a JSON document, flushing a bulk request each time
     * {@code BATCH_SIZE} documents are buffered. Lines without fields are skipped.
     *
     * @param receiver source of lines from the reader side
     * @return {@code PluginStatus.SUCCESS.value()} once the reader is exhausted
     * @throws DataExchangeException wrapping any failure (the original exception is kept as cause)
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Gson gson = new Gson();
        Line line;
        try {
            while ((line = receiver.getFromReader()) != null) {
                if (line.getFieldNum() == 0) {
                    continue;
                }
                pojos.add(gson.toJson(toStudent(line)));
                if (pojos.size() >= BATCH_SIZE) {
                    flush();
                }
            }
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            throw new DataExchangeException(e);
        }
    }

    /**
     * Maps fields 0..3 (id, userid, name, phone) onto a Student.
     * Missing or blank fields are left unset.
     *
     * @throws NumberFormatException if id or userid is not numeric
     */
    private static Student toStudent(Line line) {
        Student student = new Student();
        String id = fieldOrNull(line, 0);
        if (StringUtils.isNotBlank(id)) {
            student.setId(Integer.valueOf(id.trim()));
        }
        String userid = fieldOrNull(line, 1);
        if (StringUtils.isNotBlank(userid)) {
            student.setUserid(Long.valueOf(userid.trim()));
        }
        String name = fieldOrNull(line, 2);
        if (StringUtils.isNotBlank(name)) {
            student.setName(name);
        }
        String phone = fieldOrNull(line, 3);
        if (StringUtils.isNotBlank(phone)) {
            student.setPhone(phone);
        }
        return student;
    }

    /** Returns field {@code index}, or null when the line has fewer fields. */
    private static String fieldOrNull(Line line, int index) {
        return index < line.getFieldNum() ? line.getField(index) : null;
    }

    /** Sends the buffered documents, if any, and starts a new buffer. */
    private void flush() {
        if (pojos == null || pojos.isEmpty()) {
            // An empty bulk request would be rejected, so send nothing.
            return;
        }
        esUtils.bulkSaveOrUpdate(pojos, index, type);
        // Start a fresh list instead of clearing, in case ESUtils keeps a reference to the one it got.
        pojos = new ArrayList<String>(BATCH_SIZE);
    }

    /**
     * Sends any documents still buffered.
     *
     * @return {@code PluginStatus.SUCCESS.value()}
     */
    @Override
    public int commit() {
        flush();
        return PluginStatus.SUCCESS.value();
    }

    /** No resources to release. */
    @Override
    public int finish() {
        return 0;
    }

    /**
     * Document shape sent to ElasticSearch; Gson serializes its fields.
     * Static so it holds no reference to the enclosing writer.
     */
    static class Student implements Serializable {

        private static final long serialVersionUID = 1L;

        private Integer id;

        private Long userid;

        private String name;

        private String phone;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Long getUserid() {
            return userid;
        }

        public void setUserid(Long userid) {
            this.userid = userid;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getPhone() {
            return phone;
        }

        public void setPhone(String phone) {
            this.phone = phone;
        }

    }
}

其他步驟與上面相同,但需要將相關依賴的Jar包放到DataX安裝目錄的plugins/writer/eswriter目錄下。