
How to use aggregations in Elasticsearch to get the effect of SQL's HAVING clause

In real-world development you will inevitably hit business scenarios where you need to aggregate to get per-group counts and then filter on those counts.

1. Using minDocCount. Code first; adapt it to your own business scenario.

// the working approach
SearchRequestBuilder search = transportClient.prepareSearch("bigdata_idx_2").setTypes("captureCompare");
// sub-aggregation: count only the documents inside the time window
FilterAggregationBuilder sub = AggregationBuilders.filter("channel_longitudeC")
        .filter(QueryBuilders.rangeQuery("fcmp_time").from(startTime).to(endTime));
// group by fcmp_fobj_id; bucket order is a compound of several criteria
TermsBuilder tb = AggregationBuilders.terms("fcmp_fobj_id").field("fcmp_fobj_id")
        .valueType(Terms.ValueType.STRING)
        .order(Terms.Order.compound(
                Terms.Order.aggregation("channel_longitudeC", false) // by filtered count, descending
        ));
// minDocCount(400) keeps only buckets with at least 400 documents -- this is the HAVING effect
tb.subAggregation(sub).minDocCount(400); // attach the sub-aggregation to the terms aggregation

// attach the terms aggregation to the main request
search.addAggregation(tb);
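This is the equivalent of `SELECT fcmp_fobj_id, count(*) FROM ... GROUP BY fcmp_fobj_id HAVING count(*) >= 400`. For reference, the same request sketched in REST query DSL (field and index names taken from the code above; the epoch-millis values stand in for startTime/endTime, and the exact shape depends on your Elasticsearch version):

```json
{
  "size": 0,
  "aggs": {
    "fcmp_fobj_id": {
      "terms": {
        "field": "fcmp_fobj_id",
        "min_doc_count": 400,
        "order": { "channel_longitudeC": "desc" }
      },
      "aggs": {
        "channel_longitudeC": {
          "filter": {
            "range": { "fcmp_time": { "gte": 1546300800000, "lte": 1546387200000 } }
          }
        }
      }
    }
  }
}
```

`min_doc_count` filters buckets on their raw document count, while the `filter` sub-aggregation supplies the time-windowed count used for ordering.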

2. A slightly more complex scenario: while aggregating, you also need to return the other field values of each document. Use the Top Hits Aggregation.

Similar to SQL: select *, count(*) from XXX group by a ......

SearchResponse response = null;
SearchRequestBuilder responsebuilder = transportClient.prepareSearch("syrk_bigdata_capturecmp_passer_idx")
        .setTypes("captureCompare").setFrom(0).setSize(100000);
// group by idNumb; keep the single top document of each bucket
AggregationBuilder aggregation = AggregationBuilders
        .terms("agg")
        .field("idNumb")
        .subAggregation(
                AggregationBuilders.topHits("top").setFrom(0)
                        .setSize(1)).size(100000);
response = responsebuilder.setQuery(QueryBuilders.boolQuery()
        .must(QueryBuilders.rangeQuery("fcapTime").from(Long.valueOf(startTime)).to(Long.valueOf(endTime))))
        .addSort("idNumb", SortOrder.ASC)
        .addAggregation(aggregation) // .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        .setExplain(true).execute().actionGet();
SearchHits hits = response.getHits(); // do not use this hits when reading the final results
Terms agg = response.getAggregations().get("agg");
long end = System.currentTimeMillis();
System.out.println("ES run time: " + (end - start) + "ms");
/** delete today's data before inserting, to avoid duplicates **/
SyrkRegionFcapperPasserStatistics temp = new SyrkRegionFcapperPasserStatistics();
temp.setDate(Long.valueOf(startTime));
try {
    syrkRegionFcapperPasserStatisticsService.deletePasser(temp);
    for (Terms.Bucket entry : agg.getBuckets()) {
        String key = (String) entry.getKey();  // bucket key
        long docCount = entry.getDocCount();   // doc count
        // read the top_hits of each bucket
        TopHits topHits = entry.getAggregations().get("top");
        for (SearchHit hit : topHits.getHits().getHits()) {
            compareUuid = (String) hit.getSource().get("idNumb");
        }
        /** read the data and write it to MySQL **/
    }
    logger.info("All Analysis Data has been inserted: date is " + startTime);
} catch (Exception e) {
    logger.info("Analysis Result Data failed, date is " + startTime);
}
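The same terms + top_hits request sketched in REST query DSL (index, type, and field names from the code above; epoch-millis values stand in for startTime/endTime):

```json
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        { "range": { "fcapTime": { "gte": 1546300800000, "lte": 1546387200000 } } }
      ]
    }
  },
  "aggs": {
    "agg": {
      "terms": { "field": "idNumb", "size": 100000 },
      "aggs": {
        "top": { "top_hits": { "size": 1 } }
      }
    }
  }
}
```

Each bucket then carries both its doc_count and, inside the `top` sub-aggregation, the full `_source` of one representative document.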


Take the aggregated total from each bucket's docCount; get the other field values from the hits of the top_hits sub-aggregation.

Remember: do not read the outermost hits, because the number of outer hits will not match the number of aggregated hits, and iterating over them will give inconsistent data.