DataX學習筆記-Writer外掛開發
阿新 • • 發佈:2019-01-27
本文主要介紹如何開發一個將資料寫入ElasticSearch的Writer外掛。
1、檢出DataX原始碼(svn checkout http://code.taobao.org/svn/datax/trunk)
2、在com.taobao.datax.plugins.writer包下面建立一個eswriter包,新建ESWriter.java,ParamKey.java
package com.taobao.datax.plugins.writer.eswriter;

import java.io.IOException;
import java.io.InputStream;

import org.apache.log4j.Logger;

import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.taobao.datax.plugins.writer.eswriter.ParamKey;

/**
 * DataX writer plugin that indexes every incoming line into ElasticSearch by
 * running a curl command through {@code /bin/sh -c}.
 *
 * <p>The command is built from a template containing the placeholders
 * {@code {id}} (replaced by field 0 of the line) and {@code {data}} (replaced
 * by a JSON object built from the remaining fields and the configured column
 * names).
 */
public class ESWriter extends Writer {
    /** Curl command template; never modified after {@link #init()}. */
    private String singleCurl = "";
    /** Text written in place of a null field value. */
    private String nullString = "";
    private String columnNameString = "";
    private String columnNameSplit = "";
    /** JSON keys for fields 1..n of a line, in order. */
    private String[] columnNames = null;
    private final Logger logger = Logger.getLogger(ESWriter.class.getCanonicalName());

    /**
     * Reads the plugin parameters, falling back to the built-in defaults.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int init() {
        this.singleCurl = param.getValue(ParamKey.singleCurl,
                "curl -XPOST 'http://192.168.0.108:9200/user/student/{id}' -d '{data}'");
        this.nullString = param.getValue(ParamKey.nullChar, this.nullString);
        this.columnNameString = param.getValue(ParamKey.columnNames, "userid,name,phone");
        this.columnNameSplit = param.getValue(ParamKey.columnNameSplit, ",");
        // String.split treats the separator as a regular expression.
        this.columnNames = columnNameString.split(columnNameSplit);
        return PluginStatus.SUCCESS.value();
    }

    /** Nothing to connect to: every record spawns its own curl process. */
    @Override
    public int connect() {
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Builds the curl command for one line.
     *
     * <p>The template is substituted into a fresh string each time. Writing the
     * substituted text back into {@code singleCurl} would remove the
     * {@code {id}} placeholder after the first record and send every later
     * record to the first record's id.
     *
     * @param line a line with at least one field; field 0 is the document id
     * @return the shell command to execute
     * @throws IllegalArgumentException if the id field is null or the line has
     *         more data fields than configured column names
     */
    private String makeCurl(Line line) {
        int num = line.getFieldNum();
        String id = line.getField(0);
        if (id == null) {
            throw new IllegalArgumentException("field 0 (document id) must not be null");
        }
        if (num - 1 > columnNames.length) {
            throw new IllegalArgumentException("line has " + (num - 1)
                    + " data fields but only " + columnNames.length
                    + " column names are configured");
        }

        StringBuilder json = new StringBuilder("{");
        for (int i = 1; i < num; i++) {
            if (i > 1) {
                // Separator goes before every pair but the first, so an id-only
                // line still yields a well-formed "{}".
                json.append(',');
            }
            String item = line.getField(i);
            json.append('"').append(escapeJson(columnNames[i - 1])).append("\":\"")
                    .append(escapeJson(item == null ? nullString : item))
                    .append('"');
        }
        json.append('}');

        // NOTE(review): id and field values are pasted into a shell command line.
        // A value containing a single quote breaks out of the quoting used by the
        // default template and allows command injection. Prefer the Java API
        // writer for data that is not fully trusted.
        return this.singleCurl.replace("{id}", id).replace("{data}", json.toString());
    }

    /**
     * Escapes a value for use inside a double-quoted JSON string: quote,
     * backslash and control characters below 0x20.
     */
    private static String escapeJson(String value) {
        StringBuilder out = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
            case '"':
                out.append("\\\"");
                break;
            case '\\':
                out.append("\\\\");
                break;
            case '\n':
                out.append("\\n");
                break;
            case '\r':
                out.append("\\r");
                break;
            case '\t':
                out.append("\\t");
                break;
            default:
                if (c < 0x20) {
                    out.append(String.format("\\u%04x", (int) c));
                } else {
                    out.append(c);
                }
            }
        }
        return out.toString();
    }

    /**
     * Runs one command through {@code /bin/sh -c}, discards its output and waits
     * for it to finish so that processes and pipe handles do not pile up.
     * A non-zero exit status is logged but does not abort the job.
     */
    private void runCommand(String command) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder("/bin/sh", "-c", command);
        // Merge stderr into stdout so a single drain loop is enough.
        builder.redirectErrorStream(true);
        Process process = builder.start();
        process.getOutputStream().close();
        InputStream output = process.getInputStream();
        try {
            byte[] scratch = new byte[4096];
            while (output.read(scratch) != -1) {
                // Discard: an undrained pipe could block the child process.
            }
        } finally {
            output.close();
        }
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            logger.warn("curl command exited with status " + exitCode);
        }
    }

    /**
     * Pulls lines from the reader until it is exhausted and posts each one.
     * Lines without any field are skipped because they carry no document id.
     *
     * @param receiver source of lines produced by the reader
     * @return the SUCCESS plugin status value
     * @throws DataExchangeException wrapping any failure while building or
     *         running a command
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Line line;
        try {
            while ((line = receiver.getFromReader()) != null) {
                if (line.getFieldNum() == 0) {
                    continue;
                }
                runCommand(makeCurl(line));
            }
            return PluginStatus.SUCCESS.value();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            logger.error(ExceptionTracker.trace(e));
            throw new DataExchangeException(e);
        } catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            // Wrap the exception itself: its cause is usually null.
            throw new DataExchangeException(e);
        }
    }

    @Override
    public int commit() {
        return 0;
    }

    @Override
    public int finish() {
        return 0;
    }
}
package com.taobao.datax.plugins.writer.eswriter;

/* Names of the job parameters read by the ES writer plugin. */
public final class ParamKey {

    /*
     * @name: singleCurl
     * @description: single curl
     * @range:
     * @mandatory: false
     * @default:
     */
    public static final String singleCurl = "single_curl";

    /*
     * @name: nullChar
     * @description: replace null with the nullchar
     * @range:
     * @mandatory: false
     * @default:
     */
    public static final String nullChar = "null_char";

    /*
     * @name: columnNames
     * @description: column name list
     * @range:
     * @mandatory: false
     * @default:
     */
    public static final String columnNames = "column_names";

    /*
     * @name: columnNameSplit
     * @description: separator to split column names
     * @range:
     * @mandatory: false
     * @default: \t
     */
    public static final String columnNameSplit = "column_name_split";

    /*
     * @name: concurrency
     * @description: concurrency of the job
     * @range: 1
     * @mandatory: false
     * @default: 1
     */
    public static final String concurrency = "concurrency";
}
3、在DataX安裝目錄的conf目錄下的plugins.xml檔案中新增以下內容
<plugin>
    <!-- the version of this plugin -->
    <version>1</version>
    <!-- the name of this plugin, it must be unique in this file -->
    <name>eswriter</name>
    <!-- reader | writer | spliter -->
    <type>writer</type>
    <!-- 注意:原文此處之後的設定內容已被截斷,其餘元素請參照plugins.xml中其他writer外掛的寫法補齊 -->
</plugin>
上面的步驟是基於ElasticSearch的CURL方式操作的,下面的步驟是基於ElasticSearch的JavaAPI方式操作的。
程式碼如下:
package com.taobao.datax.plugins.writer.esbulkwriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import com.google.gson.Gson;
import com.ningmeng.es.commons.ESUtils;
import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.umeng.es.config.EsConfig;
import com.umeng.es.config.EsServerAddress;
/**
 * DataX writer plugin that converts every incoming line into a
 * {@link Student} JSON document and saves all documents to ElasticSearch in a
 * single bulk call issued from {@link #commit()}.
 */
public class ESBulkWriter extends Writer {
    // Declared with the imported type that connect() actually instantiates.
    private ESUtils esUtils = null;
    /** JSON documents collected by startWrite and sent by commit. */
    private List<String> pojos = null;
    private final Logger logger = Logger.getLogger(ESBulkWriter.class.getCanonicalName());

    /**
     * Creates the empty document buffer.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int init() {
        this.pojos = new ArrayList<String>();
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Creates the ElasticSearch helper for the hard-coded cluster name and
     * server addresses.
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int connect() {
        List<EsServerAddress> serverAddress = new ArrayList<EsServerAddress>();
        serverAddress.add(new EsServerAddress("192.168.0.1", 9300));
        serverAddress.add(new EsServerAddress("192.168.0.2", 9300));
        this.esUtils = new ESUtils(new EsConfig("youmeng", serverAddress));
        return PluginStatus.SUCCESS.value();
    }

    /**
     * Reads lines until the reader is exhausted and buffers one JSON document
     * per non-empty line. Fields are mapped by position: 0 = id, 1 = userid,
     * 2 = name, 3 = phone. Blank or missing fields leave the property unset.
     *
     * @param receiver source of lines produced by the reader
     * @return the SUCCESS plugin status value
     * @throws DataExchangeException wrapping any failure, including a
     *         non-numeric id or userid
     */
    @Override
    public int startWrite(LineReceiver receiver) {
        Gson gson = new Gson();
        try {
            Line line;
            while ((line = receiver.getFromReader()) != null) {
                if (line.getFieldNum() <= 0) {
                    continue;
                }
                Student student = new Student();

                String idText = fieldOrNull(line, 0);
                if (StringUtils.isNotBlank(idText)) {
                    student.setId(Integer.parseInt(idText));
                }
                String userIdText = fieldOrNull(line, 1);
                if (StringUtils.isNotBlank(userIdText)) {
                    student.setUserid(Long.parseLong(userIdText));
                }
                String name = fieldOrNull(line, 2);
                if (StringUtils.isNotBlank(name)) {
                    student.setName(name);
                }
                String phone = fieldOrNull(line, 3);
                if (StringUtils.isNotBlank(phone)) {
                    student.setPhone(phone);
                }

                pojos.add(gson.toJson(student));
            }
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            logger.error(ExceptionTracker.trace(e));
            // Wrap the exception itself: its cause is usually null.
            throw new DataExchangeException(e);
        }
    }

    /** Returns the field at {@code index}, or null when the line is shorter. */
    private static String fieldOrNull(Line line, int index) {
        return index < line.getFieldNum() ? line.getField(index) : null;
    }

    /**
     * Sends all buffered documents with the arguments "user" and "student".
     *
     * @return the SUCCESS plugin status value
     */
    @Override
    public int commit() {
        // NOTE(review): "user"/"student" are presumably index and type - confirm
        // against ESUtils.bulkSaveOrUpdate.
        esUtils.bulkSaveOrUpdate(pojos, "user", "student");
        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int finish() {
        return 0;
    }

    /**
     * Document shape serialized to JSON. Static because it never uses the
     * enclosing writer instance.
     */
    static class Student implements Serializable {
        private static final long serialVersionUID = 1L;
        private Integer id;
        private Long userid;
        private String name;
        private String phone;

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public Long getUserid() {
            return userid;
        }

        public void setUserid(Long userid) {
            this.userid = userid;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getPhone() {
            return phone;
        }

        public void setPhone(String phone) {
            this.phone = phone;
        }
    }
}
其他步驟與上面的步驟相同,但需要將相關依賴的Jar包放到DataX安裝目錄的plugins/writer/eswriter目錄下。