1. 程式人生 > >Elastic Search(ES)使用筆記

Elastic Search(ES)使用筆記

ElasticSearch介紹:

ElasticSearch是一個基於Lucene的搜尋伺服器。它提供了一個分散式多使用者能力的全文搜尋引擎,基於RESTful web介面。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放原始碼釋出,是當前流行的企業級搜尋引擎。設計用於雲端計算中,能夠達到實時搜尋,穩定,可靠,快速,安裝使用方便。

我們建立一個網站或應用程式,並要新增搜尋功能,但是想要完成搜尋工作的建立是非常困難的。我們希望搜尋解決方案要執行速度快,我們希望能有一個零配置和一個完全免費的搜尋模式,我們希望能夠簡單地使用JSON通過HTTP來索引資料,我們希望我們的搜尋伺服器始終可用,我們希望能夠從一臺開始並擴充套件到數百臺,我們要實時搜尋,我們要簡單的多租戶,我們希望建立一個雲的解決方案。因此我們利用Elasticsearch來解決所有這些問題及可能出現的更多其它問題。其作用如果你用過Solr的話,他們效果是差不多的,看你怎麼用,但是在資料量過億級別的話,ES的資料處理速度就比solr快很多。

使用

① maven依賴匯入:

        <dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>6.2.2</version>
		</dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.2.2</version>
        </dependency>

② 工具類編寫:

package com.mvs.utils;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.infochang.comm.utils.PUtils;
import com.mvs.common.BizException;
import com.mvs.model.EsSearchVo;

@Repository
@SuppressWarnings("resource")
public class ElasticSearchUtils{
//public class ElasticSearchUtils implements InitializingBean {
	    private static Logger log = LoggerFactory.getLogger(ElasticSearchUtils.class);
	    private static TransportClient client;  
	    
	    //取得例項  
	    public static  TransportClient getTransportClient(){  
	    	if (client != null)
	    		return client;
	    	else {
		    	try {
			    	  Settings settings = Settings.builder()
													    			.put("cluster.name",PUtils.getString("es.cluster.name"))  
													                .put("client.transport.sniff", true)  //啟動叢集嗅探
													                .build(); 
			    	  
						client = new PreBuiltTransportClient(settings)
								     .addTransportAddress(new TransportAddress(InetAddress.getByName(PUtils.getString("es.cluster.url")), 
								     Integer.valueOf(PUtils.getString("es.cluster.port"))));
					} catch (UnknownHostException e) {
						log.error(">>>>>>>>>ES連線初始化失敗<<<<<<<<<", e);
						client = null;
					} 
	    	}
	    	return client;
	    }  
	    
	    /** 
	     * 建立索引 
	     * @param indexName 索引名稱,相當於資料庫名稱 
	     * @param typeName 索引型別,相當於資料庫中的表名 
	     * @param id id名稱,相當於每個表中某一行記錄的標識 
	     * @param jsonData json資料 
	     */  
		public static void createIndex(String indexName, String typeName, String id,  
	            Map<String,Object> dataMap) {  
	    	TransportClient transportClient = getTransportClient();
	    	if(transportClient!=null) {
	    		if(dataMap!=null&&dataMap.get("summary")!=null) {
	    			log.info(">>>>>>>>>ES Temp Insert Data Summary Is:"+dataMap.get("summary").toString()+"<<<<<<<<<");
	    		}
				IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(dataMap);
				requestBuilder.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
				requestBuilder.execute().actionGet();
//				transportClient.close();
	    	}else {
	    		log.error(">>>>>>>>>ES連線初始化失敗,建立索引失敗<<<<<<<<<");
	    	}
	    }  
		
		/** 
	     * 批量建立索引 
	     * @param indexName 索引名稱,相當於資料庫名稱 
	     * @param typeName 索引型別,相當於資料庫中的表名 
	     * @param id id名稱,相當於每個表中某一行記錄的標識 
	     * @param jsonData json資料 
	     */  
	public static void createIndexDoBatch(String indexName, String typeName, String id,  List<Map<String,Object>> dataLst) { 
	    	TransportClient transportClient = getTransportClient();
	    	if(transportClient!=null) {
	    		BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
	    		if(dataLst!=null&&dataLst.size()>0) {
    	    		for (Map<String,Object> dataMap : dataLst){
    	    			bulkRequest.add(transportClient.prepareIndex(indexName, typeName).setSource(dataMap));
    	    		}
	    		}
	    		bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
	    		bulkRequest.execute().actionGet();
	    	}
	    } 
	    /** 
	     * 執行搜尋 
	     * @param indexname 索引名稱 
	     * @param type 索引型別 
	     * @param queryBuilder 查詢條件 
	     * @return 
	     */  
	    public static SearchResponse searcher(String indexName, String typeName,  QueryBuilder queryBuilder) { 
	    	TransportClient transportClient = getTransportClient();
	        SearchResponse searchResponse = transportClient.prepareSearch(indexName)  
	                .setTypes(typeName).setQuery(queryBuilder).execute()  
	                .actionGet();//執行查詢  
//	        transportClient.close();
	        return searchResponse;  
	    }  
	    
	    /** 
	     * 執行搜尋 
	     * @param indexname 索引名稱 
	     * @param type 索引型別 
	     * @param queryBuilder 查詢條件 
	     * @return 
	     */  
	    @SuppressWarnings("unchecked")
		public static List<Map<String,Object>> searcher(QueryBuilder queryBuilder,String indexName, String typeName) { 
	    	List<Map<String,Object>> dataMap = new ArrayList<>();
	    	TransportClient transportClient = getTransportClient();
	    	try {
				SearchResponse searchResponse = transportClient.prepareSearch(indexName)  
				        .setTypes(typeName).setQuery(queryBuilder).execute()  
				        .actionGet();//執行查詢  
				
				if(searchResponse!=null) {
					SearchHits hits = searchResponse.getHits();
					for (int i = 0; i < hits.getHits().length; i++) {  
				        if (null == hits.getHits()[i]) {  
				            continue;
				        } else {  
				        	dataMap.add(JSON.parseObject(hits.getHits()[i].getSourceAsString(), Map.class));
				          }  
				       }
				    }else {
				    	 log.error("ES未查詢到任何結果!!!");  
				    }
			} catch (Exception e) {
				 log.error("ES查詢異常!!!");  
				 throw new BizException(e.getMessage(), e.getCause());
			}
	        return dataMap;  
	    }  
	    
	    /** 
	     * 更新索引 
	     * @param indexName 索引名稱 
	     * @param typeName 索引型別 
	     * @param id id名稱 
	     * @param jsonData json資料 
	     */  
		public static void updateIndex(String indexName, String typeName, String id, Map<String,Object> dataMap) {  
	    	TransportClient transportClient = getTransportClient();
			UpdateRequestBuilder updateRequest = transportClient.prepareUpdate(indexName, typeName, id).setDoc(dataMap);
			updateRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
	        updateRequest.execute().actionGet();
//	        transportClient.close();
	        
	    }  
	    
		 public static Map<String, Object> getEsMatchFields(Map<String,Object> dataMap) {
		    	Map<String,Object> fieldMap = new HashMap<>();
		    	try {
					String fields = PUtils.getString("zabbix.es.match.field");
					if(StringUtils.isBlank(fields)) {
						log.error("*********請先在配置檔案中配置Zabbix命中欄位*********");
						fieldMap = null;
					}else {
						String[] fieldArr = fields.split(",",-1);
						for (String key : fieldArr) {
						    if(dataMap.get(key) == null){
						        log.error("*********當前資料不包含要命中的欄位,請檢查資料欄位*********");
                                break;
						    }
							String fieldValue = dataMap.get(key).toString();
							if(StringUtils.isBlank(fieldValue)) {
								log.error("*********當前資料不包含要命中的欄位,請檢查資料欄位*********");
								break;
							}else {
								fieldMap.put(key, fieldValue);
							}
						}
					}
				} catch (Exception e) {
					log.error("*********設定Zabbix命中欄位資料異常*********",e);
				}
				return fieldMap;
			}
		
		
		     /** 
			 * 根據查詢資料更新
			 * @param indexName 索引名稱 
			 * @param typeName 索引型別 
			 * @param flag 
			 * @param jsonData json資料 
			 */  
			@SuppressWarnings({ "rawtypes"})
			public static void updateByQueryNotice(String indexName, String typeName, Map<String,Object> dataMap, boolean flag) {  
				try {
					    TransportClient transportClient = getTransportClient();
					    Map<String, Object> conMap = getEsMatchFields(dataMap);
						if(conMap!=null&&conMap.size()>0) {
							List<String> fieldLst =  new ArrayList<>(conMap.keySet()) ;
							BoolQueryBuilder builder = null;
							for (int i=0;i< fieldLst.size(); i++) {
							    if(i==0) {
							    	builder = QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"))
							    			                                                           .must(QueryBuilders.matchQuery(fieldLst.get(i),conMap.get(fieldLst.get(i)).toString()));
							    }else {
							    	builder =  builder.must(QueryBuilders.matchQuery(fieldLst.get(i),conMap.get(fieldLst.get(i)).toString()));
                                            
							    }
						    }
							SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
						    SearchResponse myresponse=responsebuilder.setQuery(builder).execute().actionGet();
							                    
							    if(myresponse!=null) {
							    	SearchHits hits = myresponse.getHits();
							    	for (int i = 0; i < hits.getHits().length; i++) {  
							            if (null == hits.getHits()[i]) {  
							                log.error("ES未查詢到任何結果!!!");
							            } else {  
							                	 if(flag) {
							                		 log.info("ES Match Data Length Is:"+hits.getTotalHits());
							                		 Map map = JSONObject.parseObject(hits.getHits()[i].getSourceAsString(),Map.class);
							                		 if(map.get("repeat_count")!=null&&StringUtils.isNotEmpty(map.get("repeat_count").toString())) {
							                			 dataMap.put("repeat_count",(Integer)map.get("repeat_count")+1);
							                			 updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
							                		 }
							                	 }else {
							                		 updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
							                	 }
							              }  
							           }
							        }else {
							        	 log.error("ES未查詢到任何結果!!!");  
							        }
				}else {
					log.error("當前搜尋沒有條件資料,請先確認條件是否新增");  
				}
		} catch (Exception e) {
			log.error("ES Query Search Failed!!!",e);  
		}
			}
			
			
			 /** 
			 * 根據查詢資料更新
			 * @param indexName 索引名稱 
			 * @param typeName 索引型別 
			 * @param flag 
			 * @param jsonData json資料 
			 */  
			@SuppressWarnings({ "unchecked"})
			public static List<Map<String,Object>> getZabbixMatchData(String indexName, String typeName, Map<String,Object> dataMap) {  
				List<Map<String,Object>> resultMap = new ArrayList<>();
				TransportClient transportClient = getTransportClient();
				try {
						if(dataMap!=null&&dataMap.size()>0) {
							List<String> fieldLst =  new ArrayList<>(dataMap.keySet()) ;
							BoolQueryBuilder builder = null;
						for (int i=0;i< fieldLst.size(); i++) {
							    if(i==0) {
							    	builder = QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"))
							    			                                                           .must(QueryBuilders.matchQuery(fieldLst.get(i),dataMap.get(fieldLst.get(i)).toString()));
							    }else {
							    	builder =  builder.must(QueryBuilders.matchQuery(fieldLst.get(i),dataMap.get(fieldLst.get(i)).toString()));
                                            
							    }
						    }
							SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
						    SearchResponse myresponse=responsebuilder.setQuery(builder).execute().actionGet();
							                    
						    if(myresponse!=null) {
						    	SearchHits hits = myresponse.getHits();
						    	for (int i = 0; i < hits.getHits().length; i++) {  
						            if (null == hits.getHits()[i]) {  
						                continue;
						            } else {  
						            	resultMap.add(JSON.parseObject(hits.getHits()[i].getSourceAsString(), Map.class));
						              }  
						           }
						        }else {
						        	 log.error("ES未查詢到任何結果!!!");  
						        }
						}else {
							log.error("當前搜尋沒有條件資料,請先確認條件是否新增");  
						}
		} catch (Exception e) {
			log.error("ES Query Search Failed!!!",e);  
		}
			return resultMap;
	}
			
			
			 /** 
			 * 根據查詢資料更新
			 * @param indexName 索引名稱 
			 * @param typeName 索引型別 
			 * @param flag 
			 * @param jsonData json資料 
			 */  
			@SuppressWarnings({ "rawtypes"})
			public static void updateByQueryNew(String indexName, String typeName, Map<String,Object> dataMap, boolean flag) {  
				try {
					    TransportClient transportClient = getTransportClient();
						JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
						if(jsonObject!=null&&jsonObject.size()>0) {
							Set<String> keySet = jsonObject.keySet();
							List<SearchRequestBuilder > dataLst = new ArrayList<>();
					        for (String string : keySet) {
					        	SearchRequestBuilder requestBuilder = transportClient.prepareSearch().setQuery(
					        			QueryBuilders.matchQuery(string, jsonObject.get(string)).operator(Operator.AND))
					        			.addAggregation(AggregationBuilders.terms(string).field(jsonObject.get(string).toString()));
					        	dataLst.add(requestBuilder);
							}
					        
							
							if(dataLst==null||dataLst.size()==0) {
								log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<");
							}else {
								    //取最後一條聚合查詢的資料
								    MultiSearchRequestBuilder mrBuilder = transportClient.prepareMultiSearch();
							        mrBuilder.add(dataLst.get(dataLst.size()-1));
							        
							        MultiSearchResponse multiResponse = mrBuilder.execute().actionGet();
							        if(multiResponse!=null) {
							        	 for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
									            SearchResponse response = item.getResponse();
									            if(response!=null&&response.getHits().getTotalHits()>0) {
									            	 SearchHit[] hits = response.getHits().getHits();
									            	 log.info("ES Match Data Length Is:"+response.getHits().getTotalHits());
											         for (SearchHit searchHit : hits) {
											            	 if(flag) {
										                		 Map map = JSONObject.parseObject(searchHit.getSourceAsString(),Map.class);
										                		 if(map.get("repeat_count")!=null&&StringUtils.isNotEmpty(map.get("repeat_count").toString())) {
										                			 dataMap.put("repeat_count",(Integer)map.get("repeat_count")+1);
										                			 updateIndex(indexName,typeName,searchHit.getId(),dataMap);
										                		 }
										                	 }else {
										                		 updateIndex(indexName,typeName,searchHit.getId(),dataMap);
										                	 }
														}
									            }else {
									            	log.error(">>>>>>>>>ES DO NOT MATCH ANAY DATA!!!<<<<<<<<<");
									            }
									        }
							        }else {
							        	log.error(">>>>>>>>>ES DO NOT MATCH ANAY DATA!!!<<<<<<<<<");
							        }
							     }
						}
				} catch (Exception e) {
					log.error("ES Query Search Failed!!!",e);  
				}
			}
			
         /** 
		 * 根據查詢資料更新
		 * @param indexName 索引名稱 
		 * @param typeName 索引型別 
		 * @param flag 
		 * @param jsonData json資料 
		 */  
		public static void updateByQueryTemp(String indexName, String typeName, Map<String,Object> dataMap) {  
			try {
				    TransportClient transportClient = getTransportClient();
					JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
					if(jsonObject!=null&&jsonObject.size()>0) {
						Set<String> keySet = jsonObject.keySet();
						List<MatchPhraseQueryBuilder > dataLst = new ArrayList<>();
						for (String key : keySet) {
							if(key.equals("sourceId")) {
								dataLst.add(QueryBuilders.matchPhraseQuery(key,jsonObject.get(key)));
							}
					    }
						
						if(dataLst.size()!=1) {
							log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<");
						}else {
								  QueryBuilder qb  = QueryBuilders.boolQuery().must(dataLst.get(0));
								  SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
								  SearchResponse myresponse=responsebuilder.setQuery(qb).execute().actionGet();
								                    
								    if(myresponse!=null) {
								    	SearchHits hits = myresponse.getHits();
								    	for (int i = 0; i < hits.getHits().length; i++) {  
								            if (null == hits.getHits()[i]) {  
								                log.error("ES未查詢到任何結果!!!");
								            } else {  
								                		 log.info("ES Match Data Length Is:"+hits.getTotalHits());
								                		 // log.info("ES Temp Update Data Summary Is:"+dataMap.get("summary").toString());
								                		 updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
								              }  
								           }
								        }else {
								        	 log.error("ES未查詢到任何結果!!!");  
								        }
						     }
					}
			} catch (Exception e) {
				log.error("ES Query Search Failed!!!",e);  
			}
		}
						
        /** 
        * 根據查詢資料更新
        * @param indexName 索引名稱 
        * @param typeName 索引型別 
        * @param flag 
        * @param jsonData json資料 
        */  
       public static void updateByQueryTempAlert(String indexName, String typeName, Map<String,Object> dataMap) {  
           try {
                   TransportClient transportClient = getTransportClient();
                   JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
                   if(jsonObject!=null&&jsonObject.size()>0) {
                       Set<String> keySet = jsonObject.keySet();
                       List<MatchPhraseQueryBuilder > dataLst = new ArrayList<>();
                       for (String key : keySet) {
                           if(key.equals("alertNum")) {
                               dataLst.add(QueryBuilders.matchPhraseQuery(key,jsonObject.get(key)));
                           }
                       }
                       
                       if(dataLst.size()!=1) {
                           log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<");
                       }else {
                                 QueryBuilder qb  = QueryBuilders.boolQuery().must(dataLst.get(0));
                                 SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
                                 SearchResponse myresponse=responsebuilder.setQuery(qb).execute().actionGet();
                                                   
                                   if(myresponse!=null) {
                                       SearchHits hits = myresponse.getHits();
                                       for (int i = 0; i < hits.getHits().length; i++) {  
                                           if (null == hits.getHits()[i]) {  
                                               log.error("ES未查詢到任何結果!!!");
                                           } else {  
                                                        log.info("ES Match Data Length Is:"+hits.getTotalHits());
                                                        // log.info("ES Temp Update Data Summary Is:"+dataMap.get("summary").toString());
                                                        updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
                                             }  
                                          }
                                       }else {
                                            log.error("ES未查詢到任何結果!!!");  
                                       }
                            }
                   }
           } catch (Exception e) {
               log.error("ES Query Search Failed!!!",e);  
           }
       }	
		
	    /** 
	     * 刪除指定索引 
	     * @param indexName 
	     * @param typeName 
	     * @param id 
	     */  
	    public static void deleteIndex(String indexName, String typeName, String id) {  
	    	TransportClient transportClient = getTransportClient();
	    	transportClient.prepareDelete(indexName, typeName, id).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();  
//	    	transportClient.close();
	    } 
	    
	    /** 
	     * 判斷一個index中的type是否有資料 
	     * @param index 
	     * @param type 
	     * @return 
	     * @throws Exception 
	     */  
	    public static Boolean existDocOfType(String index, String type) throws Exception {  
	        SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type)  
	                .setSearchType(SearchType.QUERY_THEN_FETCH)  
	                .setSize(1);  
	        SearchResponse response = builder.execute().actionGet();  
	        long docNum = response.getHits().getTotalHits();  
	        if (docNum == 0) {  
	            return false;  
	        }  
	        return true;  
	    }  
	  
	    /** 
	     * 根據type來刪除資料 
	     * @param index 
	     * @param types 
	     * @return 
	     */  
	    public static long deleteDocByType(String index, String[] types) {  
	    	TransportClient transportClient = getTransportClient();
	        long oldTime = System.currentTimeMillis();  
	        StringBuilder b = new StringBuilder();  
	        b.append("{\"query\":{\"match_all\":{}}}");  
	        DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(transportClient, DeleteByQueryAction.INSTANCE)  
	        .setIndices(index).setTypes(types)  
	        .setSource(b.toString())  
	        .execute().actionGet();  
	        Stack<String> allTypes = new Stack<String>();  
	        for(String type : types){  
	            allTypes.add(type);  
	        }  
	        while(!allTypes.isEmpty()){  
	            String type = allTypes.pop();  
	            while(true){  
	                try {  
	                    if (existDocOfType(index, type) == false) {  
	                        break;  
	                    }  
	                } catch (Exception e) {  
	                    log.error("queryError: " + e.getMessage());  
	                }  
	            }  
	        }  
	        System.out.println(System.currentTimeMillis() - oldTime);  
	        return response.getTotalDeleted();  
	    }  
	    
	    /** 
	     * 根據欄位刪除資料
	     * @param (indexName,typeName,field,value)
	     * @return 
	     */
	    public  static boolean deleteByField(String indexName,String typeName,String field,String value) {
	    	boolean flag = false;
	    	try {
				SearchResponse searchResponse = searcher(indexName,typeName,QueryBuilders.matchQuery(field, value));
				if(searchResponse!=null) {
					SearchHits hits = searchResponse.getHits();
					for (int i = 0; i < hits.getHits().length; i++) {  
				        if (null == hits.getHits()[i]) {  
				            log.error("ES未查詢到任何結果!!!");
				        } else {  
				            		 log.info("Color Rules Will Be Deleted Length IS: "+hits.getTotalHits());
				            		 deleteIndex(indexName,typeName,hits.getHits()[i].getId());
				          }  
				       }
				    }
				flag = true;
				log.info("ES Data Delete By Field Success!!!");
			} catch (Exception e) {
				log.error("ES Data Delete By Field Exception!!!",e);
				flag = false;
			}
	    	return flag;
	    }
	    
	    /** 
	     * 分頁查詢
	     * @param sortType 
	     * @param sort 
	     * @param index 
	     * @param types 
	     * @return 
	     */  
	   public static EsSearchVo findByPage(String indexName, Integer size,Integer pageNum,QueryBuilder queryBuilder, Object sort, Object sortType)  {  
		   EsSearchVo vo = new EsSearchVo();
	    	TransportClient transportClient = getTransportClient();
	    	SearchResponse res = null;
	    	SearchResponse searchResponse = null;
	    	if(sort!=null&&sortType.toString().equals("DESC")) {
	    		searchResponse = transportClient.prepareSearch(indexName)
						.setQuery(queryBuilder)       
                        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//這種方式返回的document與使用者要求的size是相等的。
                        .setSize(size)
                        .addSort(sort.toString(), SortOrder.DESC)
		                .setExplain(true)// 設定是否按查詢匹配度排序
		                .setScroll(new TimeValue(20000)).execute()  //設定TimeValue表示需要保持搜尋的上下文時間。
		                .actionGet();//注意:首次搜尋已經包含資料
	    	}else if(sort!=null&&sortType.toString().equals("ASC")) {
	    		searchResponse = transportClient.prepareSearch(indexName)
						.setQuery(queryBuilder)       
                        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//這種方式返回的document與使用者要求的size是相等的。
                        .setSize(size)
                        .addSort(sort.toString(), SortOrder.ASC)
		                .setExplain(true)// 設定是否按查詢匹配度排序
		                .setScroll(new TimeValue(20000)).execute()  //設定TimeValue表示需要保持搜尋的上下文時間。
		                .actionGet();//注意:首次搜尋已經包含資料
	    	}else {
	    		searchResponse = transportClient.prepareSearch(indexName)
						.setQuery(queryBuilder)       
                        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//這種方式返回的document與使用者要求的size是相等的。
                        .setSize(size)
		                .setExplain(true)// 設定是否按查詢匹配度排序
		                .setScroll(new TimeValue(20000)).execute()  //設定TimeValue表示需要保持搜尋的上下文時間。
		                .actionGet();//注意:首次搜尋已經包含資料
	    	}
	    	  
	        //獲取總數量  
	        long totalCount = searchResponse.getHits().getTotalHits();  
	        vo.setTotal(totalCount);
	        
	        int page  = 0;
	        int pageCount=0;
	        if(totalCount%size==0) {
	        	pageCount = (int)totalCount/(size);
	        }else {
	        	pageCount = (int)totalCount/(size)+1;
	        }
	        if(totalCount<size) {
	        	page = 1;
	        }else {
	        	page=(int)totalCount/(size);
	        }
	        
	        log.info("*************************ES Page Query Size Number is:"+pageCount+"************************");  
	        log.info("*************************ES Page Query Match Data Number is:"+totalCount+"************************");  
	        for (int i = 1; i <=page; i++) {  
	        	if(pageNum-1==0) {
	        		res = searchResponse; 
	        		break;
	        	}else if(pageNum-1==i){
	            //再次傳送請求,並使用上次搜尋結果的ScrollId  
	        		res = transportClient.prepareSearchScroll(searchResponse.getScrollId())  
														                    .setScroll(new TimeValue(20000)).execute()  
														                    .actionGet();  
	                    
	              break;
	        	}else {
	        		searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId())  
		                    .setScroll(new TimeValue(20000)).execute()  
		                    .actionGet();  
	        	}
	        }  
	        vo.setSr(res);
	        return vo;
	    }
	   		
	/*   public static void main(String[] args) {
		   List<Map<String,Object>> dataMap = new ArrayList<>();
		   Map<String,Object> m1=new HashMap<>();
		   m1.put("", true);
		   HttpUtils.doPost(url, params);
	}*/
	   
    /**
     * 判斷指定的索引名是否存在
     * 
     * @param indexName
     *            索引名
     * @return 存在:true; 不存在:false;
     */
    public static boolean isExistsIndex(String indexName) {
        IndicesExistsResponse response = getTransportClient().admin().indices().exists(new IndicesExistsRequest().indices(new String[] { indexName }))
                .actionGet();
        return response.isExists();
    }
    
    /**
     * 判斷指定的索引的型別是否存在
     * 
     * @param indexName
     *            索引名
     * @param indexType
     *            索引型別
     * @return 存在:true; 不存在:false;
     */
    public static boolean isExistsType(String indexName, String indexType) {
        TypesExistsResponse response = getTransportClient().admin().indices().typesExists(new TypesExistsRequest(new String[] { indexName }, indexType))
                .actionGet();
        return response.isExists();
    }
    
    /**
     * 建立索引
     * @param indexName
     * @param indexType
     * @param isFielddata 是否需要支援排序
     */
    public static void createIndexType(String indexName, String indexType, boolean isFielddata) {
        CreateIndexRequestBuilder cib = getTransportClient().admin().indices().prepareCreate(indexName);
        XContentBuilder mapping;
        try {
            mapping = XContentFactory.jsonBuilder().startObject()
                        .startObject("_all").field("enabled", false).endObject()// 關閉_all欄位
                        .startObject("_source").field("enabled", true).endObject()// 開啟_source欄位
                        .startObject("properties") // 設定之定義欄位
                            .startObject("_default_")
                            .field("type", "text") // 設定資料型別
//                            .field("date_detection", false) // 設定欄位不做時間檢測
                            .field("fielddata", isFielddata) // 設定支援排序
                            .endObject()
                        .endObject()
                       .endObject();
            cib.addMapping(indexType, mapping);
            cib.execute().actionGet();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


}

注意事項

 ① 在ES 5.6.9版本你可以直接用一個JSON字串來進行新增和修改操作,但是在6.2.2版本中這個方法被取締,使用的時候一定       要注意ES的版本,在6.0版本以後你可以直接傳遞一個Map來做資料的增改操作;

 /** 
	     * 建立索引 
	     * @param indexName 索引名稱,相當於資料庫名稱 
	     * @param typeName 索引型別,相當於資料庫中的表名 
	     * @param id id名稱,相當於每個表中某一行記錄的標識 
	     * @param jsonData json資料 
	     */  
		public static void createIndex(String indexName, String typeName, String id,  
	            Map<String,Object> dataMap) {  
	    	TransportClient transportClient = getTransportClient();
	    	if(transportClient!=null) {
				IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(dataMap);
				requestBuilder.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
				requestBuilder.execute().actionGet();
//				transportClient.close();
	    	}else {
	    		log.error(">>>>>>>>>ES連線初始化失敗,建立索引失敗<<<<<<<<<");
	    	}
	    }  
ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);

② ES工具如果在介面中呼叫頻繁,一定要做好相應的關閉或者構建該工具的單例項以免JAVA虛擬機器被耗資源,這個耗費資源的      度還是非常大的;

  //取得例項  
	    public static  TransportClient getTransportClient(){  
	    	if (client != null)
	    		return client;
	    	else {
		    	try {
			    	  Settings settings = Settings.builder()
													    			.put("cluster.name",CLUSTER_NAME)  
													                .put("client.transport.sniff", true)  //啟動叢集嗅探
													                .build(); 
			    	
						client = new PreBuiltTransportClient(settings)
								     .addTransportAddress(new TransportAddress(InetAddress.getByName(IP), PORT));
					} catch (UnknownHostException e) {
						log.error(">>>>>>>>>ES連線初始化失敗<<<<<<<<<", e);
						client = null;
					} 
	    	}
	    	return client;
	    }  

③ Scroll分頁查詢的效率在資料量上來後比form to操作來的更好,但是要記住,在6.0版本後已經不存在SearType為SCAN的類      型,也就是說,它第一次查詢就會給你返回資料,不會像之前一樣查詢第二次才給你返回資料,所以如果你想要獲取全量數        據,那麼程式碼你就得按規範來修改

/** 
	     * 分頁查詢
	     * @param index 
	     * @param types 
	     * @return 
	     */  
	   public static EsSearchVo findByPage(String indexName, Integer size,Integer pageNum,String sortField,QueryBuilder queryBuilder)  {  
		   EsSearchVo vo = new EsSearchVo();
	    	TransportClient transportClient = getTransportClient();
	    	SearchResponse res = null;
	    	SearchResponse searchResponse = transportClient.prepareSearch(indexName)
	    																								.setQuery(queryBuilder)       
	    			                                                                                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//這種方式返回的document與使用者要求的size是相等的。
	    			                                                                                    .setSize(size)
																						                .addSort(sortField, SortOrder.DESC)
																						                .setExplain(true)// 設定是否按查詢匹配度排序
																						                .setScroll(new TimeValue(20000)).execute()  //設定TimeValue表示需要保持搜尋的上下文時間。
																						                .actionGet();//注意:首次搜尋已經包含資料  
	        //獲取總數量  
	        long totalCount = searchResponse.getHits().getTotalHits();  
	        vo.setTotal(totalCount);
	        
	        int page  = 0;
	        int pageCount=0;
	        if(totalCount%size==0) {
	        	pageCount = (int)totalCount/(size);
	        }else {
	        	pageCount = (int)totalCount/(size)+1;
	        }
	        if(totalCount<size) {
	        	page = 1;
	        }else {
	        	page=(int)totalCount/(size);
	        }
	        
	        log.info("*************************ES Page Query Size Number is:"+pageCount+"************************");  
	        log.info("*************************ES Page Query Match Data Number is:"+totalCount+"************************");  
	        for (int i = 1; i <=page; i++) {  
	        	if(pageNum-1==0) {
	        		res = searchResponse; 
	        		break;
	        	}else if(pageNum-1==i){
	            //再次傳送請求,並使用上次搜尋結果的ScrollId  
	        		res = transportClient.prepareSearchScroll(searchResponse.getScrollId())  
														                    .setScroll(new TimeValue(20000)).execute()  
														                    .actionGet();  
	                    
	              break;
	        	}else {
	        		searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId())  
		                    .setScroll(new TimeValue(20000)).execute()  
		                    .actionGet();  
	        	}
	        }  
	        vo.setSr(res);
	        return vo;
	    }

④ 如果你分頁的單頁數量比你ES中存在的資料量還大,舉例說,你ES中該查詢只有5條資料,但是你分頁的pageSize是10,那        麼如果你不對這一步查詢做相關處理的話資料你是拿不到的,處理如下:

 if(totalCount<size) {
	        	page = 1;
	        }else {
	        	page=(int)totalCount/(size);
	        }

⑤ ES新建索引也就是新增資料的流程中如果伴隨著查詢操作的話,那麼你的自增操作必須加上更新策略支援,不然在此流程中查 詢是 獲取不到資料的,常用策略有RefreshPolicy.IMMEDIATE(更新策略為立即更新),RefreshPolicy.NONE(更新策略為預設,也就是每幾秒執行一次批量更行,這是ES的預設策略),RefreshPolicy.WAIT_UNTIL(這個暫時不清楚,字面意識是等待什麼操作完成)。ElasticSearch 實際上是偽實時的,所有分片之間預設1s同步更新

③ 使用

@SuppressWarnings("unchecked")
	private Map<String, Object> inserEsFirst(ConfigVo configVo, AlertWarnVo alertWarnVo, String indexName) {
		Map<String,Object> resultMap = new HashMap<>();
		 //用來關聯元資料與豐富過後的資料的識別符號
	    String sourceId = new SimpleDateFormat("yyyyMMddhhmmssSSS").format(new Date());
		boolean  flag = true;
		try {
				//如果告警狀態沒有,強制設定為open
	    	    if(alertWarnVo.getEventValue()==null||StringUtils.isEmpty(alertWarnVo.getEventValue().toString())) {
	    	    	alertWarnVo.setWarnStatus("OPEN");
	    	    }else if("0".equals(alertWarnVo.getEventValue())) {
	    	    	alertWarnVo.setWarnStatus("CLOSE");
	    	    }else if("1".equals(alertWarnVo.getEventValue())) {
	    	    	alertWarnVo.setWarnStatus("OPEN");
	    	    }
			    Map<String, Object> tempMap = (Map<String, Object>) JSON.parse(JSON.toJSONString(alertWarnVo));
	    	    tempMap.put("source", configVo.getDataSource());
	    	    //設定預設狀態過濾標識位為false
	    	    tempMap.put("isFilter", false);
	    	    tempMap.put("sourceId", sourceId);
	    	    resultMap.put("sourceId", sourceId);
			   ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);
		} catch (Exception e) {
			log.error(">>>>>>>>>Es Data Insert Failed!!!<<<<<<<<<", e);
			flag =false;
		}
		resultMap.put("flag", flag);
		return resultMap;
	}

	@SuppressWarnings("unchecked")
	private Map<String,Object> updateESFilter(ConfigVo configVo, AlertWarnVo o,String indexName) {
		Map<String,Object> resultMap = new HashMap<>();
		boolean  flag = true;
		try {
			    Map<String, Object> tempMap = (Map<String, Object>) JSON.parse(JSON.toJSONString(o));
			    //根據sourceId存在修改
			    if(tempMap.get("sourceId")!=null&&StringUtils.isNotEmpty(tempMap.get("sourceId").toString())) {
			    	    tempMap.put("isFilter", true);
					    //indexName與typeName一樣
					    ElasticSearchUtils.updateByQueryTemp(indexName, indexName, tempMap);
			    }else {
			    	log.error(">>>>>>>>>Es Data Update Failed Cause SourceId Is Empty!!!<<<<<<<<<" );
			    	flag =false;
			    }
		} catch (Exception e) {
			log.error(">>>>>>>>>Es Data Update Failed!!!<<<<<<<<<", e);
			flag =false;
		}
		resultMap.put("flag", flag);
		return resultMap;
	}
@Override
	public Map<String, Object> queryBaseInfo() {
		Map<String, Object> dataMap = new HashMap<>();
		try {
			Calendar baseCal = Calendar.getInstance();
			baseCal.set(Calendar.HOUR_OF_DAY,0);
			baseCal.set(Calendar.MINUTE, 0);
			baseCal.set(Calendar.SECOND, 0);
			baseCal.add(Calendar.DATE,0);
			//查詢Disaster資料,CLOSED資料不算其中
			SearchResponse searcherDisaster = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
									                                           PUtils.getString("es.index.notice"), 
									                                           QueryBuilders.boolQuery()
									                                           .must(QueryBuilders.matchQuery("severity", "Disaster"))
									                                           .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
									                                           .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
			if(searcherDisaster!=null&&searcherDisaster.getHits().getTotalHits()!=0) {
				dataMap.put("DisasterCount", searcherDisaster.getHits().getTotalHits());
			}else {
				dataMap.put("DisasterCount", 0);
			}
			//查詢High資料,CLOSED資料不算其中
			SearchResponse searcherHigh = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
														                PUtils.getString("es.index.notice"), 
														                QueryBuilders.boolQuery()
														                .must(QueryBuilders.matchQuery("severity", "High"))
														                .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
														                .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
			if(searcherHigh!=null&&searcherHigh.getHits().getTotalHits()!=0) {
			dataMap.put("HighCount", searcherHigh.getHits().getTotalHits());
			}else {
			dataMap.put("HighCount", 0);
			}
			//查詢Average資料,CLOSED資料不算其中
			SearchResponse searcherAverage= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
															                PUtils.getString("es.index.notice"), 
															                QueryBuilders.boolQuery()
															                .must(QueryBuilders.matchQuery("severity", "Average"))
															                .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
															                .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
			if(searcherAverage!=null&&searcherAverage.getHits().getTotalHits()!=0) {
			dataMap.put("AverageCount", searcherAverage.getHits().getTotalHits());
			}else {
			dataMap.put("AverageCount", 0);
			}									
												
			//查詢Warning資料,CLOSED資料不算其中
			SearchResponse searcherWarning= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
																                PUtils.getString("es.index.notice"), 
																                QueryBuilders.boolQuery()
																                .must(QueryBuilders.matchQuery("severity", "Warning"))
																                .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
																                .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
			if(searcherWarning!=null&&searcherWarning.getHits().getTotalHits()!=0) {
			dataMap.put("WarningCount", searcherWarning.getHits().getTotalHits());
			}else {
			dataMap.put("WarningCount", 0);
			}	
			//查詢Not Classified資料,CLOSED資料不算其中
			SearchResponse searcherNotClassified= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
					PUtils.getString("es.index.notice"), 
					QueryBuilders.boolQuery()
					.must(QueryBuilders.matchQuery("severity", "Not Classified"))
					.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
					.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
			if(searcherNotClassified!=null&&searcherNotClassified.getHits().getTotalHits()!=0) {
				dataMap.put("NotClassifiedCount", searcherNotClassified.getHits().getTotalHits());
			}else {
				dataMap.put("NotClassifiedCount", 0);
			}	
			//查詢Information資料,CLOSED資料不算其中
			SearchResponse searcherInformation= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
					PUtils.getString("es.index.notice"), 
					QueryBuilders.boolQuery()
					.must(QueryBuilders.matchQuery("severity", "Information"))
					.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
					.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
			if(searcherInformation!=null&&searcherInformation.getHits().getTotalHits()!=0) {
				dataMap.put("InformationCount", searcherInformation.getHits().getTotalHits());
			}else {
				dataMap.put("InformationCount", 0);
			}	
			//查詢近7天告警總數資料,CLOSED資料不算其中
			List<Long> eventsLst = new ArrayList<>();
			for(int i=0;i<7;i++) {
				RangeQueryBuilder rangequerybuilder = null;
				if(i==6) {
					Calendar cal = Calendar.getInstance();
					cal.set(Calendar.HOUR_OF_DAY,0);
					cal.set(Calendar.MINUTE, 0);
					cal.set(Calendar.SECOND, 0);
					cal.add(Calendar.DATE,(i-6));
					rangequerybuilder = QueryBuilders.rangeQuery("eventTime").gt(cal.getTimeInMillis());
				}else {
					Calendar cal1 = Calendar.getInstance();
					cal1.set(Calendar.HOUR_OF_DAY,0);
					cal1.set(Calendar.MINUTE, 0);
					cal1.set(Calendar.SECOND, 0);
					cal1.add(Calendar.DATE,(i-6));
					
					Calendar cal2 = Calendar.getInstance();
					cal2.set(Calendar.HOUR_OF_DAY,0);
					cal2.set(Calendar.MINUTE, 0);
					cal2.set(Calendar.SECOND, 0);
					cal2.add(Calendar.DATE,(i-5));
					rangequerybuilder = QueryBuilders.rangeQuery("eventTime").gt(cal1.getTimeInMillis()).lt(cal2.getTimeInMillis());
				}
				
				SearchResponse searcher = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"), PUtils.getString("es.index.notice"), rangequerybuilder);
				if(searcher!=null&&searcher.getHits().getTotalHits()!=0) {
					eventsLst.add(searcher.getHits().getTotalHits());
					}else {
						eventsLst.add(0L);
					}		
			}
			
			dataMap.put("eventsLst", eventsLst);
		} catch (Exception e) {
			e.printStackTrace();
			logger.error(">>>>>>>>>Query ES Base Source Has failed!!!<<<<<<<<<", e);
			dataMap=null;
		}
		return dataMap;
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Override
	public List<Map<String, Object>> queryAllEvents(Integer pageNum, Integer pageSize) {
		List<Map<String, Object>> dataLst = new ArrayList<>();
		try {
			EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"),pageSize,pageNum,"eventTime",
			        QueryBuilders.boolQuery()
			        .must(QueryBuilders.matchAllQuery()));
			if(vo!=null&&vo.getSr()!=null) {
				SearchResponse searcher = vo.getSr();
				logger.info("******************Total Match Sources Are:"+searcher.getHits().getTotalHits()+"********************");
				SearchHits hits = searcher.getHits();
				for (SearchHit searchHit : hits.getHits()) {
					if(searchHit.getSourceAsString()!=null&&searchHit.getSourceAsString().length()>0) {
						Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
						parseMap.put("page", pageNum);
						parseMap.put("size", pageSize);
						parseMap.put("total", vo.getTotal()==null?0:vo.getTotal());
						dataLst.add(parseMap);
					}
				}
			}else {
				logger.info("******************No Source Matched In ES!!!!********************");
			}
			
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("****************** Query All Es Events Failed,Cause: ********************",e);
			dataLst=null;
		} 
		
		return dataLst;
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Override
	public List<Map<String, Object>> queryCloseEvents(Integer pageNum, Integer pageSize) {
		List<Map<String, Object>> dataLst = new ArrayList<>();
		try {
			EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"),pageSize,pageNum,"eventTime",
					QueryBuilders.boolQuery()
			        .must(QueryBuilders.matchQuery("warnStatus","CLOSED")));
			if(vo!=null&&vo.getSr()!=null) {
				SearchResponse searcher = vo.getSr();
				logger.info("******************Closed Match Sources Are:"+searcher.getHits().getTotalHits()+"********************");
				SearchHits hits = searcher.getHits();
				for (SearchHit searchHit : hits.getHits()) {
					if(searchHit.getSourceAsString()!=null&&searchHit.getSourceAsString().length()>0) {
						Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
						parseMap.put("page", pageNum);
						parseMap.put("size", pageSize);
						parseMap.put("total", vo.getTotal()==null?0:vo.getTotal());
						dataLst.add(parseMap);
					}
				}
			}
			
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("****************** Query Closed Es Events Failed,Cause: ********************",e);
			dataLst=null;
		} 
		
		return dataLst;
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Override
	public List<Map<String, Object>> queryOpenEvents(Integer pageNum, Integer pageSize) {
		List<Map<String, Object>> dataLst = new ArrayList<>();
		try {
			EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"),pageSize,pageNum,"eventTime",
			        QueryBuilders.boolQuery()
			        .must(QueryBuilders.matchQuery("warnStatus","OPEN")));
			if(vo!=null&&vo.getSr()!=null) {
				SearchResponse searcher = vo.getSr();
				logger.info("******************Open Match Sources Are:"+searcher.getHits().getTotalHits()+"********************");
				SearchHits hits = searcher.getHits();
				for (SearchHit searchHit : hits.getHits()) {
					if(searchHit.getSourceAsString()!=null&&searchHit.getSourceAsString().length()>0) {
						Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
						parseMap.put("page", pageNum);
						parseMap.put("size", pageSize);
						parseMap.put("total", vo.getTotal()==null?0:vo.getTotal());
						dataLst.add(parseMap);
					}
				}
			}
			
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("****************** Query Open Es Events Failed,Cause: ********************",e);
			dataLst=null;
		} 
		
		return dataLst;
	}

特別提醒:在剛啟動ES的時候,ES中是沒有庫的,這個時候要在服務啟動的時候就建立專案中預設用到的ES庫,否則會報錯,建立方式分為兩種,一種為實現InitializingBean介面,或者實現ApplicationListener介面然後新增如下程式碼,前者要在該實現類配置相應的註解,@resource或者compnent都行,然後配置檔案得掃描到該實現類所在的位置;後者要在spring配置檔案中註冊也就是<bean>的方式。

//判斷temp庫是否存在
        if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.temp"))) {
            logger.error("ES庫["+PUtils.getString("es.index.temp")+ "]不存在,建立新庫.......");
            //建立Temp庫
            ElasticSearchUtils.createIndexType(PUtils.getString("es.index.temp"), PUtils.getString("es.index.temp"), true);
        }
        if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.notice"))) {
            logger.error("ES庫["+PUtils.getString("es.index.notice")+ "]不存在,建立新庫.......");
            //建立notice庫
            ElasticSearchUtils.createIndexType(PUtils.getString("es.index.notice"), PUtils.getString("es.index.notice"), true);
        }
        if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.color"))) {
            logger.error("ES庫["+PUtils.getString("es.index.color")+ "]不存在,建立新庫.......");
            //建立color庫
            ElasticSearchUtils.createIndexType(PUtils.getString("es.index.color"), PUtils.getString("es.index.color"), true);
        }

總結:

ES 的ClusterName相當於就是一個數據庫mysql,indexName相當於他的庫名,typeName相當於是表名,_id相當於此表的主鍵,當然你也可以自定義主鍵來完成相應的需求,它的效率與mysql在大資料量中相比要強大N倍。