1. 程式人生 > 實用技巧 > ES 工具類

ES(Elasticsearch)工具類

package com.zxwa.ntmss.process.common.util.es;


import cn.hutool.core.collection.CollectionUtil;
import com.google.common.collect.Lists;
import com.zxwa.ntmss.process.common.util.StringUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.join.query.HasChildQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality; import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; public class ElasticSearchUtil { private static Logger logger = LoggerFactory.getLogger(ElasticSearchUtil.class); private static final String FILE_PATH = "conf.properties"; private static TransportClient client = null; static { try { PropertiesConfiguration config = new PropertiesConfiguration(FILE_PATH); String clusterName = config.getString("es.cluster.name"); List<String> addressList = config.getList("es.address"); int port = config.getInt("es.port"); Settings 
settings = Settings.builder().put("cluster.name", clusterName).build(); client = new PreBuiltTransportClient(settings); for (String address : addressList) { client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(address), port)); } } catch (UnknownHostException e) { logger.error("獲取elasticSearch連線失敗", e); } catch (ConfigurationException e) { logger.error("獲取配置檔案失敗", e); } } public static TransportClient getClient() { return client; } /** * 從es中分頁獲取rowKey資料 * * @param tableName * @param query * @param pageIndex * @param pageSize * @param sortFild * @param desc * @return */ public static ESRowkeyResult getRowKeyFromEsByPage(QueryBuilder query, Integer pageIndex, Integer pageSize, String sortFild, Boolean desc, String... tableName) { String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchRequestBuilder requestBuilder = client.prepareSearch(index).setQuery(query) // Query .setFrom((pageIndex - 1) * pageSize).setSize(pageSize); if (StringUtils.isNotEmpty(sortFild)) { requestBuilder.addSort(sortFild, desc ? SortOrder.DESC : SortOrder.ASC); } SearchResponse response = requestBuilder.get(); List<String> rowKeyList = new ArrayList<>(); for (SearchHit hit : response.getHits()) { String rowKey = hit.getId(); rowKeyList.add(rowKey); } ESRowkeyResult result = new ESRowkeyResult(); result.setRowkeys(rowKeyList); result.setTotal((int) response.getHits().getTotalHits()); return result; } /** * 方法名:getRowKeyFromEsByPage<br> * 描述:分頁查詢es中rowkey<br> * * @param query * @param pageIndex * @param pageSize * @param sortMap * @param tableName * @return*/ public static ESRowkeyResult getRowKeyFromEsByPage(QueryBuilder query, Integer pageIndex, Integer pageSize, Map<String, Boolean> sortMap, String... 
tableName) { String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchRequestBuilder requestBuilder = client.prepareSearch(index).setQuery(query) // Query .setFrom((pageIndex - 1) * pageSize).setSize(pageSize); if (sortMap != null && !sortMap.isEmpty()) { for (String key : sortMap.keySet()) { Boolean value = sortMap.get(key); requestBuilder.addSort(key, value ? SortOrder.DESC : SortOrder.ASC); } } SearchResponse response = requestBuilder.get(); List<String> rowKeyList = new ArrayList<>(); for (SearchHit hit : response.getHits()) { String rowKey = hit.getId(); rowKeyList.add(rowKey); } ESRowkeyResult result = new ESRowkeyResult(); result.setRowkeys(rowKeyList); result.setTotal((int) response.getHits().getTotalHits()); return result; } /** * 方法名:getRowKeyFromEsByPage<br> * 描述:支援一個維度的聚合分頁查詢<br> * * @param query * @param groupbyName * @param unique * @param page * @param sortMap * @param tableName<br> * @return com.zxwa.ntmss.common.plugin.es.ESRowkeyResult<br> */ public static ESRowkeyResult getRowKeyFromEsByPage(QueryBuilder query, String groupbyName, String unique, Pagination page, Map<String, Boolean> sortMap, String... 
tableName) { if (StringUtils.isEmpty(groupbyName) || StringUtils.isEmpty(unique)) { throw new RuntimeException("引數為空."); } String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } // Script script = new Script("doc['shopType'].values + '#'+ doc['commodityId'].values"); Script script = new Script(unique); CardinalityAggregationBuilder cardinalityBuilder = AggregationBuilders.cardinality(groupbyName).script(script); SearchRequestBuilder searchBuilder = client.prepareSearch(index).setQuery(query).addAggregation(cardinalityBuilder); SearchResponse searchResponse = searchBuilder.get(); Cardinality cardinality = searchResponse.getAggregations().get(groupbyName); long total = cardinality.getValue(); int endIndex = 10; int pageSize = 10; int finalIndex = 10; if (page != null) { pageSize = page.getLimit(); endIndex = pageSize * page.getPage(); finalIndex = pageSize * page.getPage(); if (finalIndex > total) { finalIndex = (int) total; } } TermsAggregationBuilder groupTermsBuilder2 = AggregationBuilders.terms(groupbyName).script(script).size(endIndex).order(Terms.Order.compound( Terms.Order.aggregation(groupbyName, true) )); CardinalityAggregationBuilder cardinalityBuilder2 = AggregationBuilders.cardinality(groupbyName).script(script); groupTermsBuilder2.subAggregation(cardinalityBuilder2); SearchRequestBuilder searchbuilder2 = client.prepareSearch(index).setQuery(query).addAggregation(groupTermsBuilder2); if (sortMap != null && !sortMap.isEmpty()) { for (String key : sortMap.keySet()) { Boolean value = sortMap.get(key); searchbuilder2.addSort(key, value ? SortOrder.DESC : SortOrder.ASC); } } SearchResponse response = searchbuilder2.get(); Terms terms2 = response.getAggregations().get(groupbyName); List<String> rowKeyList = new ArrayList<>(); List<? 
extends Terms.Bucket> buckets = terms2.getBuckets(); if (CollectionUtil.isNotEmpty(buckets)) { for (int i = endIndex - pageSize; i < finalIndex; i++) { String rowkey = buckets.get(i).getKeyAsString(); rowkey = rowkey.replaceAll("\\[", "").replaceAll("\\]", ""); rowKeyList.add(rowkey); } } ESRowkeyResult result = new ESRowkeyResult(); result.setRowkeys(rowKeyList); result.setTotal((int) total); return result; } public static ESRowkeyResult getTableNameRowKeyFromEsByPage(QueryBuilder query, Integer pageIndex, Integer pageSize, Map<String, Boolean> sortMap, String... tableName) { String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchRequestBuilder requestBuilder = client.prepareSearch(index) .setQuery(query) // Query .setFrom((pageIndex - 1) * pageSize) .setSize(pageSize); if (!sortMap.isEmpty()) { for (String key : sortMap.keySet()) { Boolean value = sortMap.get(key); requestBuilder.addSort(key, value ? SortOrder.DESC : SortOrder.ASC); } } SearchResponse response = requestBuilder.get(); List<String> rowKeyList = new ArrayList<>(); for (SearchHit hit : response.getHits()) { String rowKey = hit.getId(); String tanlename = hit.getIndex(); rowKeyList.add(tanlename + "@#" + rowKey); } ESRowkeyResult result = new ESRowkeyResult(); result.setRowkeys(rowKeyList); result.setTotal((int) response.getHits().getTotalHits()); return result; } /** * 根據條件從es中獲取rowKey * * @param query * @param sortFild * @param desc * @param tableName * @return */ public static List<String> getRowKeyFromEs(QueryBuilder query, String sortFild, Boolean desc, String... tableName) { String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchRequestBuilder requestBuilder = client.prepareSearch(index).setQuery(query); if (StringUtils.isNotEmpty(sortFild)) { requestBuilder.addSort(sortFild, desc ? 
SortOrder.DESC : SortOrder.ASC); } SearchResponse response = requestBuilder.get(); List<String> rowKeyList = new ArrayList<>(); for (SearchHit hit : response.getHits()) { String rowKey = hit.getId(); rowKeyList.add(rowKey); } return rowKeyList; } /** * 方法名:getRowKeyFromEsDefine * 描述: 查詢RowKey * 引數:<br> * * @param query 查詢條件 * @param sortFild 排序欄位 * @param desc * @param count 自定義查詢資料總量 * @param tableName * @return java.util.List<java.lang.String */ public static List<String> getRowKeyFromEsDefine(QueryBuilder query, String sortFild, Boolean desc, int count, String... tableName) { String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchRequestBuilder requestBuilder = client.prepareSearch(index).setQuery(query).setSize(count); if (StringUtils.isNotEmpty(sortFild)) { requestBuilder.addSort(sortFild, desc ? SortOrder.DESC : SortOrder.ASC); } SearchResponse response = requestBuilder.get(); List<String> rowKeyList = new ArrayList<>(); for (SearchHit hit : response.getHits()) { String rowKey = hit.getId(); rowKeyList.add(rowKey); } return rowKeyList; } /** * 根據條件統計總數 * * @param tableName * @param query * @return */ public static long getResultCountFromEs(QueryBuilder query, String... tableName) { String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchResponse response = client.prepareSearch(index).setQuery(query).setSize(1).get(); long count = response.getHits().getTotalHits(); return count; } /** * 多維度分組統計 * * @param query * @param groupBy 分組統計欄位 可以為多值 用 ,分割 * @param tableName * @param size 統計出來的大小 按照數量排序(比如可以取前十) * @return */ public static Map<String, Object> staticsGroupCount(QueryBuilder query, String groupBy, int size, String... 
tableName) { if (StringUtils.isEmpty(groupBy)) { return null; } String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchRequestBuilder requestBuilder = client.prepareSearch(index); if (query != null) { requestBuilder.setQuery(query); } String[] facets = groupBy.split(","); AggregationBuilder aggregation = null; for (int i = 0; i < facets.length; i++) { if (i == 0) { aggregation = AggregationBuilders.terms(facets[i]).field(facets[i]).size(size); } else { aggregation.subAggregation(AggregationBuilders.terms(facets[i]).field(facets[i]).size(size)); } } requestBuilder.addAggregation(aggregation); SearchResponse response = requestBuilder.execute().actionGet(); Map<String, Object> resultMap = new HashMap<>(); Terms terms = response.getAggregations().get(facets[0]); parseFacetPivotField(resultMap, terms, facets, 1); return resultMap; } /** * 方法名:parseFacetPivotField * 描述: 將es多維統計結果遞迴轉換為需要的map結構 * 引數:<br> * * @param resultMap * @param terms * @param facets * @param index * @return void */ public static void parseFacetPivotField(Map<String, Object> resultMap, Terms terms, String[] facets, int index) { for (Terms.Bucket entry : terms.getBuckets()) { if (index < facets.length) { Terms tempTerms = entry.getAggregations().get(facets[index]); Map<String, Object> pivotFieldTempMap = new HashMap<String, Object>(); resultMap.put(entry.getKeyAsString(), pivotFieldTempMap); parseFacetPivotField(pivotFieldTempMap, tempTerms, facets, index + 1); } else { resultMap.put(entry.getKeyAsString(), entry.getDocCount()); } } } /** * 方法名:getRowKeyAndTableNameFromEs<br> * 描述:根據查詢條件獲取rowkey和tableName,以"@##@"隔開<br> * * @param query * @param tableName*/ public static List<String> getRowKeyAndTableNameFromEs(QueryBuilder query, String... 
tableName) { String[] index = new String[tableName.length]; for (int i = 0; i < tableName.length; i++) { // es 中index只支援小寫 index[i] = tableName[i].toLowerCase(); } SearchRequestBuilder requestBuilder = client.prepareSearch(index).setQuery(query); SearchResponse response = requestBuilder.get(); List<String> rowKeyList = new ArrayList<>(); for (SearchHit hit : response.getHits()) { String rowKey = hit.getId(); String tablename = hit.getIndex(); rowKeyList.add(rowKey + "@##@" + tablename); } return rowKeyList; } /** * @param tableName 表名稱 * @param query 查詢條件 * @param groupBy1 分組欄位1 * @param groupBy2 分組欄位2 * @param sumField 求和欄位 ,不傳的話按照分組取數量 * @param size 需要返回結果集數量 * @return map : key=groupBy1#groupBy2 value=計算結果值 */ public static LinkedHashMap<String, String> querySumOrCountStatByGroup2Dim(String tableName, QueryBuilder query, String groupBy1, String groupBy2, String sumField, int size) { LinkedHashMap<String, String> resultMap = new LinkedHashMap<String, String>(); String sumName = "sum_" + sumField; //構建查詢請求體 SearchRequestBuilder search = client.prepareSearch(tableName); //分組欄位是id,排序由多個欄位排序組成 TermsAggregationBuilder tb = AggregationBuilders.terms(groupBy1).field(groupBy1); TermsAggregationBuilder tb1 = AggregationBuilders.terms(groupBy2).field(groupBy2); if (StringUtils.isNotEmpty(sumField)) { tb1.order(Terms.Order.compound( Terms.Order.aggregation(sumName, false)//先按count,降序排 )).size(size); //求和欄位1 SumAggregationBuilder sb = AggregationBuilders.sum(sumName).field(sumField); tb1.subAggregation(sb);//新增到分組聚合請求中 } else { tb1.order(Terms.Order.compound( Terms.Order.count(false) )).size(size); } tb.subAggregation(tb1); if (query != null) { search.setQuery(query); } //將分組聚合請求插入到主請求體重 search.addAggregation(tb); //傳送查詢,獲取聚合結果 Terms terms = search.get().getAggregations().get(groupBy1); for (Terms.Bucket entry : terms.getBuckets()) { Terms tempTerms = entry.getAggregations().get(groupBy2); for (Terms.Bucket entry1 : tempTerms.getBuckets()) { if 
(StringUtils.isNotEmpty(sumField)) { Sum sum = entry1.getAggregations().get(sumName); resultMap.put(entry.getKeyAsString() + "#" + entry1.getKeyAsString(), StringUtils.formatNum(sum.getValue())); } else { resultMap.put(entry.getKeyAsString() + "#" + entry1.getKeyAsString(), String.valueOf(entry1.getDocCount())); } } } List<Map.Entry<String, String>> list = new ArrayList<Map.Entry<String, String>>(resultMap.entrySet()); Collections.sort(list, new Comparator<Map.Entry<String, String>>() { //降序排序 public int compare(Map.Entry<String, String> o1, Map.Entry<String, String> o2) { return Double.valueOf(o2.getValue()).compareTo(Double.valueOf(o1.getValue())); } }); resultMap = new LinkedHashMap<String, String>(); if (list.size() > size) { list = list.subList(0, size); } for (Map.Entry<String, String> mapping : list) { resultMap.put(mapping.getKey(), mapping.getValue()); } return resultMap; } /** * 更新索引 * * @param tableName * @param docId * @param fieldMap * @return */ public static boolean updateData(String tableName, String docId, Map<String, Object> fieldMap) { if (StringUtils.isEmpty(tableName) || StringUtils.isEmpty(docId)) { throw new RuntimeException("tableName or docId is null"); } if (MapUtils.isEmpty(fieldMap)) { throw new RuntimeException("no data"); } boolean flag = false; UpdateRequest request = new UpdateRequest(); try { XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject(); for (String key : fieldMap.keySet()) { xContentBuilder.field(key, fieldMap.get(key)); } xContentBuilder.endObject(); request.index(tableName).type(tableName).id(docId).doc(xContentBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); UpdateResponse response = client.update(request).get(); flag = response.status().getStatus() == RestStatus.OK.getStatus(); } catch (Exception e) { logger.error("#####更新索引失敗.#####", e); } return flag; } /** * es 遊標獲取總數 * * @param tableName 表名 * @param query 查詢條件 */ public void esScrollDataHelp(String tableName, QueryBuilder 
query) { TransportClient client = ElasticSearchUtil.getClient(); SearchResponse scrollResp = client.prepareSearch(tableName) .setTypes(tableName) .setScroll(new TimeValue(60000)) .setQuery(query) .addSort(SortBuilders.fieldSort("_doc")) .setSize(100).get(); long totalCount = scrollResp.getHits().getTotalHits();// 獲取總數量 System.out.println("totalCount:" + totalCount); do { for (SearchHit hit : scrollResp.getHits().getHits()) { String id = hit.getId(); System.out.println(id); } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet(); } while (scrollResp.getHits().getHits().length != 0); } private static void scrollOutput(SearchResponse response) { SearchHits hits = response.getHits(); for (int j = 0; j < hits.getHits().length; j++) { try { String id = hits.getHits()[j].getId(); System.out.println(id); } catch (Exception e) { e.printStackTrace(); } } } /** * 遊標滾動獲取 * * @param tableName 表名 * @param query 查詢條件 */ public void scrollByPage(String tableName, QueryBuilder query) { TransportClient client = ElasticSearchUtil.getClient(); SearchResponse response = client.prepareSearch(tableName) .setTypes(tableName) .setQuery(query) .addSort(SortBuilders.fieldSort("_doc")) .setSize(100) .setScroll(new TimeValue(60000)).get(); long totalCount = response.getHits().getTotalHits();// 獲取總數量 int page = (int) totalCount / 100; //計算總頁數,每次搜尋數量為分片數*設定的size大小 System.out.println("totalCount:" + totalCount); scrollOutput(response); for (int i = 0; i < page; i++) { response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).get(); scrollOutput(response); } } /** * 多欄位分組並求和;實現sql:select field1, field2, sum(field3) from table_name group by field1, field2; */ public static void twiceAggSum(QueryBuilder query, String tableName, String groupField1, String groupField2, String sumField) { String group1_ = "group_" + groupField1; String group2_ = "group_" + groupField2; String sumField_ = "sum_" + 
sumField; //分組欄位是id,排序由多個欄位排序組成 TermsAggregationBuilder tb1 = AggregationBuilders.terms(group1_).field(groupField1); TermsAggregationBuilder tb2 = AggregationBuilders.terms(group2_).field(groupField2); tb2.order(Terms.Order.compound(Terms.Order.aggregation(sumField_, false))); SumAggregationBuilder sb = AggregationBuilders.sum(sumField_).field(sumField); tb2.subAggregation(sb); tb1.subAggregation(tb2); SearchRequestBuilder search = client.prepareSearch(tableName); if (query != null) { search.setQuery(query); } search.addAggregation(tb1); Terms userAgg = search.get().getAggregations().get(group1_); for (Terms.Bucket entry : userAgg.getBuckets()) { Terms sexAgg = entry.getAggregations().get(group2_); for (Terms.Bucket entry2 : sexAgg.getBuckets()) { Sum sum = entry2.getAggregations().get(sumField_); System.out.println("cityCode:" + entry.getKey() + "\t-------shopType:" + entry2.getKey() + "\t----------sum:" + new BigDecimal(sum.getValue()).setScale(0, BigDecimal.ROUND_HALF_UP).toPlainString()); } System.out.println(); } } /** * 多欄位分組並求和;實現sql:select field1, field2, sum(field3) from table_name group by field1, field2; */ public static void twiceAggCount(QueryBuilder query, String tableName, String groupField1, String groupField2) { String group1_ = "group_" + groupField1; String group2_ = "group_" + groupField2; TermsAggregationBuilder tb1 = AggregationBuilders.terms(group1_).field(groupField1); TermsAggregationBuilder tb2 = AggregationBuilders.terms(group2_).field(groupField2); tb1.subAggregation(tb2); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(tableName); SearchResponse searchResponse = searchRequestBuilder.addAggregation(tb1).setQuery(query).execute().actionGet(); Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap(); StringTerms gradeTerms = (StringTerms) aggMap.get(group1_); Iterator<StringTerms.Bucket> iterator = gradeTerms.getBuckets().iterator(); while (iterator.hasNext()) { StringTerms.Bucket gradeBucket = 
iterator.next(); // System.out.println(gradeBucket.getKey() + "年級有" + gradeBucket.getDocCount() + "個學生。"); StringTerms classTerms = (StringTerms) gradeBucket.getAggregations().asMap().get(group2_); Iterator<StringTerms.Bucket> classBucketIt = classTerms.getBuckets().iterator(); while (classBucketIt.hasNext()) { StringTerms.Bucket classBucket = classBucketIt.next(); System.out.println("cityCode:" + gradeBucket.getKey() + "\t-------shopType:" + classBucket.getKey() + "\t----------sum:" + classBucket.getDocCount()); } System.out.println(); } } public static void main(String[] args) { BoolQueryBuilder query = QueryBuilders.boolQuery().must(QueryBuilders.fuzzyQuery("shopName", "安徽")); SearchRequestBuilder zx_cloud_shop_common = client.prepareSearch("zx_cloud_shop_common"); SearchRequestBuilder searchRequestBuilder = zx_cloud_shop_common.setQuery(query); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); Iterator<SearchHit> iterator = searchResponse.getHits().iterator(); while (iterator.hasNext()){ SearchHit next = iterator.next(); String sourceAsString = next.getSourceAsString(); System.out.println(sourceAsString); } } }