Elasticsearch 資料匯入匯出 Java 實現工具類

Elasticsearch 資料匯入匯出 Java 實現工具類

Elasticsearch 資料匯入匯出 Java 實現

最近學了 Elasticsearch,對它還不是非常熟悉。它好像沒有像 MongoDB 的 mongodump 這樣方便的工具。

雖然也有一些別人做的外掛工具,但嫌麻煩,所以整理了網上一些大神寫的程式碼,工具類如下。

如果發現有不對的地方,歡迎指正。或者可以優化的地方,歡迎指點。

package top.lrshuai.blog.util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException
; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequest
; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch
.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.transport.client.PreBuiltTransportClient; /** * * @author rstyro * */ public class CopyUtil { public static void main(String[] args) throws Exception { String srcClustName="robot"; String srcIndexName="robot4"; String srcIp="127.0.0.1"; int srcPort = 9300; String tagClustName="robot"; String tagIndexName="robot6"; String tagTypeName="brain"; String tagIp="127.0.0.1"; int tagPort = 9300; esToEs(srcClustName, srcIndexName, srcIp, srcPort, tagClustName, tagIndexName, tagTypeName, tagIp, tagPort); //outToFile(srcClustName, srcIndexName, null, srcIp, srcPort, "f:\\json.txt"); //fileToEs(tagClustName, tagIndexName, tagTypeName, tagIp, tagPort, "f:\\json.txt"); } /** * 資料拷貝 * elasticsearch 到 elasticsearch * @param srcClustName 原叢集名稱 * @param srcIndexName 原索引 * @param srcIp 原ip * @param srcPort 原 transport 服務埠(預設9300的埠) * @param tagClustName 目標叢集名稱 * @param tagIndexName 目標索引 * @param tagTypeName 目標type * @param tagIp 目標ip * @param tagPort 目標transport服務埠 * @throws InterruptedException */ public static void esToEs(String srcClustName,String srcIndexName,String srcIp,int srcPort,String tagClustName,String tagIndexName,String tagTypeName,String tagIp,int tagPort) throws InterruptedException{ Settings srcSettings = Settings.builder() .put("cluster.name", srcClustName) // .put("client.transport.sniff", true) //.put("client.transport.ping_timeout", "30s") //.put("client.transport.nodes_sampler_interval", "30s") .build(); TransportClient srcClient = new PreBuiltTransportClient(srcSettings); srcClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(srcIp, srcPort))); Settings tagSettings = Settings.builder() .put("cluster.name", tagClustName) //.put("client.transport.sniff", true) // .put("client.transport.ping_timeout", "30s") // .put("client.transport.nodes_sampler_interval", "30s") .build(); TransportClient 
tagClient = new PreBuiltTransportClient(tagSettings); tagClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(tagIp, tagPort))); SearchResponse scrollResp = srcClient.prepareSearch(srcIndexName) .setScroll(new TimeValue(1000)) .setSize(1000) .execute().actionGet(); BulkRequestBuilder bulk = tagClient.prepareBulk(); ExecutorService executor = Executors.newFixedThreadPool(5); while(true){ bulk = tagClient.prepareBulk(); final BulkRequestBuilder bulk_new = bulk; System.out.println("查詢條數="+scrollResp.getHits().getHits().length); for(SearchHit hit : scrollResp.getHits().getHits()){ IndexRequest req = tagClient.prepareIndex().setIndex(tagIndexName) .setType(tagTypeName).setSource(hit.getSourceAsMap()).request(); bulk_new.add(req); } executor.execute(new Runnable() { @Override public void run() { bulk_new.execute(); } }); Thread.sleep(100); scrollResp = srcClient.prepareSearchScroll(scrollResp.getScrollId()) .setScroll(new TimeValue(1000)).execute().actionGet(); if(scrollResp.getHits().getHits().length == 0){ break; } } //該方法在加入執行緒佇列的執行緒執行完之前不會執行 executor.shutdown(); System.out.println("執行結束"); tagClient.close(); srcClient.close(); } /** * elasticsearch 資料到檔案 * @param clustName 叢集名稱 * @param indexName 索引名稱 * @param typeName type名稱 * @param sourceIp ip * @param sourcePort transport 服務埠 * @param filePath 生成的檔案路徑 */ public static void outToFile(String clustName,String indexName,String typeName,String sourceIp,int sourcePort,String filePath){ Settings settings = Settings.builder() .put("cluster.name", clustName) //.put("client.transport.sniff", true) // .put("client.transport.ping_timeout", "30s") // .put("client.transport.nodes_sampler_interval", "30s") .build(); TransportClient client = new PreBuiltTransportClient(settings); client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, sourcePort))); SearchRequestBuilder builder = client.prepareSearch(indexName); if(typeName != null){ builder.setTypes(typeName); } 
builder.setQuery(QueryBuilders.matchAllQuery()); builder.setSize(10000); builder.setScroll(new TimeValue(6000)); SearchResponse scrollResp = builder.execute().actionGet(); try { //把匯出的結果以JSON的格式寫到檔案裡 BufferedWriter out = new BufferedWriter(new FileWriter(filePath, true)); long count = 0; while (true) { for(SearchHit hit : scrollResp.getHits().getHits()){ String json = hit.getSourceAsString(); if(!json.isEmpty() && !"".equals(json)){ out.write(json); out.write("\r\n"); count++; } } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()) .setScroll(new TimeValue(6000)).execute().actionGet(); if(scrollResp.getHits().getHits().length == 0){ break; } } System.out.println("總共寫入資料:"+count); out.close(); client.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 把json 格式的檔案匯入到elasticsearch 伺服器 * @param clustName 叢集名稱 * @param indexName 索引名稱 * @param typeName type 名稱 * @param sourceIp ip * @param sourcePort 埠 * @param filePath json格式的檔案路徑 */ @SuppressWarnings("deprecation") public static void fileToEs(String clustName,String indexName,String typeName,String sourceIp,int sourcePort,String filePath){ Settings settings = Settings.builder() .put("cluster.name", clustName) //.put("client.transport.sniff", true) //.put("client.transport.ping_timeout", "30s") //.put("client.transport.nodes_sampler_interval", "30s") .build(); TransportClient client = new PreBuiltTransportClient(settings); client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, sourcePort))); try { //把匯出的結果以JSON的格式寫到檔案裡 BufferedReader br = new BufferedReader(new FileReader(filePath)); String json = null; int count = 0; //開啟批量插入 BulkRequestBuilder bulkRequest = client.prepareBulk(); while ((json = br.readLine()) != null) { bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json)); //每一千條提交一次 count++; // if (count% 1000==0) { // System.out.println("提交了1000條"); // BulkResponse bulkResponse 
= bulkRequest.execute().actionGet(); // if (bulkResponse.hasFailures()) { // System.out.println("message:"+bulkResponse.buildFailureMessage()); // } // //重新建立一個bulk // bulkRequest = client.prepareBulk(); // } } bulkRequest.execute().actionGet(); System.out.println("總提交了:" + count); br.close(); client.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }