Elasticsearch 資料匯入匯出 Java 實現工具類
阿新 • 發佈:2019-02-11
Elasticsearch 資料匯入匯出 Java 實現
最近學了 elasticsearch,對它也不是非常熟悉。它好像沒有像 mongodb 的 mongodump 那樣方便的匯入匯出工具。
雖然也有一些別人做的外掛工具。但嫌麻煩,所以整理了網上一些大神寫程式碼。工具類如下。
如果發現有不對的地方,歡迎指正。或者可以優化的地方,歡迎指點。
package top.lrshuai.blog.util;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
/**
*
* @author rstyro
*
*/
/**
 * Utility for moving Elasticsearch data (ES 5.x TransportClient API):
 * cluster-to-cluster copy, cluster-to-file export, and file-to-cluster import
 * (one JSON document per line).
 *
 * <p>NOTE(review): assumes the transport port (default 9300) is reachable and the
 * cluster names match the remote clusters — confirm against your deployment.
 *
 * @author rstyro
 */
public class CopyUtil {

    /**
     * Scroll-cursor keep-alive in milliseconds. The original code used 1-6 s,
     * which a slow batch can easily outlive, invalidating the scroll id.
     */
    private static final long SCROLL_KEEP_ALIVE_MS = 60_000L;

    public static void main(String[] args) throws Exception {
        String srcClustName = "robot";
        String srcIndexName = "robot4";
        String srcIp = "127.0.0.1";
        int srcPort = 9300;
        String tagClustName = "robot";
        String tagIndexName = "robot6";
        String tagTypeName = "brain";
        String tagIp = "127.0.0.1";
        int tagPort = 9300;
        esToEs(srcClustName, srcIndexName, srcIp, srcPort, tagClustName, tagIndexName, tagTypeName, tagIp, tagPort);
        //outToFile(srcClustName, srcIndexName, null, srcIp, srcPort, "f:\\json.txt");
        //fileToEs(tagClustName, tagIndexName, tagTypeName, tagIp, tagPort, "f:\\json.txt");
    }

    /**
     * Copies all documents from one Elasticsearch cluster/index into another,
     * scrolling the source in 1000-document batches and bulk-indexing each batch
     * asynchronously on a small thread pool.
     *
     * @param srcClustName source cluster name
     * @param srcIndexName source index
     * @param srcIp        source host
     * @param srcPort      source transport port (default 9300)
     * @param tagClustName target cluster name
     * @param tagIndexName target index
     * @param tagTypeName  target mapping type
     * @param tagIp        target host
     * @param tagPort      target transport port
     * @throws InterruptedException if interrupted while throttling or awaiting pool shutdown
     */
    public static void esToEs(String srcClustName, String srcIndexName, String srcIp, int srcPort,
            String tagClustName, String tagIndexName, String tagTypeName, String tagIp, int tagPort)
            throws InterruptedException {
        Settings srcSettings = Settings.builder()
                .put("cluster.name", srcClustName)
                .build();
        TransportClient srcClient = new PreBuiltTransportClient(srcSettings);
        srcClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(srcIp, srcPort)));

        Settings tagSettings = Settings.builder()
                .put("cluster.name", tagClustName)
                .build();
        TransportClient tagClient = new PreBuiltTransportClient(tagSettings);
        tagClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(tagIp, tagPort)));

        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            SearchResponse scrollResp = srcClient.prepareSearch(srcIndexName)
                    .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MS))
                    .setSize(1000)
                    .execute().actionGet();
            // Guarding on hit count also prevents submitting an empty bulk
            // (which fails request validation) when the source index is empty.
            while (scrollResp.getHits().getHits().length > 0) {
                System.out.println("查詢條數=" + scrollResp.getHits().getHits().length);
                final BulkRequestBuilder bulk = tagClient.prepareBulk();
                for (SearchHit hit : scrollResp.getHits().getHits()) {
                    IndexRequest req = tagClient.prepareIndex().setIndex(tagIndexName)
                            .setType(tagTypeName).setSource(hit.getSourceAsMap()).request();
                    bulk.add(req);
                }
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // actionGet() so indexing failures surface in this worker
                        // instead of being silently dropped by a fire-and-forget execute().
                        bulk.execute().actionGet();
                    }
                });
                // Crude throttle so the fixed pool's unbounded queue does not grow without bound.
                Thread.sleep(100);
                scrollResp = srcClient.prepareSearchScroll(scrollResp.getScrollId())
                        .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MS)).execute().actionGet();
            }
        } finally {
            // shutdown() only stops accepting new tasks; we must wait for the queued
            // bulks to finish BEFORE closing the clients they use, otherwise in-flight
            // requests fail with "client closed".
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.MINUTES);
            System.out.println("執行結束");
            tagClient.close();
            srcClient.close();
        }
    }

    /**
     * Exports documents from an index to a text file, one JSON document per line
     * (CRLF-terminated), appending to the file if it already exists.
     *
     * @param clustName  cluster name
     * @param indexName  index to export
     * @param typeName   mapping type to restrict to; {@code null} exports all types
     * @param sourceIp   host
     * @param sourcePort transport port
     * @param filePath   output file path
     */
    public static void outToFile(String clustName, String indexName, String typeName,
            String sourceIp, int sourcePort, String filePath) {
        Settings settings = Settings.builder()
                .put("cluster.name", clustName)
                .build();
        TransportClient client = new PreBuiltTransportClient(settings);
        client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, sourcePort)));
        // try-with-resources: the writer was previously leaked if any ES call threw.
        try (BufferedWriter out = new BufferedWriter(new FileWriter(filePath, true))) {
            SearchRequestBuilder builder = client.prepareSearch(indexName);
            if (typeName != null) {
                builder.setTypes(typeName);
            }
            builder.setQuery(QueryBuilders.matchAllQuery());
            builder.setSize(10000);
            builder.setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MS));
            SearchResponse scrollResp = builder.execute().actionGet();
            long count = 0;
            while (scrollResp.getHits().getHits().length > 0) {
                for (SearchHit hit : scrollResp.getHits().getHits()) {
                    String json = hit.getSourceAsString();
                    // null check added: a hit with no stored _source yields null here.
                    if (json != null && !json.isEmpty()) {
                        out.write(json);
                        out.write("\r\n");
                        count++;
                    }
                }
                scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                        .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MS)).execute().actionGet();
            }
            System.out.println("總共寫入資料:" + count);
        } catch (IOException e) {
            // FileNotFoundException is an IOException; one handler covers both.
            e.printStackTrace();
        } finally {
            // Close the client even when writing fails.
            client.close();
        }
    }

    /**
     * Imports a file of line-delimited JSON documents into an index, flushing a
     * bulk request every 1000 documents so arbitrarily large files do not build
     * one giant in-memory bulk. Blank lines are skipped.
     *
     * @param clustName  cluster name
     * @param indexName  target index
     * @param typeName   target mapping type
     * @param sourceIp   host
     * @param sourcePort transport port
     * @param filePath   path to the line-delimited JSON file
     */
    @SuppressWarnings("deprecation")
    public static void fileToEs(String clustName, String indexName, String typeName,
            String sourceIp, int sourcePort, String filePath) {
        Settings settings = Settings.builder()
                .put("cluster.name", clustName)
                .build();
        TransportClient client = new PreBuiltTransportClient(settings);
        client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, sourcePort)));
        // try-with-resources: the reader was previously leaked if any ES call threw.
        try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
            String json;
            int count = 0;
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            while ((json = br.readLine()) != null) {
                if (json.isEmpty()) {
                    continue; // an empty source would make the whole bulk fail validation
                }
                bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(json));
                count++;
                // Flush every 1000 documents and start a fresh bulk.
                if (count % 1000 == 0) {
                    bulkRequest.execute().actionGet();
                    bulkRequest = client.prepareBulk();
                }
            }
            // Submit the tail batch only if it is non-empty (an empty file, or a
            // count that is an exact multiple of 1000, leaves nothing pending).
            if (bulkRequest.numberOfActions() > 0) {
                bulkRequest.execute().actionGet();
            }
            System.out.println("總提交了:" + count);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }
}