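Easy groupBy aggregations with the Elasticsearch Java API
This article shows how to use the Elasticsearch Java API to run SQL-style group by aggregations.
For simplicity, only single-level grouping is covered, i.e. group by field1 (not multi-level grouping such as group by field1, field2, ...). If you need multi-level grouping, the implementation will probably have to be broken down further.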
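The approach described here fits the following scenarios:
Scenario 1: find all the buckets of a grouping, e.g. select group_name from index_name group by group_name;
Scenario 2: flexibly add one or more aggregate functions, e.g. select group_name, max(count), avg(count) from index_name group by group_name;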
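To make the two scenarios concrete, here is what they compute, sketched in plain Java over an in-memory list rather than Elasticsearch. The Row record and its groupName/count fields are hypothetical stand-ins for the indexed documents; the point is only the SQL semantics that a terms aggregation plus metric sub-aggregations reproduce.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory sketch of the two SQL scenarios; no Elasticsearch involved.
public class SqlGroupBySketch {
    // A minimal row: the grouping column and a numeric "count" column (names assumed).
    public record Row(String groupName, double count) {}

    // Scenario 1: select group_name from t group by group_name -> the distinct bucket keys.
    public static List<String> buckets(List<Row> rows) {
        return rows.stream().map(Row::groupName).distinct().sorted().collect(Collectors.toList());
    }

    // Scenario 2: select group_name, max(count), avg(count) from t group by group_name.
    // Each bucket maps to {max, avg}.
    public static Map<String, double[]> maxAndAvg(List<Row> rows) {
        Map<String, DoubleSummaryStatistics> stats = rows.stream().collect(
                Collectors.groupingBy(Row::groupName, TreeMap::new,
                        Collectors.summarizingDouble(Row::count)));
        Map<String, double[]> result = new TreeMap<>();
        stats.forEach((key, s) -> result.put(key, new double[]{s.getMax(), s.getAverage()}));
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("a", 3), new Row("b", 10), new Row("a", 5));
        System.out.println(buckets(rows)); // [a, b]
        maxAndAvg(rows).forEach((k, v) ->
                System.out.println(k + "\tmax=" + v[0] + "\tavg=" + v[1]));
    }
}
```

In the Elasticsearch version, the bucketing is done by the terms aggregation and each aggregate function becomes a sub-aggregation evaluated per bucket.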
1. Usage
The GroupBy class is our implementation.
1) Test case
@SuppressWarnings("unchecked")
public static void main(String[] args) {
    /*
     * Initialize the ES client (ESClient is the author's own wrapper class).
     */
    ESClient esClient = new ESClient(
            "dqa-cluster",
            "10.93.21.21:9300,10.93.18.34:9300,10.93.18.35:9300,100.90.62.33:9300,100.90.61.14:9300",
            false);
    /*
     * For the demo, build a geo-distance query; it plays the role of the WHERE clause.
     */
    GeoDistanceRangeQueryBuilder queryBuilder = QueryBuilders.geoDistanceRangeQuery("location")
            .point(39.971424, 116.398251)
            .from("0m")
            .to(String.format("%fm", 500.0))
            .includeLower(true)
            .includeUpper(true)
            .optimizeBbox("memory")
            .geoDistance(GeoDistance.SLOPPY_ARC);
    SearchRequestBuilder search = esClient.getClient().prepareSearch("moon").setTypes("bj")
            .setSearchType(SearchType.DFS_QUERY_AND_FETCH)
            .setQuery(queryBuilder);
    /*
     * GroupBy is our implementation. Constructor arguments, in order: search, bucket name,
     * bucket field, ascending sort. Equivalent to:
     * select date as date_group from index group by date;
     */
    GroupBy groupBy = new GroupBy(search, "date_group", "date", true);
    /*
     * Add aggregation functions. Ten are implemented; six of them are used here.
     */
    groupBy.addSumAgg("pre_total_fee_sum", "pre_total_fee");
    groupBy.addAvgAgg("pre_total_fee_avg", "pre_total_fee");
    groupBy.addPercentilesAgg("pre_total_fee_percent", "pre_total_fee");
    groupBy.addPercentileRanksAgg("pre_total_fee_percentRank", "pre_total_fee", new double[]{13, 16, 20});
    groupBy.addStatsAgg("pre_total_fee_stats", "pre_total_fee");
    groupBy.addCardinalityAgg("type_card", "type");
    /*
     * Fetch the group-by result: a two-level Map (bucket key -> aggregation name -> value).
     * TreeMap is used, so the buckets come back sorted by key.
     */
    Map<String, Object> groupbyResponse = groupBy.getGroupbyResponse();
    String[] aggNames = {"pre_total_fee_sum", "pre_total_fee_avg", "pre_total_fee_percent",
            "pre_total_fee_percentRank", "pre_total_fee_stats", "type_card"};
    for (Map.Entry<String, Object> entry : groupbyResponse.entrySet()) {
        String bucketKey = entry.getKey();
        Map<String, String> subAggMap = (Map<String, String>) entry.getValue();
        for (String aggName : aggNames) {
            // Tab-separated: bucket key, aggregation name, value
            System.out.println(String.format("%s\t%s\t%s", bucketKey, aggName, subAggMap.get(aggName)));
        }
    }
}
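2) Initialization
Initializing a GroupBy is equivalent to building this SQL: select date as date_group from index group by date;
Passing in the search object corresponds to the WHERE clause
Passing in the bucket name corresponds to as date_group
Passing in the bucket field corresponds to date
Passing in the sort order: asc=true sorts the buckets by key in ascending order
3) Once initialized, you can add various aggregate functions, which covers scenario 2.
The GroupBy class implements 10 aggregate functions.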
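4) Reading the results
The result is a two-level Map. The implementation uses TreeMap, which keeps the buckets sorted by key. Because the keys are strings, this matches Elasticsearch's ascending order for date-like keys such as 2016-10-01, but not for asc=false or for numeric keys ("10" sorts before "9").
Note that some aggregation functions, such as Percentiles and Stats, return a group of values rather than a single value; here we compress each group into a JSON string.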
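A minimal sketch of that compression, assuming java.util.DoubleSummaryStatistics as a stand-in for the Elasticsearch Stats result and hand-built JSON in place of the JSONObject class used in the article. statsJson is a hypothetical helper; the key order (min, max, sum, count, avg) follows bucketStatsAgg.

```java
import java.util.DoubleSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;

// Sketch: flatten a multi-valued metric into one JSON string per bucket.
// DoubleSummaryStatistics stands in for the Elasticsearch Stats result.
public class StatsToJsonSketch {
    public static String statsJson(double... values) {
        DoubleSummaryStatistics s = DoubleStream.of(values).summaryStatistics();
        Map<String, Object> fields = new LinkedHashMap<>(); // keeps insertion order
        fields.put("min", s.getMin());
        fields.put("max", s.getMax());
        fields.put("sum", s.getSum());     // the sum, not the max
        fields.put("count", s.getCount()); // a long, so it prints without ".0"
        fields.put("avg", s.getAverage());
        return fields.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        System.out.println(statsJson(13, 16, 20));
    }
}
```

The caller then stores the returned string in the per-bucket map, so every aggregation, single- or multi-valued, occupies exactly one String slot.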
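5) Printed output
We bucketed by date. Within each bucket, the sum, avg and cardinality results are single values, while percentiles, percentileRanks and stats are compressed JSON strings.
2. Implementation
Here is the code first; the explanation follows.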
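// Elasticsearch 2.x transport-client API
// Assumption: the article does not name its JSON library; fastjson's JSONObject provides toJSONString()
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxBuilder;
import org.elasticsearch.search.aggregations.metrics.min.MinBuilder;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanks;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanksBuilder;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesBuilder;
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
import org.elasticsearch.search.aggregations.metrics.stats.StatsBuilder;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats;
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStatsBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupBy {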
private SearchRequestBuilder search;
private String termsName;
private TermsBuilder termsBuilder;
private List<Map<String, Object>> subAggList = new ArrayList<Map<String, Object>>();
public GroupBy(SearchRequestBuilder search, String termsName, String fieldName, boolean asc) {
this.search = search;
this.termsName = termsName;
// size(0) returns all buckets (Elasticsearch 1.x/2.x only; removed in 5.0)
termsBuilder = AggregationBuilders.terms(termsName).field(fieldName).order(Terms.Order.term(asc)).size(0);
}
private void addSubAggList(String aggName, MetricsAggregationBuilder aggBuilder) {
Map<String, Object> subAgg = new HashMap<String, Object>();
subAgg.put("aggName", aggName);
subAgg.put("aggBuilder", aggBuilder);
subAggList.add(subAgg);
}
public void addSumAgg(String aggName, String fieldName) {
SumBuilder builder = AggregationBuilders.sum(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketSumAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof SumBuilder) {
tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
return true;
} else {
return false;
}
}
public void addCountAgg(String aggName, String fieldName) {
ValueCountBuilder builder = AggregationBuilders.count(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketCountAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof ValueCountBuilder) {
tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
return true;
} else {
return false;
}
}
public void addAvgAgg(String aggName, String fieldName) {
AvgBuilder builder = AggregationBuilders.avg(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketAvgAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof AvgBuilder) {
tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
return true;
} else {
return false;
}
}
public void addMinAgg(String aggName, String fieldName) {
MinBuilder builder = AggregationBuilders.min(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketMinAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof MinBuilder) {
tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
return true;
} else {
return false;
}
}
public void addMaxAgg(String aggName, String fieldName) {
MaxBuilder builder = AggregationBuilders.max(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketMaxAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof MaxBuilder) {
tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
return true;
} else {
return false;
}
}
public void addStatsAgg(String aggName, String fieldName) {
StatsBuilder builder = AggregationBuilders.stats(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketStatsAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof StatsBuilder) {
Stats stats = bucket.getAggregations().get(aggName);
JSONObject jsonObject = new JSONObject();
jsonObject.put("min", stats.getMin());
jsonObject.put("max", stats.getMax());
jsonObject.put("sum", stats.getSum());
jsonObject.put("count", stats.getCount());
jsonObject.put("avg", stats.getAvg());
tmpMap.put(aggName, jsonObject.toJSONString());
return true;
} else {
return false;
}
}
public void addExtendedStatsAgg(String aggName, String fieldName) {
ExtendedStatsBuilder builder = AggregationBuilders.extendedStats(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketExtendedStatsAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof ExtendedStatsBuilder) {
ExtendedStats extendedStats = bucket.getAggregations().get(aggName);
JSONObject jsonObject = new JSONObject();
jsonObject.put("min", extendedStats.getMin());
jsonObject.put("max", extendedStats.getMax());
jsonObject.put("sum", extendedStats.getSum());
jsonObject.put("count", extendedStats.getCount());
jsonObject.put("avg", extendedStats.getAvg());
jsonObject.put("stdDeviation", extendedStats.getStdDeviation());
jsonObject.put("sumOfSquares", extendedStats.getSumOfSquares());
jsonObject.put("variance", extendedStats.getVariance());
tmpMap.put(aggName, jsonObject.toJSONString());
return true;
} else {
return false;
}
}
public void addPercentilesAgg(String aggName, String fieldName) {
PercentilesBuilder builder = AggregationBuilders.percentiles(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public void addPercentilesAgg(String aggName, String fieldName, double[] percentiles) {
PercentilesBuilder builder = AggregationBuilders.percentiles(aggName).field(fieldName).percentiles(percentiles);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketPercentilesAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof PercentilesBuilder) {
Percentiles percentiles = bucket.getAggregations().get(aggName);
JSONObject jsonObject = new JSONObject();
for (Percentile percentile : percentiles) {
jsonObject.put(String.valueOf(percentile.getPercent()), percentile.getValue());
}
tmpMap.put(aggName, jsonObject.toJSONString());
return true;
} else {
return false;
}
}
public void addPercentileRanksAgg(String aggName, String fieldName, double[] percentiles) {
PercentileRanksBuilder builder = AggregationBuilders.percentileRanks(aggName).field(fieldName).percentiles(percentiles);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketPercentileRanksAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof PercentileRanksBuilder) {
PercentileRanks percentileRanks = bucket.getAggregations().get(aggName);
JSONObject jsonObject = new JSONObject();
for (Percentile percentile : percentileRanks) {
jsonObject.put(String.valueOf(percentile.getPercent()), percentile.getValue());
}
tmpMap.put(aggName, jsonObject.toJSONString());
return true;
} else {
return false;
}
}
public void addCardinalityAgg(String aggName, String fieldName) {
CardinalityBuilder builder = AggregationBuilders.cardinality(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
public boolean bucketCardinalityAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof CardinalityBuilder) {
tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
return true;
} else {
return false;
}
}
public List<Terms.Bucket> getTermsBucket() {
search.addAggregation(termsBuilder);
Terms termsGroup = search.get().getAggregations().get(termsName);
return termsGroup.getBuckets();
}
public Map<String, Object> getGroupbyResponse() {
Map<String, Object> aggResponseMap = new TreeMap<String, Object>();
for (Terms.Bucket bucket : getTermsBucket()) {
String bucketKeyAsString = bucket.getKeyAsString();
Map<String, String> tmpMap = new TreeMap<String, String>();
for (Map<String, Object> subAgg : subAggList) {
String subAggName = subAgg.get("aggName").toString();
MetricsAggregationBuilder subAggBuilder = (MetricsAggregationBuilder) subAgg.get("aggBuilder");
if (bucketAvgAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketMaxAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketMinAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketSumAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketCountAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketCardinalityAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketPercentileRanksAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketPercentilesAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketExtendedStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
}
aggResponseMap.put(bucketKeyAsString, tmpMap);
}
return aggResponseMap;
}
}
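1) Constructor
The core of the constructor is termsBuilder = AggregationBuilders.terms(termsName).field(fieldName).order(Terms.Order.term(asc)).size(0);
It instantiates termsBuilder, i.e. the bucketing (terms) aggregation. In Elasticsearch 1.x/2.x, size(0) means "return all buckets"; this shortcut was removed in 5.0.
Each add... method called afterwards attaches its aggregation to the buckets as a sub-aggregation via termsBuilder.subAggregation(builder).
Finally, when the results are fetched, search.addAggregation(termsBuilder); attaches termsBuilder to the search request and the aggregation query is executed.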
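The builder tree described above can be pictured as the JSON request it produces. The following is a hypothetical mini-model, not the Elasticsearch API: TermsWithSubAggsSketch and Metric are made-up names, and the rendered body follows, to the best of my knowledge, the Elasticsearch 2.x aggregation DSL ("_term" ordering and "size":0 for all buckets).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mini-model of the builder tree built by GroupBy; it only renders JSON
// and does not talk to Elasticsearch.
public class TermsWithSubAggsSketch {
    // One metric sub-aggregation: its name, its type ("sum", "avg", ...) and its field.
    public record Metric(String name, String type, String field) {
        String toJson() {
            return "\"" + name + "\":{\"" + type + "\":{\"field\":\"" + field + "\"}}";
        }
    }

    private final String termsName;
    private final String field;
    private final boolean asc;
    private final List<Metric> subAggs = new ArrayList<>();

    // Plays the role of AggregationBuilders.terms(termsName).field(field).order(...).size(0).
    public TermsWithSubAggsSketch(String termsName, String field, boolean asc) {
        this.termsName = termsName;
        this.field = field;
        this.asc = asc;
    }

    // Plays the role of termsBuilder.subAggregation(builder): metrics nest under the buckets.
    public TermsWithSubAggsSketch subAggregation(Metric metric) {
        subAggs.add(metric);
        return this;
    }

    // Plays the role of search.addAggregation(termsBuilder): the whole tree goes under "aggs".
    public String toJson() {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"aggs\":{\"").append(termsName).append("\":{")
          .append("\"terms\":{\"field\":\"").append(field).append("\",")
          .append("\"order\":{\"_term\":\"").append(asc ? "asc" : "desc").append("\"},")
          .append("\"size\":0}");
        if (!subAggs.isEmpty()) {
            sb.append(",\"aggs\":{");
            for (int i = 0; i < subAggs.size(); i++) {
                if (i > 0) sb.append(',');
                sb.append(subAggs.get(i).toJson());
            }
            sb.append('}');
        }
        sb.append("}}}");
        return sb.toString();
    }

    public static void main(String[] args) {
        String json = new TermsWithSubAggsSketch("date_group", "date", true)
                .subAggregation(new Metric("pre_total_fee_sum", "sum", "pre_total_fee"))
                .toJson();
        System.out.println(json);
    }
}
```

The nesting is the key point: every metric lives inside the terms aggregation's own "aggs" object, which is why each one is evaluated once per bucket.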
2) The add... family: adding aggregation functions
Take sum as an example:
public void addSumAgg(String aggName, String fieldName) {
SumBuilder builder = AggregationBuilders.sum(aggName).field(fieldName);
termsBuilder.subAggregation(builder);
addSubAggList(aggName, builder);
}
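a) A SumBuilder aggregation is created and added as a sub-aggregation of termsBuilder.
b) addSubAggList records the name and builder of every added sub-aggregation in the subAggList field (a List<Map<String, Object>>). This lets the parsing step tell, via instanceof, which type of aggregation each entry is, so that it can apply the matching parsing logic.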
private void addSubAggList(String aggName, MetricsAggregationBuilder aggBuilder) {
Map<String, Object> subAgg = new HashMap<String, Object>();
subAgg.put("aggName", aggName);
subAgg.put("aggBuilder", aggBuilder);
subAggList.add(subAgg);
}
3) Reading results by type
Again, take sum as an example:
public boolean bucketSumAgg(Terms.Bucket bucket, String aggName, MetricsAggregationBuilder aggBuilder, Map<String, String> tmpMap) {
if (aggBuilder instanceof SumBuilder) {
tmpMap.put(aggName, bucket.getAggregations().get(aggName).getProperty("value").toString());
return true;
} else {
return false;
}
}
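a) It first checks the type of aggBuilder with instanceof; if it is a SumBuilder, the response is read as a sum result.
b) A sum returns a single value. Types such as percentiles return several values; for simplicity, I compress those into a JSON string, which again counts as a single value. See the code for details.
c) The return true lets the caller stop checking further types as soon as one matches, which avoids pointless checks.
d) tmpMap is passed in by the caller and collects the results for the current bucket.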
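The register-then-dispatch pattern (subAggList plus instanceof checks that return true on a match) can be sketched without Elasticsearch. SumSpec, StatsSpec, SubAgg and the raw double[] results are hypothetical stand-ins for the builder classes and for one bucket's aggregation results.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins: SumSpec/StatsSpec play the role of SumBuilder/StatsBuilder,
// and a Map of double[] plays the role of one bucket's raw aggregation results.
public class DispatchSketch {
    public interface MetricSpec {}
    public static final class SumSpec implements MetricSpec {}
    public static final class StatsSpec implements MetricSpec {}

    // One registered sub-aggregation, like an entry of subAggList (name + builder).
    public record SubAgg(String name, MetricSpec spec) {}

    // Single-valued parser: returns false if the spec is not a sum, like bucketSumAgg.
    static boolean parseSum(SubAgg agg, Map<String, double[]> raw, Map<String, String> out) {
        if (!(agg.spec() instanceof SumSpec)) {
            return false;
        }
        out.put(agg.name(), String.valueOf(raw.get(agg.name())[0]));
        return true;
    }

    // Multi-valued parser: compresses min and max into one JSON-style string.
    static boolean parseStats(SubAgg agg, Map<String, double[]> raw, Map<String, String> out) {
        if (!(agg.spec() instanceof StatsSpec)) {
            return false;
        }
        double[] v = raw.get(agg.name());
        out.put(agg.name(), "{\"min\":" + v[0] + ",\"max\":" + v[1] + "}");
        return true;
    }

    // Mirrors the loop in getGroupbyResponse: the first matching parser wins, then continue.
    public static Map<String, String> parseBucket(List<SubAgg> subAggs, Map<String, double[]> raw) {
        Map<String, String> out = new TreeMap<>();
        for (SubAgg agg : subAggs) {
            if (parseSum(agg, raw, out)) continue;
            if (parseStats(agg, raw, out)) continue;
            // Unlike the original, fail loudly when no parser recognizes the type.
            throw new IllegalArgumentException("no parser for " + agg.name());
        }
        return out;
    }

    public static void main(String[] args) {
        List<SubAgg> subAggs = List.of(
                new SubAgg("fee_sum", new SumSpec()),
                new SubAgg("fee_stats", new StatsSpec()));
        Map<String, double[]> raw = Map.of(
                "fee_sum", new double[]{49},
                "fee_stats", new double[]{13, 20});
        System.out.println(parseBucket(subAggs, raw)); // {fee_stats={"min":13.0,"max":20.0}, fee_sum=49.0}
    }
}
```

One design difference: the original silently skips a sub-aggregation that no bucket... method recognizes, while this sketch throws, so a forgotten parser shows up immediately instead of as a missing key.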
4) Parsing all sub-aggregation results
public Map<String, Object> getGroupbyResponse() {
Map<String, Object> aggResponseMap = new TreeMap<String, Object>();
for (Terms.Bucket bucket : getTermsBucket()) {
String bucketKeyAsString = bucket.getKeyAsString();
Map<String, String> tmpMap = new TreeMap<String, String>();
for (Map<String, Object> subAgg : subAggList) {
String subAggName = subAgg.get("aggName").toString();
MetricsAggregationBuilder subAggBuilder = (MetricsAggregationBuilder) subAgg.get("aggBuilder");
if (bucketAvgAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketMaxAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketMinAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketSumAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketCountAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketCardinalityAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketPercentileRanksAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketPercentilesAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketExtendedStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
if (bucketStatsAgg(bucket, subAggName, subAggBuilder, tmpMap)) continue;
}
aggResponseMap.put(bucketKeyAsString, tmpMap);
}
return aggResponseMap;
}
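This is the code that parses the results. tmpMap is the collector for the current bucket; a new one is created for each bucket.
a) It iterates over all sub-aggregations stored in subAggList, reads each sub-aggregation result, and stores everything in a two-level TreeMap.
b) In each iteration it calls the bucket... methods in turn; each if checks whether the type matches, and on a match continue skips the remaining checks.
c) aggResponseMap is a TreeMap, so the buckets are kept sorted by key (compared as strings). To keep exactly the order Elasticsearch returned, a LinkedHashMap would be needed instead.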
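The difference between the two maps is easy to see with numeric bucket keys. The sketch below uses hypothetical keys "10", "9", "2", as if Elasticsearch had returned them in descending order; keysIn is a made-up helper that inserts the keys in that order and reads them back.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: TreeMap re-sorts String keys lexicographically; LinkedHashMap keeps insertion order.
public class BucketOrderSketch {
    // Inserts the bucket keys in the given order and returns the map's iteration order.
    public static List<String> keysIn(Map<String, String> map, List<String> bucketKeys) {
        for (String key : bucketKeys) {
            map.put(key, "");
        }
        return new ArrayList<>(map.keySet());
    }

    public static void main(String[] args) {
        // Hypothetical bucket keys in the order Elasticsearch returned them.
        List<String> fromEs = List.of("10", "9", "2");
        System.out.println(keysIn(new TreeMap<>(), fromEs));       // [10, 2, 9]
        System.out.println(keysIn(new LinkedHashMap<>(), fromEs)); // [10, 9, 2]
    }
}
```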
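3. The ten aggregation functions
Finally, here are the ten aggregation functions we implemented; you can add more as needed.
1) Single-value results: sum, avg, min, max, count, cardinality (cardinality is approximate)
2) percentiles: pass in percents and get the values at those percentiles. percentileRanks: pass in values and get the percentile rank of each. The two are inverse operations.
3) stats and extendedStats: extendedStats returns more detail, including max, min, avg, sum, sum of squares, variance and standard deviation.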
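The inverse relationship in 2) and the extra fields in 3) can be checked on a small array. This is a sketch with exact math on sorted data (nearest-rank percentiles) rather than Elasticsearch's approximate TDigest-based percentiles, and the variance shown is the population variance; the class and method names are hypothetical.

```java
import java.util.Arrays;

// Sketch with exact math on an in-memory array; Elasticsearch approximates percentiles.
public class PercentileAndStatsSketch {
    // percentileRanks direction: given a value, the percentage of values <= it.
    public static double percentileRank(double[] data, double value) {
        long atOrBelow = Arrays.stream(data).filter(d -> d <= value).count();
        return 100.0 * atOrBelow / data.length;
    }

    // percentiles direction (nearest-rank): the smallest value whose rank reaches the percent.
    public static double percentile(double[] data, double percent) {
        double[] sorted = data.clone();
        Arrays.sort(sorted);
        // Subtract a tiny epsilon so floating-point noise does not push ceil() one index too far.
        int rank = (int) Math.ceil(percent / 100.0 * sorted.length - 1e-9);
        return sorted[Math.max(0, Math.min(rank, sorted.length) - 1)];
    }

    // The extra extendedStats fields: {sum of squares, population variance, standard deviation}.
    public static double[] extendedStats(double[] data) {
        double sum = 0;
        double sumOfSquares = 0;
        for (double d : data) {
            sum += d;
            sumOfSquares += d * d;
        }
        double avg = sum / data.length;
        double variance = sumOfSquares / data.length - avg * avg;
        return new double[]{sumOfSquares, variance, Math.sqrt(variance)};
    }

    public static void main(String[] args) {
        double[] fees = {2, 4, 4, 4, 5, 5, 7, 9};
        System.out.println(percentileRank(fees, 5));              // 75.0
        System.out.println(percentile(fees, 75));                 // 5.0
        System.out.println(Arrays.toString(extendedStats(fees))); // [232.0, 4.0, 2.0]
    }
}
```

Feeding a rank back into percentile recovers the original value (75.0 maps back to 5.0), which is the inverse relationship described in 2); with Elasticsearch's approximate algorithm the round trip is only close, not exact.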