程式人生 > 遠端讀取 Elasticsearch 資料庫並匯出資料

遠端讀取 Elasticsearch 資料庫並匯出資料

最近剛完成開題，畢業設計的題目是使用機器學習演算法檢測電磁資料中的異常。所有電磁資料都儲存在分散式資料庫 Elasticsearch（ES）中，所以第一步需要把資料匯出。這兩天寫了這部分的程式，目前已匯出部分資料。

package org.elasticsearch.esTest;

import java.awt.List;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet
; import java.util.concurrent.ExecutionException; //maven管理依賴 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch
.client.transport.TransportClient; import org.elasticsearch.index.query.QueryBuilders.*; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings
; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.SearchHits; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; /** * Hello world! * */ public class EsClient { static File trace = new File("E:/es data/emcas-2018.01.04_trace.txt"); static File warning = new File("E:/es data/emcas-2018.01.04_warning.txt"); static File other = new File("E:/es data/emcas-2018.01.04_other.txt"); public static Client getClient() throws IOException { Settings settings = ImmutableSettings.settingsBuilder() .put("cluster.name", "estest1") .build(); TransportClient client = new TransportClient(settings).addTransportAddress( new InetSocketTransportAddress("10.10.41.153", 9300)); // FileWriter fw = new FileWriter(article); // BufferedWriter bfw = new BufferedWriter(fw); return client; } public static HashSet<String> write2File(Client client) throws IOException{ long start = System.currentTimeMillis(); int scrollSize = 1000; SearchResponse response = null; FileWriter fw_trace1 = new FileWriter(trace); BufferedWriter bfw1 = new BufferedWriter(fw_trace1); FileWriter fw_warning1 = new FileWriter(warning); BufferedWriter bfw2 = new BufferedWriter(fw_warning1); FileWriter fw_other1 = new FileWriter(trace); BufferedWriter bfw3 = new BufferedWriter(fw_other1); // ArrayList<Integer>collectid = new ArrayList<Integer>(); HashSet collectid = new HashSet(); int i =0; while (response == null || response.getHits().hits().length != 0 && i <=1) { // if(i % 100 == 0){ // fw = new FileWriter(autoCreateFile(i/10+1)); // BufferedWriter bfw1 = new BufferedWriter(fw); // bfw = bfw1; // System.out.println("這是第"+i/10+"萬條資料"); // } try{ response = client.prepareSearch("emcas-2017.10.16") .setQuery(QueryBuilders.matchAllQuery()) .setSize(scrollSize) .setFrom(i*scrollSize) // .setFrom(0) .execute() .actionGet(); } catch 
(IndexMissingException e) { System.out.println("not found"); } SearchHits hits = response.getHits(); int trace_count = 0; int warning_count =0; int other_count = 0; for(int j = 0 ; j < hits.getHits().length; j++){ String jsonstr = hits.getHits()[j] .getSourceAsString(); JSONObject json_1 = JSON.parseObject(jsonstr); System.out.println(json_1); if(json_1.get("eventType").equals("trace")){ trace_count++; collectid.add(json_1.get("collectorId")); if(trace_count % 100000 == 0){ FileWriter fw_trace2 = new FileWriter(autoCreateFile(trace_count/100000)); BufferedWriter bfw_trace = new BufferedWriter(fw_trace2); bfw1 = bfw_trace; } bfw1.write(json_1.toString()+'\r'); bfw1.flush(); }else if(json_1.get("eventType").equals("warning")){ warning_count++; if(warning_count % 100 == 0){ FileWriter fw_warning2 = new FileWriter(autoCreateFile(warning_count/100)); BufferedWriter bfw_warning2 = new BufferedWriter(fw_warning2); bfw2 = bfw_warning2; } bfw2.write(json_1.toString()+'\r'); bfw2.flush(); }else{ other_count++; if(other_count % 100 == 0){ FileWriter fw_other2 = new FileWriter(autoCreateFile(other_count/100)); BufferedWriter bfw_other2 = new BufferedWriter(fw_other2); bfw3 = bfw_other2; } bfw3.write(json_1.toString()+'\r'); bfw3.flush(); } } i++; } bfw1.close(); bfw2.close(); bfw3.close(); fw_other1.close(); fw_trace1.close(); fw_warning1.close(); long end = System.currentTimeMillis(); long totalTime = end - start; System.out.println("總耗時:"+totalTime); return collectid; } public static File autoCreateFile(int i ) throws IOException { File file = new File("E:/es data/"+i+".txt"); file.createNewFile(); return file; } public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { EsClient instance = new EsClient(); Client client = instance.getClient(); HashSet hashSet = new HashSet(); hashSet = write2File(client); for (Object object : hashSet) { System.out.println(object); } System.out.println(hashSet.size()+"size!!!!!!!!"); // GetResponse 
response = client.prepareGet("emcas-2017.10.18","trace","AV8tK5NeSBmsIUk260HQ") // GetResponse response = client.prepareGet("emcas-2017.10.18","status","4") // .execute() // .actionGet(); // System.out.println(response.getSource()); //用於計算es資料庫中一個index下docs的總記錄數 // SearchResponse response2 = client.prepareSearch("emcas-2018.01.04") // .setQuery(QueryBuilders.matchAllQuery()) // .setSize(0) // .execute() // .actionGet(); // SearchHits hits = response2.getHits(); // long hitscount = hits.getTotalHits(); // System.out.println(hitscount); } }