ES對應mysql的group by分組查詢javaApi,多對多關係的分組查詢
ES對應mysql的group by分組查詢javaApi,多對多關係的分組查詢
比如我這邊有個下列訂單索引資料,現在的需求是按使用者(fmerchantId)和支付方式(fchannelId)進行分組統計訂單總金額(famt)和總筆數,其中使用者和支付方式是多對多的關係,就是一個使用者會對應多個支付方式,一個支付方式會對應多個使用者
{ "famt": "2", "fbankCode": "0000_0002", "fbankName": "支付寶", "fchannelCode": "ALIPAY", "fchannelId": "993", "fchannelName": "支付寶", "fchannelTradeNo": "2020072222001439181419030679", "fchgAgenCode": "111111", "fcreateDate": "2020-07-22", "fcreateDay": "22", "fcreateMonth": "07", "fcreateTime": "2020-07-22 14:46:57", "fcreateYear": "2020", "fdeviceType": "phone", "fmerchantId": "5200002020072001", "fmerchantName": "測試使用者", "forderNo": "483325941654679552", "fpayCode": "36000019115000000287", "fthirdpayTradeNo": "2007221446570296", "ftradeStatus": "1" }
1.建立查詢條件,相當於mysql的where條件
其中SearchSourceBuilder相當於mysql中的一條完整sql語句,BoolQueryBuilder相當於where條件,根據需求自行新增where條件,最後把boolQueryBuilder的where條件新增到SearchSourceBuilder的sql中
//查詢條件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); //成功狀態的支付訂單 boolQueryBuilder.must(QueryBuilders.termsQuery("ftradeStatus", Arrays.asList("1", "2", "3", "4"))); //使用者,支付方式不能為空 boolQueryBuilder.must(QueryBuilders.existsQuery("fmerchantId")); boolQueryBuilder.must(QueryBuilders.existsQuery("fchannelId")); boolQueryBuilder.must(QueryBuilders.existsQuery("fchannelName")); String startDate = queryMap.get("startDate") == null ? null : queryMap.get("startDate").toString(); String endDate = queryMap.get("endDate") == null ? null : queryMap.get("endDate").toString(); //大於等於 if (!StringUtils.isEmpty(startDate)) { boolQueryBuilder.must(QueryBuilders.rangeQuery("fcreateDate").gte(startDate)); } //小於 if (!StringUtils.isEmpty(endDate)) { boolQueryBuilder.must(QueryBuilders.rangeQuery("fcreateDate").lte(endDate)); } //新增查詢條件 searchSourceBuilder.query(boolQueryBuilder);
2.新增分組條件,相當於group by條件
TermsAggregationBuilder相當於mysql中的group by分組查詢條件欄位,建立要分組的各個欄位TermsAggregationBuilder,AggregationBuilders.terms("fchannelTypeId").field(
"fchannelId").size(searchSize).order(BucketOrder.key(true))中terms是建立的別名欄位(類似mysql select a as "A"),field是索引中的欄位,size可設定查詢數量大小,order進行排序。
然後進行group by欄位的拼接,用termsAggregationBuilder.subAggregation(termsAggregationBuilder3),相當於group by a,b欄位,a和b都是欄位.
注意ES中是可以拼接物件的,比如我先執行termsAggregationBuilder2.subAggregation(AggregationBuilders.sum("money").field("famt")),這是根據使用者ID和使用者總金額分組了;再執行termsAggregationBuilder.subAggregation(termsAggregationBuilder2),相當於使用者ID和使用者總金額分組當作一個物件b和支付方式a欄位一起分組了(group by a,b),其中a是一個欄位,b是一個物件(b中包含使用者和總金額的分組),這就是ES的分組內再分組
//按聚合名稱標識對桶進行升序排序
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("fchannelTypeId").field(
"fchannelId").size(searchSize).order(BucketOrder.key(true));//支付方式ID
TermsAggregationBuilder termsAggregationBuilder2 = AggregationBuilders.terms("fmerchantTypeId").field(
"fmerchantId");//使用者ID編號
TermsAggregationBuilder termsAggregationBuilder3 = AggregationBuilders.terms("fbankTypeCode").field(
"fbankCode");//渠道商編號
TermsAggregationBuilder termsAggregationBuilder4 = AggregationBuilders.terms("fbankTypeName").field(
"fbankName.keyword");//渠道商名稱
TermsAggregationBuilder termsAggregationBuilder5 = AggregationBuilders.terms("fchannelTypeName").field(
"fchannelName.keyword");//支付渠道名稱
//1.先按支付渠道,渠道商編號,渠道商名稱,支付渠道名稱進行分組
termsAggregationBuilder.subAggregation(termsAggregationBuilder3).subAggregation(termsAggregationBuilder4).subAggregation(termsAggregationBuilder5);
//2.再在商戶編號裡統計金額分組
termsAggregationBuilder2.subAggregation(AggregationBuilders.sum("money").field("famt"));
//2.1按金額倒序排列
List<FieldSortBuilder> fieldSorts=new ArrayList<>();
fieldSorts.add(new FieldSortBuilder("money").order(SortOrder.DESC));
termsAggregationBuilder2.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", fieldSorts).size(searchSize));
//3.拼接分組
termsAggregationBuilder.subAggregation(termsAggregationBuilder2);
3.執行查詢語句
這個總的語句相當與 select(支付方式,其他欄位,(使用者,sum(amt) ) as b )as a,sum(amt) from 表 group by a ,其中a是以支付方式為主鍵的一個分組物件,a物件中包含了支付方式,其他欄位和使用者物件b的分組。b物件是以使用者為主鍵的使用者,使用者總金額分組。
//總的分組,把第二步建立的分組看作一個物件,在進行總分組
searchSourceBuilder.aggregation(termsAggregationBuilder);
searchSourceBuilder.aggregation(AggregationBuilders.sum("totalAmt").field("famt"));
//執行ES的查詢
SearchResponse response = ESUtils.findAll(payTradeIndex, payTradeType, searchSourceBuilder, null);
4.取值物件
取值總金額,上面最外層的sum(amt)就是所用訂單的總金額
取值a物件的分組,可以獲取分組欄位的值和b物件
再取值b物件裡面的使用者和金額(這個金額就是對應的每個使用者和支付方式分組統計的總金額了)
Aggregations aggregations = response.getAggregations();
Sum totalAmtSum = aggregations.get("totalAmt");
//總金額
double totalAmt = totalAmtSum.getValue();
DecimalFormat df = new DecimalFormat("0.00");
String totalMoney = df.format(totalAmt / 100);
Map<String, Aggregation> aggMap = response.getAggregations().asMap();
ParsedStringTerms codeTerms = (ParsedStringTerms) aggMap.get("fchannelTypeId");
Iterator<Terms.Bucket> codeBucketIt = (Iterator<Terms.Bucket>) codeTerms.getBuckets().iterator();
while (codeBucketIt.hasNext()) {
Terms.Bucket codeBucket = codeBucketIt.next();
//使用者編號的分組terms物件
ParsedStringTerms nameTerms = (ParsedStringTerms) codeBucket.getAggregations().asMap().get("fmerchantTypeId");
//渠道商名稱
ParsedStringTerms nameTerms1 = (ParsedStringTerms) codeBucket.getAggregations().asMap().get("fbankTypeName");
String fbankTypeName = nameTerms1.getBuckets().get(0).getKey().toString();
//渠道商編號
ParsedStringTerms nameTerms2 = (ParsedStringTerms) codeBucket.getAggregations().asMap().get("fbankTypeCode");
String fbankTypeCode = nameTerms2.getBuckets().get(0).getKey().toString();
//支付渠道名稱
ParsedStringTerms nameTerms3 = (ParsedStringTerms) codeBucket.getAggregations().asMap().get("fchannelTypeName");
String fchannelTypeName = nameTerms3.getBuckets().get(0).getKey().toString();
Iterator<Terms.Bucket> nameBucketIt = (Iterator<Terms.Bucket>) nameTerms.getBuckets().iterator();
while (nameBucketIt.hasNext()){
Terms.Bucket nameBucket = nameBucketIt.next();
//金額
Sum term = nameBucket.getAggregations().get("money");
String money = df.format(term.getValue() / 100);
//使用者編號
String fmerchantTypeId = nameBucket.getKey().toString();
//統計筆數
Long count = nameBucket.getDocCount();
esDataList.add(new MerchantChannelCountModel(fmerchantTypeId,null,fbankTypeCode,fbankTypeName,
String.valueOf(codeBucket.getKey()),fchannelTypeName,money,
count.intValue()));
}
}