Using Elasticsearch aggregations to get the effect of SQL HAVING
阿新 • Published: 2019-02-12
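In real-world development you will inevitably run into business scenarios where you need to aggregate documents to get a count per group and then filter the groups by that count.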
1. Use minDocCount. Here is the code directly; adapt it to your own business scenario.
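    // Correct answer
    SearchRequestBuilder search = transportlient.prepareSearch("bigdata_idx_2")
            .setTypes("captureCompare");
    // Note: this filter aggregation is built but never attached to the request in this snippet
    FilterAggregationBuilder sub = AggregationBuilders.filter("channel_longitudeC")
            .filter(QueryBuilders.rangeQuery("fcmp_time").from(startTime).to(endTime));
    // The grouping field is the id; the ordering is a compound of several sort keys
    TermsBuilder tb = AggregationBuilders.terms("fcmp_fobj_id")
            .field("fcmp_fobj_id")
            .valueType(Terms.ValueType.STRING)
            .order(Terms.Order.compound(
                    Terms.Order.aggregation("channel_longitudeC", false) // first by count, descending
                    // if the counts are equal, order by the sum of code (second sort key not shown here)
            ));
    // Sub-aggregation 1: value count
    ValueCountBuilder sb = AggregationBuilders.count("channel_longitudeC");
    // Add it to the terms aggregation; minDocCount(400) keeps only buckets with at least 400 documents
    tb.subAggregation(sb).minDocCount(400);
    // Add the terms aggregation to the main request body
    // search.setPostFilter()
    search.addAggregation(tb);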
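To make the HAVING analogy concrete, here is a minimal in-memory sketch of what a terms aggregation with a minimum document count computes: group, count, drop the groups below the threshold, then sort by count descending. It does not call Elasticsearch at all. The class `MinDocCountSketch`, the method `termsWithMinDocCount`, and the tie-break by key are hypothetical names and choices made for illustration, not part of the Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the semantics only; it does not talk to Elasticsearch.
class MinDocCountSketch {

    /** One bucket: the grouping key and how many documents fell into it. */
    static final class Bucket {
        final String key;
        final long docCount;

        Bucket(String key, long docCount) {
            this.key = key;
            this.docCount = docCount;
        }
    }

    /**
     * Groups the ids, counts each group, drops groups with fewer than
     * minDocCount documents, and orders the rest by count descending
     * (ties broken by key, an assumption made for this sketch).
     */
    static List<Bucket> termsWithMinDocCount(List<String> ids, long minDocCount) {
        Map<String, Long> counts = new TreeMap<>();
        for (String id : ids) {
            counts.merge(id, 1L, Long::sum); // GROUP BY id, COUNT(*)
        }
        List<Bucket> buckets = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            if (e.getValue() >= minDocCount) { // HAVING COUNT(*) >= minDocCount
                buckets.add(new Bucket(e.getKey(), e.getValue()));
            }
        }
        buckets.sort(Comparator.comparingLong((Bucket b) -> b.docCount).reversed()
                .thenComparing(b -> b.key));
        return buckets;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "b", "a", "c", "a", "b");
        // With a threshold of 2, "c" (one document) is dropped.
        for (Bucket b : termsWithMinDocCount(ids, 2)) {
            System.out.println(b.key + " -> " + b.docCount); // a -> 3, then b -> 2
        }
    }
}
```

In SQL terms this corresponds roughly to `GROUP BY id HAVING COUNT(*) >= 2 ORDER BY COUNT(*) DESC`.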
2. A slightly more complex scenario: while aggregating, I also need to return the other fields of the matching documents. For this, use the Top Hits Aggregation.
This is similar to the SQL: select *,count(*) from XXX group by a ......
    long start = System.currentTimeMillis(); // assumed: the original does not show where start is set
    SearchResponse response = null;
    SearchRequestBuilder responsebuilder = transportlient
            .prepareSearch("syrk_bigdata_capturecmp_passer_idx")
            .setTypes("captureCompare").setFrom(0).setSize(100000);
    // Group by idNumb and keep one representative document per bucket
    AggregationBuilder aggregation = AggregationBuilders
            .terms("agg")
            .field("idNumb")
            .subAggregation(
                    AggregationBuilders.topHits("top").setFrom(0)
                            .setSize(1)).size(100000);
    response = responsebuilder.setQuery(QueryBuilders.boolQuery()
            .must(QueryBuilders.rangeQuery("fcapTime").from(Long.valueOf(startTime)).to(Long.valueOf(endTime))))
            .addSort("idNumb", SortOrder.ASC)
            .addAggregation(aggregation)// .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setExplain(true).execute().actionGet();
    SearchHits hits = response.getHits(); // do not use these hits when reading the final results

    Terms agg = response.getAggregations().get("agg");
    long end = System.currentTimeMillis();
    System.out.println("ES run time: " + (end - start) + "ms");
    /** Before inserting, clear the current day's data first to avoid duplicates **/
    SyrkRegionFcapperPasserStatistics temp = new SyrkRegionFcapperPasserStatistics();
    temp.setDate(Long.valueOf(startTime));
    try {
        syrkRegionFcapperPasserStatisticsService.deletePasser(temp);
        for (Terms.Bucket entry : agg.getBuckets()) {
            String key = (String) entry.getKey();    // bucket key
            long docCount = entry.getDocCount();     // doc count
            // We ask for top_hits for each bucket
            TopHits topHits = entry.getAggregations().get("top");
            for (SearchHit hit : topHits.getHits().getHits()) {
                // compareUuid is declared elsewhere (not shown in the original)
                compareUuid = (String) hit.getSource().get("idNumb");
            }
            /** Read the data and write it to MySQL **/
        }
        logger.info("All Analysis Data has insert : date is " + startTime);
    } catch (Exception e) {
        logger.info("Analysis Result Data failed ,date is " + startTime);
    }
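After aggregating, take each group's total from the bucket's docCount, and read the other field values from the top_hits hits inside that bucket.
Remember: do not read the outermost hits. The outer hits and the hits inside the aggregation differ in number, so iterating over the outer hits and pairing them with the buckets produces inconsistent data.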
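The difference between the outer hits and the per-bucket hits can be illustrated with a small in-memory sketch. It does not call Elasticsearch; `TopHitsSketch`, `Doc`, `Bucket`, and `groupWithTopHit` are hypothetical names for illustration. As a simplification, the sketch keeps the first document seen in each group as the representative, whereas a real top_hits aggregation picks documents according to its configured sort (relevance score by default).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the semantics only; it does not talk to Elasticsearch.
class TopHitsSketch {

    /** A simplified document: the grouping field plus one other field. */
    static final class Doc {
        final String idNumb;
        final long fcapTime;

        Doc(String idNumb, long fcapTime) {
            this.idNumb = idNumb;
            this.fcapTime = fcapTime;
        }
    }

    /** One bucket: key, document count, and a single representative document. */
    static final class Bucket {
        final String key;
        final Doc topHit;
        long docCount;

        Bucket(String key, Doc topHit) {
            this.key = key;
            this.topHit = topHit;
        }
    }

    /** Groups by idNumb, keeping the count and the first document seen per group. */
    static List<Bucket> groupWithTopHit(List<Doc> docs) {
        Map<String, Bucket> byKey = new LinkedHashMap<>();
        for (Doc d : docs) {
            Bucket b = byKey.computeIfAbsent(d.idNumb, k -> new Bucket(k, d));
            b.docCount++;
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        // Plays the role of the outer hits: one entry per matching document.
        List<Doc> outerHits = Arrays.asList(
                new Doc("A", 1L), new Doc("B", 2L), new Doc("A", 3L),
                new Doc("A", 4L), new Doc("B", 5L));
        List<Bucket> buckets = groupWithTopHit(outerHits);

        // The two lists have different lengths, so they cannot be paired by index.
        System.out.println("outer hits: " + outerHits.size() + ", buckets: " + buckets.size());
        for (Bucket b : buckets) {
            System.out.println(b.key + " count=" + b.docCount + " fcapTime=" + b.topHit.fcapTime);
        }
    }
}
```

Each bucket carries both its own count and its own representative document, which is why the count and the field values should be read from the same bucket rather than from the top-level hit list.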