Elastic Search(ES)使用筆記
ElasticSearch介紹:
ElasticSearch是一個基於Lucene的搜尋伺服器。它提供了一個分散式多使用者能力的全文搜尋引擎,基於RESTful web介面。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放原始碼釋出,是當前流行的企業級搜尋引擎。設計用於雲端計算中,能夠達到實時搜尋,穩定,可靠,快速,安裝使用方便。
我們建立一個網站或應用程式,並要新增搜尋功能,但是想要完成搜尋工作的建立是非常困難的。我們希望搜尋解決方案要執行速度快,我們希望能有一個零配置和一個完全免費的搜尋模式,我們希望能夠簡單地使用JSON通過HTTP來索引資料,我們希望我們的搜尋伺服器始終可用,我們希望能夠從一臺開始並擴充套件到數百臺,我們要實時搜尋,我們要簡單的多租戶,我們希望建立一個雲的解決方案。因此我們利用Elasticsearch來解決所有這些問題及可能出現的更多其它問題。其作用如果你用過Solr的話,他們效果是差不多的,看你怎麼用,但是在資料量過億級別的話,ES的資料處理速度就比solr快很多。
使用
① maven依賴匯入:
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.2.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.2.2</version> </dependency>
② 工具類編寫:
package com.mvs.utils;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.infochang.comm.utils.PUtils;
import com.mvs.common.BizException;
import com.mvs.model.EsSearchVo;

/**
 * Static helpers around a single shared Elasticsearch {@link TransportClient}.
 *
 * <p>The client is created lazily by {@link #getTransportClient()} from the
 * {@code es.cluster.*} settings read through {@code PUtils} and is then reused
 * by every method; none of the methods close it.
 */
@Repository
@SuppressWarnings("resource")
public class ElasticSearchUtils {

    private static final Logger log = LoggerFactory.getLogger(ElasticSearchUtils.class);

    /** Shared client; only assigned inside the synchronized {@link #getTransportClient()}. */
    private static TransportClient client;

    /**
     * Returns the shared transport client, creating it on first use.
     *
     * <p>The method is synchronized so that concurrent first calls cannot each
     * build (and leak) their own client.
     *
     * @return the shared client, or {@code null} when the configured host cannot be resolved
     * @throws NumberFormatException if {@code es.cluster.port} is not a valid integer
     */
    public static synchronized TransportClient getTransportClient() {
        if (client != null) {
            return client;
        }
        try {
            // Resolve host and port BEFORE constructing the client: a failure here
            // would otherwise leave an unreferenced, never-closed client behind.
            InetAddress host = InetAddress.getByName(PUtils.getString("es.cluster.url"));
            int port = Integer.parseInt(PUtils.getString("es.cluster.port"));
            Settings settings = Settings.builder()
                    .put("cluster.name", PUtils.getString("es.cluster.name"))
                    .put("client.transport.sniff", true) // enable cluster sniffing
                    .build();
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(host, port));
        } catch (UnknownHostException e) {
            log.error(">>>>>>>>>ES連線初始化失敗<<<<<<<<<", e);
            client = null;
        }
        return client;
    }

    /**
     * Indexes (creates or overwrites) one document and refreshes immediately.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param id        document id; the usage in this project passes {@code null}
     * @param dataMap   document source
     */
    public static void createIndex(String indexName, String typeName, String id, Map<String, Object> dataMap) {
        TransportClient transportClient = getTransportClient();
        if (transportClient == null) {
            log.error(">>>>>>>>>ES連線初始化失敗,建立索引失敗<<<<<<<<<");
            return;
        }
        if (dataMap != null && dataMap.get("summary") != null) {
            log.info(">>>>>>>>>ES Temp Insert Data Summary Is:{}<<<<<<<<<", dataMap.get("summary"));
        }
        IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(dataMap);
        requestBuilder.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
        requestBuilder.execute().actionGet();
    }

    /**
     * Indexes a list of documents in one bulk request and refreshes immediately.
     * Does nothing when the list is {@code null} or empty, because an empty bulk
     * request is rejected by request validation.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param id        unused; kept so existing callers still compile
     * @param dataLst   document sources, one map per document
     */
    public static void createIndexDoBatch(String indexName, String typeName, String id,
            List<Map<String, Object>> dataLst) {
        TransportClient transportClient = getTransportClient();
        if (transportClient == null) {
            log.error(">>>>>>>>>ES連線初始化失敗,建立索引失敗<<<<<<<<<");
            return;
        }
        if (dataLst == null || dataLst.isEmpty()) {
            return;
        }
        BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
        for (Map<String, Object> dataMap : dataLst) {
            bulkRequest.add(transportClient.prepareIndex(indexName, typeName).setSource(dataMap));
        }
        bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
        bulkRequest.execute().actionGet();
    }

    /**
     * Runs a query and returns the raw response.
     *
     * @param indexName    index name
     * @param typeName     mapping type name
     * @param queryBuilder query to run
     * @return the search response
     */
    public static SearchResponse searcher(String indexName, String typeName, QueryBuilder queryBuilder) {
        return getTransportClient().prepareSearch(indexName)
                .setTypes(typeName)
                .setQuery(queryBuilder)
                .execute()
                .actionGet();
    }

    /**
     * Runs a query and returns the {@code _source} of every returned hit as a map.
     *
     * @param queryBuilder query to run
     * @param indexName    index name
     * @param typeName     mapping type name
     * @return the hit sources; empty when nothing matched
     * @throws BizException wrapping any failure while searching or parsing
     */
    public static List<Map<String, Object>> searcher(QueryBuilder queryBuilder, String indexName, String typeName) {
        List<Map<String, Object>> rows = new ArrayList<>();
        try {
            SearchResponse searchResponse = searcher(indexName, typeName, queryBuilder);
            if (searchResponse == null) {
                log.error("ES未查詢到任何結果!!!");
            } else {
                collectSources(searchResponse, rows);
            }
        } catch (Exception e) {
            log.error("ES查詢異常!!!", e);
            // Chain the caught exception itself; getCause() is frequently null.
            throw new BizException(e.getMessage(), e);
        }
        return rows;
    }

    /**
     * Partially updates one document and refreshes immediately.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param id        id of the document to update
     * @param dataMap   fields to merge into the document
     */
    public static void updateIndex(String indexName, String typeName, String id, Map<String, Object> dataMap) {
        UpdateRequestBuilder updateRequest =
                getTransportClient().prepareUpdate(indexName, typeName, id).setDoc(dataMap);
        updateRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
        updateRequest.execute().actionGet();
    }

    /**
     * Picks from {@code dataMap} the fields listed (comma separated) in the
     * {@code zabbix.es.match.field} setting.
     *
     * <p>Collection stops at the first configured field that is missing or blank,
     * so the result may contain only a prefix of the configured fields.
     * NOTE(review): callers then match on fewer fields than configured — confirm
     * that a partial match is really intended.
     *
     * @param dataMap data to read the field values from
     * @return field name to value (as string); {@code null} when the setting is blank
     */
    public static Map<String, Object> getEsMatchFields(Map<String, Object> dataMap) {
        Map<String, Object> fieldMap = new HashMap<>();
        try {
            String fields = PUtils.getString("zabbix.es.match.field");
            if (StringUtils.isBlank(fields)) {
                log.error("*********請先在配置檔案中配置Zabbix命中欄位*********");
                return null;
            }
            for (String key : fields.split(",", -1)) {
                Object raw = dataMap.get(key);
                if (raw == null || StringUtils.isBlank(raw.toString())) {
                    log.error("*********當前資料不包含要命中的欄位,請檢查資料欄位*********");
                    break;
                }
                fieldMap.put(key, raw.toString());
            }
        } catch (Exception e) {
            log.error("*********設定Zabbix命中欄位資料異常*********", e);
        }
        return fieldMap;
    }

    /**
     * Updates every returned document that is not {@code CLOSE} and matches the
     * configured match fields of {@code dataMap}.
     *
     * <p>No explicit size is set on the search, so only the hits of the first
     * result page are updated. Failures are logged, not thrown.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param dataMap   new field values; also the source of the match conditions
     * @param flag      when {@code true}, {@code repeat_count} is incremented from the
     *                  stored value and hits without a stored {@code repeat_count} are skipped
     */
    public static void updateByQueryNotice(String indexName, String typeName, Map<String, Object> dataMap,
            boolean flag) {
        try {
            TransportClient transportClient = getTransportClient();
            Map<String, Object> conditions = getEsMatchFields(dataMap);
            if (conditions == null || conditions.isEmpty()) {
                log.error("當前搜尋沒有條件資料,請先確認條件是否新增");
                return;
            }
            SearchResponse response = transportClient.prepareSearch(indexName)
                    .setTypes(typeName)
                    .setQuery(openAlertsMatching(conditions))
                    .execute()
                    .actionGet();
            if (response == null) {
                log.error("ES未查詢到任何結果!!!");
                return;
            }
            SearchHits hits = response.getHits();
            for (SearchHit hit : hits.getHits()) {
                if (hit == null) {
                    log.error("ES未查詢到任何結果!!!");
                    continue;
                }
                if (flag) {
                    log.info("ES Match Data Length Is:{}", hits.getTotalHits());
                }
                updateHit(indexName, typeName, hit, dataMap, flag);
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
    }

    /**
     * Returns the sources of the documents that are not {@code CLOSE} and match
     * every entry of {@code dataMap}. Failures are logged, not thrown.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param dataMap   field name to value; every entry becomes a {@code must} match clause
     * @return matching sources collected so far; empty when nothing matched or the search failed
     */
    public static List<Map<String, Object>> getZabbixMatchData(String indexName, String typeName,
            Map<String, Object> dataMap) {
        List<Map<String, Object>> matches = new ArrayList<>();
        TransportClient transportClient = getTransportClient();
        try {
            if (dataMap == null || dataMap.isEmpty()) {
                log.error("當前搜尋沒有條件資料,請先確認條件是否新增");
                return matches;
            }
            SearchResponse response = transportClient.prepareSearch(indexName)
                    .setTypes(typeName)
                    .setQuery(openAlertsMatching(dataMap))
                    .execute()
                    .actionGet();
            if (response == null) {
                log.error("ES未查詢到任何結果!!!");
            } else {
                collectSources(response, matches);
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
        return matches;
    }

    /**
     * Updates the documents found by a match query built from the LAST key of
     * {@code dataMap} (in the key order of its fastjson representation).
     *
     * <p>NOTE(review): the search is issued without an index (all indices), a
     * request is built for every key but only the last is executed, and the terms
     * aggregation uses the field VALUE as its field name. This is kept exactly as
     * it was; confirm whether any of it is intended.
     *
     * @param indexName index used for the updates
     * @param typeName  mapping type used for the updates
     * @param dataMap   new field values; also the source of the query
     * @param flag      when {@code true}, {@code repeat_count} is incremented from the
     *                  stored value and hits without a stored {@code repeat_count} are skipped
     */
    public static void updateByQueryNew(String indexName, String typeName, Map<String, Object> dataMap,
            boolean flag) {
        try {
            TransportClient transportClient = getTransportClient();
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
            if (jsonObject == null || jsonObject.isEmpty()) {
                return;
            }
            Set<String> keySet = jsonObject.keySet();
            List<SearchRequestBuilder> requests = new ArrayList<>();
            for (String key : keySet) {
                Object value = jsonObject.get(key);
                requests.add(transportClient.prepareSearch()
                        .setQuery(QueryBuilders.matchQuery(key, value).operator(Operator.AND))
                        .addAggregation(AggregationBuilders.terms(key).field(value.toString())));
            }
            // Only the last request is executed; the list cannot be empty here.
            MultiSearchRequestBuilder multiRequest = transportClient.prepareMultiSearch();
            multiRequest.add(requests.get(requests.size() - 1));
            MultiSearchResponse multiResponse = multiRequest.execute().actionGet();
            if (multiResponse == null) {
                log.error(">>>>>>>>>ES DO NOT MATCH ANAY DATA!!!<<<<<<<<<");
                return;
            }
            for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
                SearchResponse response = item.getResponse();
                if (response == null || response.getHits().getTotalHits() <= 0) {
                    log.error(">>>>>>>>>ES DO NOT MATCH ANAY DATA!!!<<<<<<<<<");
                    continue;
                }
                log.info("ES Match Data Length Is:{}", response.getHits().getTotalHits());
                for (SearchHit searchHit : response.getHits().getHits()) {
                    updateHit(indexName, typeName, searchHit, dataMap, flag);
                }
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
    }

    /**
     * Updates the documents whose {@code sourceId} phrase-matches the
     * {@code sourceId} entry of {@code dataMap}. Failures are logged, not thrown.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param dataMap   new field values; must contain {@code sourceId}
     */
    public static void updateByQueryTemp(String indexName, String typeName, Map<String, Object> dataMap) {
        updateByPhraseMatch(indexName, typeName, dataMap, "sourceId");
    }

    /**
     * Updates the documents whose {@code alertNum} phrase-matches the
     * {@code alertNum} entry of {@code dataMap}. Failures are logged, not thrown.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param dataMap   new field values; must contain {@code alertNum}
     */
    public static void updateByQueryTempAlert(String indexName, String typeName, Map<String, Object> dataMap) {
        updateByPhraseMatch(indexName, typeName, dataMap, "alertNum");
    }

    /**
     * Deletes one document by id and refreshes immediately.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param id        id of the document to delete
     */
    public static void deleteIndex(String indexName, String typeName, String id) {
        getTransportClient().prepareDelete(indexName, typeName, id)
                .setRefreshPolicy(RefreshPolicy.IMMEDIATE)
                .get();
    }

    /**
     * Tells whether a mapping type of an index contains at least one document.
     *
     * @param index index name
     * @param type  mapping type name
     * @return {@code true} when the type holds at least one document
     * @throws Exception if the search fails
     */
    public static Boolean existDocOfType(String index, String type) throws Exception {
        // Go through the accessor: the static field is still null before first use.
        SearchResponse response = getTransportClient().prepareSearch(index)
                .setTypes(type)
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setSize(1)
                .execute()
                .actionGet();
        return response.getHits().getTotalHits() != 0;
    }

    /**
     * Deletes every document of the given mapping types with one delete-by-query.
     *
     * <p>Uses the delete-by-query API of the 6.x reindex module. The request asks
     * for a refresh of the affected shards, so the deletions are visible to
     * searches when the call returns and no polling is needed afterwards.
     *
     * @param index index name
     * @param types mapping types whose documents are deleted
     * @return number of deleted documents
     */
    public static long deleteDocByType(String index, String[] types) {
        TransportClient transportClient = getTransportClient();
        long startedAt = System.currentTimeMillis();
        DeleteByQueryRequestBuilder request = DeleteByQueryAction.INSTANCE.newRequestBuilder(transportClient)
                .filter(QueryBuilders.matchAllQuery())
                .source(index)
                .refresh(true);
        request.source().setTypes(types);
        BulkByScrollResponse response = request.get();
        log.info("ES delete by type took {} ms", System.currentTimeMillis() - startedAt);
        return response.getDeleted();
    }

    /**
     * Deletes the documents returned by a match query on one field.
     *
     * <p>No explicit size is set on the search, so only the hits of the first
     * result page are deleted per call.
     *
     * @param indexName index name
     * @param typeName  mapping type name
     * @param field     field to match on
     * @param value     value to match
     * @return {@code true} when no exception occurred, {@code false} otherwise
     */
    public static boolean deleteByField(String indexName, String typeName, String field, String value) {
        try {
            SearchResponse searchResponse = searcher(indexName, typeName, QueryBuilders.matchQuery(field, value));
            if (searchResponse != null) {
                SearchHits hits = searchResponse.getHits();
                for (SearchHit hit : hits.getHits()) {
                    if (hit == null) {
                        log.error("ES未查詢到任何結果!!!");
                        continue;
                    }
                    log.info("Color Rules Will Be Deleted Length IS: {}", hits.getTotalHits());
                    deleteIndex(indexName, typeName, hit.getId());
                }
            }
            log.info("ES Data Delete By Field Success!!!");
            return true;
        } catch (Exception e) {
            log.error("ES Data Delete By Field Exception!!!", e);
            return false;
        }
    }

    /**
     * Returns one page of results, reached by scrolling from the first page.
     *
     * <p>The initial scroll search already contains page 1; every further page
     * costs one more scroll request. The response stored in the result is
     * {@code null} when {@code pageNum} lies outside the reachable range.
     *
     * @param indexName    index name
     * @param size         page size
     * @param pageNum      1-based page number
     * @param queryBuilder query to run
     * @param sort         field to sort on, or {@code null} for no sorting
     * @param sortType     {@code "ASC"} or {@code "DESC"}; any other value (or {@code null}) disables sorting
     * @return total hit count plus the response of the requested page
     */
    public static EsSearchVo findByPage(String indexName, Integer size, Integer pageNum, QueryBuilder queryBuilder,
            Object sort, Object sortType) {
        EsSearchVo vo = new EsSearchVo();
        TransportClient transportClient = getTransportClient();

        SearchRequestBuilder request = transportClient.prepareSearch(indexName)
                .setQuery(queryBuilder)
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setSize(size)
                .setExplain(true) // ask for a score explanation on each hit
                .setScroll(new TimeValue(20000)); // how long the scroll context is kept alive
        String direction = (sort == null || sortType == null) ? null : sortType.toString();
        if ("DESC".equals(direction)) {
            request.addSort(sort.toString(), SortOrder.DESC);
        } else if ("ASC".equals(direction)) {
            request.addSort(sort.toString(), SortOrder.ASC);
        }
        SearchResponse searchResponse = request.execute().actionGet();

        long totalCount = searchResponse.getHits().getTotalHits();
        vo.setTotal(totalCount);

        int pageCount = (int) totalCount / size;
        if (totalCount % size != 0) {
            pageCount++;
        }
        // Upper bound on scroll requests; at least 1 so that a result set smaller
        // than one page is still returned.
        int page = totalCount < size ? 1 : (int) totalCount / size;
        log.info("*************************ES Page Query Size Number is:" + pageCount + "************************");
        log.info("*************************ES Page Query Match Data Number is:" + totalCount + "************************");

        SearchResponse res = null;
        int scrollsNeeded = pageNum - 1;
        if (scrollsNeeded == 0) {
            if (page >= 1) {
                res = searchResponse;
            }
        } else {
            for (int i = 1; i <= page; i++) {
                // Continue from the scroll id of the previous response.
                SearchResponse next = transportClient.prepareSearchScroll(searchResponse.getScrollId())
                        .setScroll(new TimeValue(20000))
                        .execute()
                        .actionGet();
                if (i == scrollsNeeded) {
                    res = next;
                    break;
                }
                searchResponse = next;
            }
        }
        vo.setSr(res);
        return vo;
    }

    /**
     * Tells whether an index exists.
     *
     * @param indexName index name
     * @return {@code true} when the index exists
     */
    public static boolean isExistsIndex(String indexName) {
        IndicesExistsResponse response = getTransportClient().admin().indices()
                .exists(new IndicesExistsRequest().indices(new String[] { indexName }))
                .actionGet();
        return response.isExists();
    }

    /**
     * Tells whether a mapping type exists in an index.
     *
     * @param indexName index name
     * @param indexType mapping type name
     * @return {@code true} when the type exists
     */
    public static boolean isExistsType(String indexName, String indexType) {
        TypesExistsResponse response = getTransportClient().admin().indices()
                .typesExists(new TypesExistsRequest(new String[] { indexName }, indexType))
                .actionGet();
        return response.isExists();
    }

    /**
     * Creates an index together with a mapping for one type.
     *
     * <p>The mapping disables {@code _all}, enables {@code _source} and declares
     * a text property named {@code _default_}. A failure while building the
     * mapping JSON is logged and the index is not created.
     *
     * @param indexName   index name
     * @param indexType   mapping type name
     * @param isFielddata value of the {@code fielddata} flag of the text property
     *                    (needed for sorting on it)
     */
    public static void createIndexType(String indexName, String indexType, boolean isFielddata) {
        CreateIndexRequestBuilder cib = getTransportClient().admin().indices().prepareCreate(indexName);
        try {
            XContentBuilder mapping = XContentFactory.jsonBuilder().startObject()
                    .startObject("_all").field("enabled", false).endObject()
                    .startObject("_source").field("enabled", true).endObject()
                    .startObject("properties")
                    .startObject("_default_")
                    .field("type", "text")
                    .field("fielddata", isFielddata)
                    .endObject()
                    .endObject()
                    .endObject();
            cib.addMapping(indexType, mapping);
            cib.execute().actionGet();
        } catch (IOException e) {
            log.error("ES create index mapping failed, index: {}", indexName, e);
        }
    }

    /** Builds "warnStatus is not CLOSE and every given field matches its value". */
    private static BoolQueryBuilder openAlertsMatching(Map<String, Object> conditions) {
        BoolQueryBuilder query = QueryBuilders.boolQuery()
                .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"));
        for (Map.Entry<String, Object> condition : conditions.entrySet()) {
            query.must(QueryBuilders.matchQuery(condition.getKey(), condition.getValue().toString()));
        }
        return query;
    }

    /** Appends the parsed {@code _source} of every non-null hit of the response to {@code target}. */
    @SuppressWarnings("unchecked")
    private static void collectSources(SearchResponse response, List<Map<String, Object>> target) {
        for (SearchHit hit : response.getHits().getHits()) {
            if (hit != null) {
                target.add(JSON.parseObject(hit.getSourceAsString(), Map.class));
            }
        }
    }

    /**
     * Applies {@code dataMap} to one hit; in increment mode the stored
     * {@code repeat_count} is bumped first and hits without one are skipped.
     */
    private static void updateHit(String indexName, String typeName, SearchHit hit, Map<String, Object> dataMap,
            boolean incrementRepeatCount) {
        if (incrementRepeatCount) {
            Map<?, ?> stored = JSONObject.parseObject(hit.getSourceAsString(), Map.class);
            Object repeat = stored.get("repeat_count");
            if (repeat == null || StringUtils.isEmpty(repeat.toString())) {
                return;
            }
            // The stored counter may come back as Integer, Long or String; a plain
            // (Integer) cast fails for the latter two.
            int current = repeat instanceof Number
                    ? ((Number) repeat).intValue()
                    : Integer.parseInt(repeat.toString().trim());
            dataMap.put("repeat_count", current + 1);
        }
        updateIndex(indexName, typeName, hit.getId(), dataMap);
    }

    /** Shared body of the updateByQueryTemp* methods: phrase-match on one field, then update each hit. */
    private static void updateByPhraseMatch(String indexName, String typeName, Map<String, Object> dataMap,
            String matchField) {
        try {
            TransportClient transportClient = getTransportClient();
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
            if (jsonObject == null || jsonObject.isEmpty()) {
                return;
            }
            if (!jsonObject.containsKey(matchField)) {
                log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<");
                return;
            }
            MatchPhraseQueryBuilder phrase = QueryBuilders.matchPhraseQuery(matchField, jsonObject.get(matchField));
            QueryBuilder qb = QueryBuilders.boolQuery().must(phrase);
            SearchResponse response = transportClient.prepareSearch(indexName)
                    .setTypes(typeName)
                    .setQuery(qb)
                    .execute()
                    .actionGet();
            if (response == null) {
                log.error("ES未查詢到任何結果!!!");
                return;
            }
            SearchHits hits = response.getHits();
            for (SearchHit hit : hits.getHits()) {
                if (hit == null) {
                    log.error("ES未查詢到任何結果!!!");
                    continue;
                }
                log.info("ES Match Data Length Is:{}", hits.getTotalHits());
                updateIndex(indexName, typeName, hit.getId(), dataMap);
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
    }
}
注意事項
① 在ES 5.6.9版本中,你可以直接用一個JSON字串來進行新增和修改操作,但是在6.2.2版本中這個方法已被移除,使用的時候一定要注意ES的版本;在6.0版本以後,你可以直接傳遞一個Map來做資料的增改操作:
/**
 * Indexes (creates or overwrites) a single document and refreshes immediately.
 * Logs an error and does nothing when no client is available.
 *
 * @param indexName index name (comparable to a database name)
 * @param typeName  mapping type name (comparable to a table name)
 * @param id        document id (comparable to a row identifier)
 * @param dataMap   document source
 */
public static void createIndex(String indexName, String typeName, String id,
        Map<String,Object> dataMap) {
    TransportClient esClient = getTransportClient();
    if (esClient == null) {
        log.error(">>>>>>>>>ES連線初始化失敗,建立索引失敗<<<<<<<<<");
        return;
    }
    // IMMEDIATE refresh makes the document searchable as soon as this call returns.
    esClient.prepareIndex(indexName, typeName, id)
            .setSource(dataMap)
            .setRefreshPolicy(RefreshPolicy.IMMEDIATE)
            .execute()
            .actionGet();
}
ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);
② ES工具如果在介面中被頻繁呼叫,一定要做好相應的關閉,或者把該工具的client構建成單例項,以免每次呼叫都新建連線而耗盡JAVA虛擬機器的資源,這個資源耗費的程度還是非常大的;
/**
 * Returns the shared transport client, creating it on first use.
 *
 * <p>The method is synchronized so that concurrent first calls cannot each
 * build (and leak) their own client.
 *
 * @return the shared client, or {@code null} when the configured host cannot be resolved
 */
public static synchronized TransportClient getTransportClient(){
    if (client != null) {
        return client;
    }
    try {
        // Resolve the host BEFORE constructing the client: a failure here would
        // otherwise leave an unreferenced, never-closed client behind.
        InetAddress host = InetAddress.getByName(IP);
        Settings settings = Settings.builder()
                .put("cluster.name",CLUSTER_NAME)
                .put("client.transport.sniff", true) // enable cluster sniffing
                .build();
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(host, PORT));
    } catch (UnknownHostException e) {
        log.error(">>>>>>>>>ES連線初始化失敗<<<<<<<<<", e);
        client = null;
    }
    return client;
}
③ 在資料量變大之後,Scroll分頁查詢的效率比from + size分頁來得更好。但是要記住,在6.0版本後已經不存在SearchType為SCAN的類型,也就是說,它第一次查詢就會給你返回資料,不會像之前一樣查詢第二次才給你返回資料,所以如果你想要獲取全量數據,那麼程式碼就得按下面的方式來修改:
/**
 * Returns one page of results, sorted descending on {@code sortField} and
 * reached by scrolling from the first page.
 *
 * <p>The initial scroll search already contains page 1; every further page
 * costs one more scroll request. The response stored in the result is
 * {@code null} when {@code pageNum} lies outside the reachable range.
 *
 * @param indexName    index name
 * @param size         page size
 * @param pageNum      1-based page number
 * @param sortField    field to sort on (descending)
 * @param queryBuilder query to run
 * @return total hit count plus the response of the requested page
 */
public static EsSearchVo findByPage(String indexName, Integer size,Integer pageNum,String sortField,QueryBuilder queryBuilder) {
    EsSearchVo result = new EsSearchVo();
    TransportClient esClient = getTransportClient();
    SearchResponse current = esClient.prepareSearch(indexName)
            .setQuery(queryBuilder)
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setSize(size)
            .addSort(sortField, SortOrder.DESC)
            .setExplain(true) // ask for a score explanation on each hit
            .setScroll(new TimeValue(20000)) // how long the scroll context is kept alive
            .execute()
            .actionGet(); // the first response already carries data

    long totalCount = current.getHits().getTotalHits();
    result.setTotal(totalCount);

    // Number of pages, rounded up; only used for logging.
    int pageCount = (int) totalCount / size;
    if (totalCount % size != 0) {
        pageCount++;
    }
    // Upper bound on scroll requests; at least 1 so that a result set smaller
    // than one page is still returned.
    int page = totalCount < size ? 1 : (int) totalCount / size;
    log.info("*************************ES Page Query Size Number is:"+pageCount+"************************");
    log.info("*************************ES Page Query Match Data Number is:"+totalCount+"************************");

    SearchResponse requested = null;
    int scrollsNeeded = pageNum - 1;
    if (scrollsNeeded == 0) {
        if (page >= 1) {
            requested = current;
        }
    } else {
        for (int i = 1; i <= page; i++) {
            // Continue from the scroll id of the previous response.
            SearchResponse next = esClient.prepareSearchScroll(current.getScrollId())
                    .setScroll(new TimeValue(20000))
                    .execute()
                    .actionGet();
            if (i == scrollsNeeded) {
                requested = next;
                break;
            }
            current = next;
        }
    }
    result.setSr(requested);
    return result;
}
④ 如果你分頁的單頁數量比你ES中存在的資料量還大,舉例說,你ES中該查詢只有5條資料,但是你分頁的pageSize是10,這時 5/10 的整數除法結果是0,迴圈一次都不會執行,那麼如果你不對這一步做相關處理的話,資料你是拿不到的,處理如下:
if(totalCount<size) {
page = 1;
}else {
page=(int)totalCount/(size);
}
⑤ ES新建索引(也就是新增資料)的流程中如果伴隨著查詢操作的話,那麼你的新增操作必須加上更新策略支援,不然在此流程中查詢是獲取不到資料的。常用策略有RefreshPolicy.IMMEDIATE(更新策略為立即更新),RefreshPolicy.NONE(更新策略為預設,也就是不主動重新整理,由ES按重新整理間隔定期批量更新,這是ES的預設策略),RefreshPolicy.WAIT_UNTIL(請求會等到下一次重新整理讓這次變更可以被搜尋到之後才返回,但不會強制立即重新整理)。ElasticSearch 實際上是偽實時的,索引預設每1s重新整理一次,新寫入的資料要在重新整理之後才能被搜尋到。
③ 使用
/**
 * Stores a raw alert in Elasticsearch and tags it with a generated
 * {@code sourceId} that later links the raw record to its enriched version.
 *
 * <p>The warn status is derived from the event value: empty or {@code "1"}
 * gives {@code OPEN}, {@code "0"} gives {@code CLOSE}; any other value leaves
 * the status untouched. Failures are logged and reported through the
 * {@code flag} entry rather than thrown.
 *
 * @param configVo    configuration; its data source is stored as {@code source}
 * @param alertWarnVo alert to store; its warn status may be modified
 * @param indexName   index name, also used as the mapping type name
 * @return map with {@code flag} (success) and, once generated, {@code sourceId}
 */
@SuppressWarnings("unchecked")
private Map<String, Object> inserEsFirst(ConfigVo configVo, AlertWarnVo alertWarnVo, String indexName) {
    Map<String,Object> resultMap = new HashMap<>();
    // Identifier linking the raw record to the enriched one. The hour uses the
    // 24-hour pattern (HH): the 12-hour pattern (hh) produces the same id for a
    // morning and an afternoon timestamp.
    // NOTE(review): two alerts in the same millisecond still share an id.
    String sourceId = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date());
    boolean flag = true;
    try {
        // Compare the textual form, so the checks also work when the event value
        // is not a String ("0".equals(nonString) is always false).
        Object rawEventValue = alertWarnVo.getEventValue();
        String eventValue = rawEventValue == null ? "" : rawEventValue.toString();
        if (StringUtils.isEmpty(eventValue)) {
            // No event value: force the status to OPEN.
            alertWarnVo.setWarnStatus("OPEN");
        } else if ("0".equals(eventValue)) {
            alertWarnVo.setWarnStatus("CLOSE");
        } else if ("1".equals(eventValue)) {
            alertWarnVo.setWarnStatus("OPEN");
        }
        Map<String, Object> tempMap = (Map<String, Object>) JSON.parse(JSON.toJSONString(alertWarnVo));
        tempMap.put("source", configVo.getDataSource());
        // Records start out as "not filtered".
        tempMap.put("isFilter", false);
        tempMap.put("sourceId", sourceId);
        resultMap.put("sourceId", sourceId);
        ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);
    } catch (Exception e) {
        log.error(">>>>>>>>>Es Data Insert Failed!!!<<<<<<<<<", e);
        flag = false;
    }
    resultMap.put("flag", flag);
    return resultMap;
}
/**
 * Marks the stored record that carries the alert's {@code sourceId} as filtered.
 *
 * <p>The alert is converted to a map, {@code isFilter} is set to {@code true}
 * and the record is updated through
 * {@code ElasticSearchUtils.updateByQueryTemp}. Failures are logged and
 * reported through the {@code flag} entry rather than thrown.
 *
 * @param configVo  not used by this method
 * @param o         alert whose {@code sourceId} identifies the record
 * @param indexName index name, also used as the mapping type name
 * @return map with a single {@code flag} entry telling whether the update was issued
 */
@SuppressWarnings("unchecked")
private Map<String,Object> updateESFilter(ConfigVo configVo, AlertWarnVo o,String indexName) {
    boolean succeeded;
    try {
        Map<String, Object> fields = (Map<String, Object>) JSON.parse(JSON.toJSONString(o));
        Object sourceId = fields.get("sourceId");
        if (sourceId == null || StringUtils.isEmpty(sourceId.toString())) {
            // Without a sourceId there is nothing to match the record on.
            log.error(">>>>>>>>>Es Data Update Failed Cause SourceId Is Empty!!!<<<<<<<<<" );
            succeeded = false;
        } else {
            fields.put("isFilter", true);
            // Index name and type name are the same.
            ElasticSearchUtils.updateByQueryTemp(indexName, indexName, fields);
            succeeded = true;
        }
    } catch (Exception e) {
        log.error(">>>>>>>>>Es Data Update Failed!!!<<<<<<<<<", e);
        succeeded = false;
    }
    Map<String,Object> outcome = new HashMap<>();
    outcome.put("flag", succeeded);
    return outcome;
}
/**
 * Collects the dashboard figures from the notice index: today's number of
 * non-closed alerts per severity and the total number of alerts for each of
 * the last seven days.
 *
 * <p>Result keys: {@code DisasterCount}, {@code HighCount},
 * {@code AverageCount}, {@code WarningCount}, {@code NotClassifiedCount},
 * {@code InformationCount} and {@code eventsLst} (seven counts, oldest day
 * first, today last).
 *
 * <p>NOTE(review): the seven-day totals do not exclude closed alerts, although
 * the original comment said they should — confirm which is intended.
 *
 * @return the figures, or {@code null} when any query fails
 */
@Override
public Map<String, Object> queryBaseInfo() {
    Map<String, Object> dataMap = new HashMap<>();
    try {
        long todayStart = startOfToday().getTimeInMillis();
        putOpenCountSince(dataMap, "DisasterCount", "Disaster", todayStart);
        putOpenCountSince(dataMap, "HighCount", "High", todayStart);
        putOpenCountSince(dataMap, "AverageCount", "Average", todayStart);
        putOpenCountSince(dataMap, "WarningCount", "Warning", todayStart);
        putOpenCountSince(dataMap, "NotClassifiedCount", "Not Classified", todayStart);
        putOpenCountSince(dataMap, "InformationCount", "Information", todayStart);

        // Alert totals for the last 7 days. All boundaries come from one
        // calendar, so consecutive buckets neither overlap nor leave a gap.
        String noticeIndex = PUtils.getString("es.index.notice");
        List<Long> eventsLst = new ArrayList<>();
        Calendar dayStart = startOfToday();
        dayStart.add(Calendar.DATE, -6);
        for (int i = 0; i < 7; i++) {
            RangeQueryBuilder range = QueryBuilders.rangeQuery("eventTime").gte(dayStart.getTimeInMillis());
            dayStart.add(Calendar.DATE, 1);
            if (i < 6) {
                // Past days are closed by the next midnight; today stays open-ended.
                range.lt(dayStart.getTimeInMillis());
            }
            SearchResponse searcher = ElasticSearchUtils.searcher(noticeIndex, noticeIndex, range);
            eventsLst.add(searcher == null ? 0L : searcher.getHits().getTotalHits());
        }
        dataMap.put("eventsLst", eventsLst);
    } catch (Exception e) {
        logger.error(">>>>>>>>>Query ES Base Source Has failed!!!<<<<<<<<<", e);
        dataMap = null;
    }
    return dataMap;
}

/** Returns a calendar set to today's midnight in the default time zone, milliseconds included. */
private static Calendar startOfToday() {
    Calendar cal = Calendar.getInstance();
    cal.set(Calendar.HOUR_OF_DAY, 0);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    return cal;
}

/** Stores under {@code key} how many non-closed alerts of one severity occurred at or after {@code since}. */
private static void putOpenCountSince(Map<String, Object> target, String key, String severity, long since) {
    String noticeIndex = PUtils.getString("es.index.notice");
    // Alerts are written with warnStatus "CLOSE" elsewhere in this project;
    // "CLOSED" is still excluded as well in case older records carry it.
    SearchResponse response = ElasticSearchUtils.searcher(noticeIndex, noticeIndex,
            QueryBuilders.boolQuery()
                    .must(QueryBuilders.matchQuery("severity", severity))
                    .must(QueryBuilders.rangeQuery("eventTime").gte(since))
                    .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"))
                    .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
    if (response != null && response.getHits().getTotalHits() != 0) {
        target.put(key, response.getHits().getTotalHits());
    } else {
        // Kept as an int zero, exactly as before.
        target.put(key, 0);
    }
}
/**
 * Fetches one page of events from the notice index using a match-all query.
 *
 * <p>The JSON source of every hit is parsed into a map, and the paging values
 * ({@code page}, {@code size}, {@code total}) are added to each resulting row.
 *
 * @param pageNum  page number, forwarded to {@code ElasticSearchUtils.findByPage} and echoed in each row
 * @param pageSize page size, forwarded to {@code ElasticSearchUtils.findByPage} and echoed in each row
 * @return the parsed rows (possibly empty), or {@code null} when the query or parsing threw an exception
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public List<Map<String, Object>> queryAllEvents(Integer pageNum, Integer pageSize) {
    List<Map<String, Object>> dataLst = new ArrayList<>();
    try {
        // "eventTime" is passed through to findByPage; presumably the sort field -- confirm against the helper.
        EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"), pageSize, pageNum, "eventTime",
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchAllQuery()));
        SearchResponse searcher = (vo == null) ? null : vo.getSr();
        if (searcher == null) {
            logger.info("******************No Source Matched In ES!!!!********************");
            return dataLst;
        }
        logger.info("******************Total Match Sources Are:" + searcher.getHits().getTotalHits() + "********************");
        // The total is the same for every row, so resolve it once instead of per hit.
        Object total = vo.getTotal() == null ? 0 : vo.getTotal();
        SearchHits hits = searcher.getHits();
        for (SearchHit searchHit : hits.getHits()) {
            // Read the source once and reuse it rather than calling getSourceAsString() repeatedly.
            String source = searchHit.getSourceAsString();
            if (source == null || source.isEmpty()) {
                continue;
            }
            Map<String, Object> row = JSON.parseObject(source, Map.class);
            row.put("page", pageNum);
            row.put("size", pageSize);
            row.put("total", total);
            dataLst.add(row);
        }
    } catch (Exception e) {
        // The logger already records the stack trace; no separate printStackTrace() is needed.
        logger.error("****************** Query All Es Events Failed,Cause: ********************", e);
        // Callers receive null (not an empty list) to signal failure.
        return null;
    }
    return dataLst;
}
/**
 * Fetches one page of events from the notice index whose {@code warnStatus} matches {@code "CLOSED"}.
 *
 * <p>The JSON source of every hit is parsed into a map, and the paging values
 * ({@code page}, {@code size}, {@code total}) are added to each resulting row.
 *
 * @param pageNum  page number, forwarded to {@code ElasticSearchUtils.findByPage} and echoed in each row
 * @param pageSize page size, forwarded to {@code ElasticSearchUtils.findByPage} and echoed in each row
 * @return the parsed rows (possibly empty), or {@code null} when the query or parsing threw an exception
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public List<Map<String, Object>> queryCloseEvents(Integer pageNum, Integer pageSize) {
    List<Map<String, Object>> dataLst = new ArrayList<>();
    try {
        // "eventTime" is passed through to findByPage; presumably the sort field -- confirm against the helper.
        EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"), pageSize, pageNum, "eventTime",
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        SearchResponse searcher = (vo == null) ? null : vo.getSr();
        if (searcher == null) {
            return dataLst;
        }
        logger.info("******************Closed Match Sources Are:" + searcher.getHits().getTotalHits() + "********************");
        // The total is the same for every row, so resolve it once instead of per hit.
        Object total = vo.getTotal() == null ? 0 : vo.getTotal();
        SearchHits hits = searcher.getHits();
        for (SearchHit searchHit : hits.getHits()) {
            // Read the source once and reuse it rather than calling getSourceAsString() repeatedly.
            String source = searchHit.getSourceAsString();
            if (source == null || source.isEmpty()) {
                continue;
            }
            Map<String, Object> row = JSON.parseObject(source, Map.class);
            row.put("page", pageNum);
            row.put("size", pageSize);
            row.put("total", total);
            dataLst.add(row);
        }
    } catch (Exception e) {
        // The logger already records the stack trace; no separate printStackTrace() is needed.
        logger.error("****************** Query Closed Es Events Failed,Cause: ********************", e);
        // Callers receive null (not an empty list) to signal failure.
        return null;
    }
    return dataLst;
}
/**
 * Fetches one page of events from the notice index whose {@code warnStatus} matches {@code "OPEN"}.
 *
 * <p>The JSON source of every hit is parsed into a map, and the paging values
 * ({@code page}, {@code size}, {@code total}) are added to each resulting row.
 *
 * @param pageNum  page number, forwarded to {@code ElasticSearchUtils.findByPage} and echoed in each row
 * @param pageSize page size, forwarded to {@code ElasticSearchUtils.findByPage} and echoed in each row
 * @return the parsed rows (possibly empty), or {@code null} when the query or parsing threw an exception
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public List<Map<String, Object>> queryOpenEvents(Integer pageNum, Integer pageSize) {
    List<Map<String, Object>> dataLst = new ArrayList<>();
    try {
        // "eventTime" is passed through to findByPage; presumably the sort field -- confirm against the helper.
        EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"), pageSize, pageNum, "eventTime",
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("warnStatus", "OPEN")));
        SearchResponse searcher = (vo == null) ? null : vo.getSr();
        if (searcher == null) {
            return dataLst;
        }
        logger.info("******************Open Match Sources Are:" + searcher.getHits().getTotalHits() + "********************");
        // The total is the same for every row, so resolve it once instead of per hit.
        Object total = vo.getTotal() == null ? 0 : vo.getTotal();
        SearchHits hits = searcher.getHits();
        for (SearchHit searchHit : hits.getHits()) {
            // Read the source once and reuse it rather than calling getSourceAsString() repeatedly.
            String source = searchHit.getSourceAsString();
            if (source == null || source.isEmpty()) {
                continue;
            }
            Map<String, Object> row = JSON.parseObject(source, Map.class);
            row.put("page", pageNum);
            row.put("size", pageSize);
            row.put("total", total);
            dataLst.add(row);
        }
    } catch (Exception e) {
        // The logger already records the stack trace; no separate printStackTrace() is needed.
        logger.error("****************** Query Open Es Events Failed,Cause: ********************", e);
        // Callers receive null (not an empty list) to signal failure.
        return null;
    }
    return dataLst;
}
特別提醒:ES剛啟動時,其中還沒有任何庫,因此必須在服務啟動的時候就建立專案中預設用到的ES庫,否則會報錯。建立方式分為兩種:一種是實現InitializingBean介面,另一種是實現ApplicationListener介面,然後在實現類中新增如下程式碼。前者要在該實現類上配置相應的元件註解(如@Component或@Service),並且配置檔案的元件掃描必須涵蓋該實現類所在的位置;後者則要在spring配置檔案中以<bean>的方式註冊。
// Check each ES index the project uses (temp, notice, color) and create any that does not exist yet.
for (String esIndexKey : new String[] { "es.index.temp", "es.index.notice", "es.index.color" }) {
    String esIndexName = PUtils.getString(esIndexKey);
    if (ElasticSearchUtils.isExistsIndex(esIndexName)) {
        continue;
    }
    logger.error("ES庫[" + esIndexName + "]不存在,建立新庫.......");
    // The same configured value is passed as both the first and second argument.
    ElasticSearchUtils.createIndexType(esIndexName, esIndexName, true);
}
總結:
ES的ClusterName相當於一個MySQL資料庫服務,indexName相當於它的庫名,typeName相當於表名,_id相當於此表的主鍵,當然你也可以自定義主鍵來完成相應的需求。在大資料量的場景下,它的查詢效率比MySQL要強大N倍。