DataX Study Notes: Reader Plugin Development
阿新 · Published 2019-01-26
Developing a DataX Reader plugin that reads data from ElasticSearch.
1. Check out the DataX source code (git clone https://github.com/alibaba/DataX.git DataX), import the project, and create a new esreader Maven project in which to develop the plugin.
2. Under the plugins/reader directory of the DataX installation, create an esreader directory containing plugin_job_template.json, plugin.json, and esreader-0.0.1-SNAPSHOT.jar, plus a libs subdirectory holding the plugin's dependency jars (see the layout sketch below).
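A sketch of the expected layout; the jar names under libs are placeholders for whatever Elasticsearch client and Gson versions the plugin is actually built against:

plugins/reader/esreader/
├── plugin.json
├── plugin_job_template.json
├── esreader-0.0.1-SNAPSHOT.jar
└── libs/
    ├── elasticsearch-&lt;version&gt;.jar
    └── gson-&lt;version&gt;.jar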
Related code:
package com.alibaba.datax.plugin.reader.esreader;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;
import com.umeng.es.config.EsServerAddress;

public class ESReader extends Reader {

    public static class Job extends Reader.Job {

        private Configuration originalConfiguration = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            this.originalConfiguration = super.getPluginJobConf();
        }

        @Override
        public void prepare() {
            super.prepare();
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            // One task per advised channel, each with a copy of the job config.
            // Note: every task runs the same match-all query, so more than one
            // channel would read duplicate data; the sample job below uses channel 1.
            List<Configuration> readerSplitConfigurations = new ArrayList<Configuration>();
            for (int i = 0; i < adviceNumber; i++) {
                Configuration readerSplitConfiguration = this.originalConfiguration.clone();
                readerSplitConfigurations.add(readerSplitConfiguration);
            }
            return readerSplitConfigurations;
        }
    }

    public static class Task extends Reader.Task {

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration readerSliceConfiguration = null;
        private String esClusterName = null;
        private String esClusterIP = null;
        private Integer esClusterPort = null;
        private String esIndex = null;
        private String esType = null;
        private Gson gson = null;
        private TransportClient client = null;
        private Integer batchSize = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            this.readerSliceConfiguration = super.getPluginJobConf();
            this.esClusterName = readerSliceConfiguration.getString(Key.esClusterName);
            this.esClusterIP = readerSliceConfiguration.getString(Key.esClusterIP);
            this.esClusterPort = readerSliceConfiguration.getInt(Key.esClusterPort, 9300);
            this.esIndex = readerSliceConfiguration.getString(Key.esIndex);
            this.esType = readerSliceConfiguration.getString(Key.esType);
            this.batchSize = readerSliceConfiguration.getInt(Key.batchSize, 1000);
            this.gson = new Gson();
        }

        @Override
        public void prepare() {
            super.prepare();
            Settings settings = Settings.builder()
                    .put("cluster.name", esClusterName)
                    .put("client.transport.sniff", true)
                    .build();
            client = TransportClient.builder().settings(settings).build();
            // esClusterIP may be a single host or a comma-separated list.
            List<EsServerAddress> serverAddress = new ArrayList<EsServerAddress>();
            String[] esClusterIPs = esClusterIP.contains(",")
                    ? esClusterIP.split(",") : new String[] { esClusterIP };
            for (int i = 0, len = esClusterIPs.length; i < len; i++) {
                serverAddress.add(new EsServerAddress(esClusterIPs[i], esClusterPort));
            }
            for (EsServerAddress address : serverAddress) {
                client.addTransportAddress(new InetSocketTransportAddress(
                        new InetSocketAddress(address.getHost(), address.getPort())));
            }
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            client.close();
        }

        @Override
        public void startRead(RecordSender recordSender) {
            // Initial scroll search: match all documents, batchSize hits per round,
            // keeping the scroll context alive for 60 seconds between rounds.
            SearchResponse response = client.prepareSearch(esIndex).setTypes(esType)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSearchType(SearchType.QUERY_THEN_FETCH)
                    .setScroll(new TimeValue(60000)).setSize(batchSize)
                    .setExplain(false).execute().actionGet();
            int totalSize = 0;
            Record record = null;
            while (true) {
                SearchHit[] hitArray = response.getHits().getHits();
                if (hitArray.length == 0) {
                    break;
                }
                SearchHit hit = null;
                for (int i = 0, len = hitArray.length; i < len; i++) {
                    // Serialize each hit's _source to JSON and emit it as a
                    // single string column.
                    record = recordSender.createRecord();
                    hit = hitArray[i];
                    record.addColumn(new StringColumn(gson.toJson(hit.getSource())));
                    recordSender.sendToWriter(record);
                }
                totalSize += hitArray.length;
                // Fetch the next batch via the scroll id.
                response = client.prepareSearchScroll(response.getScrollId())
                        .setScroll(new TimeValue(60000)).execute().actionGet();
            }
            LOG.info("total size : " + totalSize);
        }
    }
}
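One caveat about the code above: com.umeng.es.config.EsServerAddress is neither a DataX nor an Elasticsearch class. Judging from how it is used, it is a plain host/port holder; if you don't have that dependency, a minimal stand-in like this sketch (a reconstruction, not the original umeng class) is enough:

package com.umeng.es.config;

// Minimal host/port holder matching the constructor and getters used by ESReader.
public class EsServerAddress {

    private final String host;
    private final Integer port;

    public EsServerAddress(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    public String getHost() {
        return host;
    }

    public Integer getPort() {
        return port;
    }
}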
package com.alibaba.datax.plugin.reader.esreader;

public final class Key {

    /*
     * @name: esClusterName
     * @description: elastic search cluster name
     */
    public final static String esClusterName = "esClusterName";

    /*
     * @name: esClusterIP
     * @description: elastic search cluster ip
     */
    public final static String esClusterIP = "esClusterIP";

    /*
     * @name: esClusterPort
     * @description: elastic search cluster port
     */
    public final static String esClusterPort = "esClusterPort";

    /*
     * @name: esIndex
     * @description: elastic search index
     */
    public final static String esIndex = "esIndex";

    /*
     * @name: esType
     * @description: elastic search type
     */
    public final static String esType = "esType";

    /*
     * @name: batchSize
     * @description: elasticsearch batch size
     */
    public final static String batchSize = "batchSize";
}
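To make the record shape concrete: startRead serializes each hit's _source to one JSON string and sends it as a single-column record, which is why the hdfswriter job below declares exactly one string column. A small self-contained illustration with made-up data:

import java.util.LinkedHashMap;
import java.util.Map;

import com.google.gson.Gson;

public class SourceToColumnDemo {
    public static void main(String[] args) {
        // Hypothetical _source map, standing in for hit.getSource().
        Map<String, Object> source = new LinkedHashMap<String, Object>();
        source.put("id", 1);
        source.put("name", "alice");
        // This JSON string would become the record's only column.
        System.out.println(new Gson().toJson(source)); // {"id":1,"name":"alice"}
    }
}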
plugin_job_template.json
{
"name": "esreader",
"parameter": {
"esClusterName": "",
"esClusterIP": "",
"esClusterPort": "",
"esIndex": "",
"esType": "",
"batchSize": ""
}
}
plugin.json
{ "name": "esreader", "class": "com.alibaba.datax.plugin.reader.esreader.ESReader", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport elastic search data to channel.", "warn": "Never use it in your real job." }, "developer": "wulin" }
3. Generate a job file with python bin/datax.py -r esreader -w hdfswriter, save it as job/es_to_hdfs.json, and fill in the parameters:
{
"job": {
"content": [
{
"reader": {
"name": "esreader",
"parameter": {
"batchSize": "1000",
"esClusterIP": "192.168.0.114",
"esClusterName": "elasticsearch",
"esClusterPort": "9300",
"esIndex": "data",
"esType": "t1"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [{"name":"data","type":"string"}],
"defaultFS": "hdfs://192.168.0.114:9000",
"compress": "gzip",
"fieldDelimiter": ",",
"fileName": "esdata",
"fileType": "text",
"path": "/user/data/es",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
4. Run the job: python bin/datax.py job/es_to_hdfs.json
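If the run succeeds, the gzip-compressed text files land under the path configured above. A quick check with the Hadoop CLI (hdfswriter appends a suffix to fileName, so the exact file names vary; hdfs dfs -text decompresses gzip output):

hdfs dfs -ls /user/data/es
hdfs dfs -text /user/data/es/esdata*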