DataX Study Notes: Writer Plugin Development (Continued)
阿新 • Published 2019-01-03
The previous notes were based on a fairly old DataX version. This post reworks the Writer plugin that loads data into Elasticsearch against the latest DataX source on GitHub.
1. Check out the DataX source (git clone https://github.com/alibaba/DataX.git DataX), import the project, and create a new maven module named eswriter for the plugin. Besides datax-common, the module needs the Elasticsearch transport client (the 2.x-era API used in the code below) and Gson on its classpath.
2. Under the DataX install directory, create a plugin/writer/eswriter directory. It holds plugin_job_template.json, plugin.json, and eswriter-0.0.1-SNAPSHOT.jar, plus a libs subdirectory for the dependency jars; see the layout sketch below.
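For reference, the finished plugin directory looks roughly like this (the exact jars under libs depend on the Elasticsearch and Gson versions you build against):

plugin/writer/eswriter/
├── plugin.json
├── plugin_job_template.json
├── eswriter-0.0.1-SNAPSHOT.jar
└── libs/
    ├── elasticsearch-2.x.jar
    ├── gson-2.x.jar
    └── (other transitive dependencies)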
Relevant code:
package com.alibaba.datax.plugin.writer.eswriter;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;

public class ESWriter extends Writer {

    public static class Job extends Writer.Job {

        private Configuration originalConfiguration = null;

        @Override
        public void init() {
            this.originalConfiguration = super.getPluginJobConf();
        }

        @Override
        public void prepare() {
            super.prepare();
        }

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            // No real splitting: every channel works off a copy of the same configuration.
            List<Configuration> writerSplitConfiguration = new ArrayList<Configuration>();
            for (int i = 0; i < mandatoryNumber; i++) {
                writerSplitConfiguration.add(this.originalConfiguration);
            }
            return writerSplitConfiguration;
        }
    }

    public static class Task extends Writer.Task {

        private Configuration writerSliceConfiguration = null;
        private String esClusterName = null;
        private String esClusterIP = null;
        private Integer esClusterPort = null;
        private String esIndex = null;
        private String esType = null;
        private String attributeNameString = null;
        private String attributeNameSplit = null;
        private String[] attributeNames = null;
        private String className = null;
        private Gson gson = null;
        private TransportClient client = null;
        private Integer batchSize = null;

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        @Override
        public void init() {
            this.writerSliceConfiguration = super.getPluginJobConf();
            this.esClusterName = writerSliceConfiguration.getString(Key.esClusterName);
            this.esClusterIP = writerSliceConfiguration.getString(Key.esClusterIP);
            this.esClusterPort = writerSliceConfiguration.getInt(Key.esClusterPort, 9300);
            this.esIndex = writerSliceConfiguration.getString(Key.esIndex);
            this.esType = writerSliceConfiguration.getString(Key.esType);
            this.attributeNameString = writerSliceConfiguration.getString(Key.attributeNameString);
            this.attributeNameSplit = writerSliceConfiguration.getString(Key.attributeNameSplit, ",");
            attributeNames = attributeNameString.split(attributeNameSplit);
            this.className = writerSliceConfiguration.getString(Key.className);
            this.batchSize = writerSliceConfiguration.getInt(Key.batchSize, 1000);
            this.gson = new Gson();
        }

        @Override
        public void prepare() {
            super.prepare();
            // Builder-style TransportClient, as in the 2.x-era Elasticsearch client API.
            Settings settings = Settings.builder()
                    .put("cluster.name", esClusterName)
                    .put("client.transport.sniff", true) // fixed typo: was "client.tansport.sniff"
                    .build();
            client = TransportClient.builder().settings(settings).build();
            // esClusterIP may be a single host or a comma-separated list of hosts.
            List<EsServerAddress> serverAddress = new ArrayList<EsServerAddress>();
            String[] esClusterIPs = esClusterIP.contains(",")
                    ? esClusterIP.split(",")
                    : new String[] { esClusterIP };
            for (int i = 0, len = esClusterIPs.length; i < len; i++) {
                serverAddress.add(new EsServerAddress(esClusterIPs[i], esClusterPort));
            }
            for (EsServerAddress address : serverAddress) {
                client.addTransportAddress(new InetSocketTransportAddress(
                        new InetSocketAddress(address.getHost(), address.getPort())));
            }
        }

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            client.close();
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            // Buffer incoming records and flush to Elasticsearch every batchSize records.
            List<Record> writerBuffer = new ArrayList<Record>(this.batchSize);
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                writerBuffer.add(record);
                if (writerBuffer.size() >= this.batchSize) {
                    bulkSaveOrUpdateES(writerBuffer);
                    writerBuffer.clear();
                }
            }
            if (!writerBuffer.isEmpty()) {
                bulkSaveOrUpdateES(writerBuffer);
                writerBuffer.clear();
            }
        }

        /**
         * Maps each DataX record onto an instance of the configured entity class
         * (column i -> the field named attributeNames[i], matched case-insensitively),
         * then bulk-indexes the whole batch.
         */
        private void bulkSaveOrUpdateES(List<Record> writerBuffer) {
            Record record = null;
            Object object = null;
            Map<String, String> attributeValueMap = null;
            List<ESEntity> entities = new ArrayList<ESEntity>();
            try {
                for (int w = 0, wlen = writerBuffer.size(); w < wlen; w++) {
                    record = writerBuffer.get(w);
                    int fieldNum = record.getColumnNumber();
                    if (fieldNum <= 0) {
                        continue; // skip empty records (the original null-check on record was dead code)
                    }
                    object = Class.forName(className).newInstance();
                    attributeValueMap = new HashMap<String, String>();
                    for (int i = 0; i < fieldNum; i++) {
                        attributeValueMap.put(attributeNames[i].toLowerCase(),
                                record.getColumn(i).asString());
                    }
                    // Walk up the class hierarchy so inherited fields are populated too.
                    for (Class<?> superClass = object.getClass();
                            superClass != Object.class;
                            superClass = superClass.getSuperclass()) {
                        Field[] fields = superClass.getDeclaredFields();
                        for (int i = 0, len = fields.length; i < len; i++) {
                            Field field = fields[i];
                            String fieldNameLowerCase = field.getName().toLowerCase();
                            if (!attributeValueMap.containsKey(fieldNameLowerCase)) {
                                continue;
                            }
                            String valueString = attributeValueMap.get(fieldNameLowerCase);
                            Object value = convertValueByFieldType(field.getType(), valueString);
                            if (field.isAccessible()) {
                                field.set(object, value);
                            } else {
                                field.setAccessible(true);
                                field.set(object, value);
                                field.setAccessible(false);
                            }
                        }
                    }
                    entities.add((ESEntity) object);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
            bulkSaveOrUpdate(entities, esIndex, esType);
        }

        private void bulkSaveOrUpdate(List<ESEntity> entities, String database, String table) {
            if (null == entities || entities.isEmpty())
                return;
            BulkRequestBuilder prepareBulk = client.prepareBulk();
            for (ESEntity entity : entities) {
                IndexRequestBuilder irb = client.prepareIndex()
                        .setIndex(database).setType(table).setId(entity.get_id());
                // Clear _id before serializing so it does not end up inside _source.
                entity.remove_id();
                String source = gson.toJson(entity);
                irb.setSource(source);
                prepareBulk.add(irb);
            }
            prepareBulk.execute().actionGet();
        }

        /**
         * Converts the string value read from the record into the entity field's type,
         * substituting a sensible default when the value is null.
         */
        private Object convertValueByFieldType(Class<?> type, Object value) {
            Object finalValue = value;
            if (String.class.isAssignableFrom(type)) {
                finalValue = null == value ? "NA" : String.valueOf(value);
            } else if (Boolean.class.isAssignableFrom(type)) {
                finalValue = null == value ? Boolean.FALSE : Boolean.parseBoolean(String.valueOf(value));
            } else if (Integer.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0 : Integer.parseInt(String.valueOf(value));
            } else if (Long.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0L : Long.parseLong(String.valueOf(value));
            } else if (Float.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0f : Float.parseFloat(String.valueOf(value));
            } else if (Double.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0d : Double.parseDouble(String.valueOf(value));
            } else if (Date.class.isAssignableFrom(type)) {
                try {
                    // DateFormat is a project helper (thread-local formatter), sketched below.
                    value = null == value ? DateFormat.TIME.get().format(new Date()) : value;
                    finalValue = DateFormat.TIME.get().parse(String.valueOf(value));
                } catch (ParseException e) {
                    LOG.error(e.getMessage(), e);
                }
            }
            return finalValue;
        }
    }
}
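The writer compiles against a few small classes the post never shows: EsServerAddress, ESEntity, DateFormat, and the Student entity named in the job configuration. The sketches below are assumptions about their shape, not the original author's code; each public class goes in its own source file. Note that with the sample job nothing populates _id, so setId(null) in bulkSaveOrUpdate lets Elasticsearch auto-generate document ids.

package com.alibaba.datax.plugin.writer.eswriter;

import java.text.SimpleDateFormat;

// EsServerAddress.java -- host/port pair for one transport node (assumed simple POJO).
public class EsServerAddress {
    private final String host;
    private final int port;

    public EsServerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public String getHost() { return host; }
    public int getPort() { return port; }
}

// ESEntity.java -- assumed base class for indexed documents; the writer reads the
// document id via get_id() and clears it before gson.toJson so it stays out of
// _source (Gson skips null fields by default).
public abstract class ESEntity {
    private String _id;

    public String get_id() { return _id; }
    public void set_id(String _id) { this._id = _id; }
    public void remove_id() { this._id = null; }
}

// DateFormat.java -- assumed thread-local formatter behind DateFormat.TIME.get();
// SimpleDateFormat is not thread-safe, so each thread gets its own instance.
// The date pattern is a guess.
public final class DateFormat {
    public static final ThreadLocal<SimpleDateFormat> TIME =
            new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                }
            };

    private DateFormat() {
    }
}

// Student.java -- example entity matching attributeNameString "id,userid,name,phone";
// the reflection in bulkSaveOrUpdateES matches field names to attribute names
// case-insensitively, so these names must line up with the job configuration.
public class Student extends ESEntity {
    private String id;
    private String userid;
    private String name;
    private String phone;
}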
package com.alibaba.datax.plugin.writer.eswriter;

public final class Key {

    /*
     * @name: esClusterName
     * @description: elasticsearch cluster name
     */
    public final static String esClusterName = "esClusterName";

    /*
     * @name: esClusterIP
     * @description: elasticsearch cluster ip
     */
    public final static String esClusterIP = "esClusterIP";

    /*
     * @name: esClusterPort
     * @description: elasticsearch cluster port
     */
    public final static String esClusterPort = "esClusterPort";

    /*
     * @name: esIndex
     * @description: elasticsearch index
     */
    public final static String esIndex = "esIndex";

    /*
     * @name: esType
     * @description: elasticsearch type
     */
    public final static String esType = "esType";

    /*
     * @name: attributeNameString
     * @description: attribute name list
     */
    public final static String attributeNameString = "attributeNameString";

    /*
     * @name: attributeNameSplit
     * @description: separator to split the attribute name string
     */
    public final static String attributeNameSplit = "attributeNameSplit";

    /*
     * @name: className
     * @description: qualified class name of the entity
     */
    public final static String className = "className";

    /*
     * @name: batchSize
     * @description: commit batch size for elasticsearch
     */
    public final static String batchSize = "batchSize";
}
plugin_job_template.json
{
    "name": "eswriter",
    "parameter": {
        "esClusterName": "",
        "esClusterIP": "192.168.0.1,192.168.0.2",
        "esClusterPort": "9300",
        "esIndex": "user",
        "esType": "student",
        "attributeNameString": "id,userid,name,phone",
        "attributeNameSplit": ",",
        "className": "com.alibaba.datax.plugin.writer.eswriter.Student",
        "batchSize": "1000"
    }
}
plugin.json
{
    "name": "eswriter",
    "class": "com.alibaba.datax.plugin.writer.eswriter.ESWriter",
    "description": {
        "useScene": "only for developer test.",
        "mechanism": "use datax framework to transport data to elasticsearch.",
        "warn": ""
    },
    "developer": "wulin"
}
3. Run python bin/datax.py -r mysqlreader -w eswriter to print a job template, save it as job/mysql_to_es.json, and fill in the details. Note that the reader's column order must line up with the writer's attributeNameString, because the writer maps column i to attribute i.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["id", "userid", "name", "phone"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.0.114:3306/student?useUnicode=true&characterEncoding=UTF-8"],
                                "table": ["info"]
                            }
                        ],
                        "password": "123456",
                        "username": "test",
                        "where": "id < 10"
                    }
                },
                "writer": {
                    "name": "eswriter",
                    "parameter": {
                        "attributeNameSplit": ",",
                        "attributeNameString": "id,userid,name,phone",
                        "className": "com.alibaba.datax.plugin.writer.eswriter.Student",
                        "esClusterName": "elasticsearch",
                        "esClusterIP": "192.168.0.105,192.168.0.108",
                        "esClusterPort": "9300",
                        "esIndex": "user",
                        "esType": "student",
                        "batchSize": "1000"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 10
            }
        }
    }
}
4. Run python bin/datax.py job/mysql_to_es.json.
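Once the job completes, a quick sanity check is to query Elasticsearch directly (a sketch, assuming the cluster's REST API is on the default port 9200):

curl 'http://192.168.0.105:9200/user/student/_search?q=*&size=5&pretty'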