javaAPI實現elasticsearch5.5.2的聚合分析
前言:
在前面的幾篇文章講到了elasticsearch的搜尋,但是elasticsearch還有強大的聚合分析功能,通過聚合,我們會得到一個數據的概覽,這樣對大資料提取統計指標時就變得遊刃有餘。聚合允許我們向資料提出一些複雜的問題。雖然功能完全不同於搜尋,但它們使用相同的資料結構,執行速度很快並且就像搜尋一樣幾乎是實時的。
一、聚合(Aggregations)的介紹
elasticsearch的聚合(Aggregations)類似於資料庫sql中的分組 group by,count、sum等函式,除此之外他還有更多強大的資料統計分析介面。
聚合有兩個核心概念:
1.桶(bucket):對資料進行分組。比如一個物件User他有一個屬性是city,有如下資料:1.張三 上海 ;2.李四 北京; 3.王五 北京,我們就可以基於city劃分buckets,一個是北京bucket,一個是上海bucket,按照某個欄位進行bucket劃分,那個欄位的值相同的那些資料,就會被劃分到一個bucket中,相當於sql中的group by分組。
2.指標(metric):對一個數據分組執行的統計。當我們有了一堆bucket之後,就可以對每個bucket中的資料進行聚合分詞了,比如說計算一個bucket內所有資料的數量、平均值、最大值等的,metric就是對一個bucket執行的某種聚合分析的操作,相當於資料庫中的avg,sum函式
二、Bucket aggregations 聚合分組的使用
官方參考資料:
我會在第一個例子中將完整的程式碼寫出來,其他的例子仿照第一個進行
1. Terms Aggregation
terms按照某個欄位進行分組,下面的程式碼中也涉及了排序問題,詳細程式碼看註釋
/**TermAggs 包含了Order * termAggs統計每個顏色的個數 * @Title: countByColor * @Description: TODO(統計每個顏色的銷量) * @param @throws UnknownHostException * @return void * @autor lpl * @date 2017年11月6日 * @throws */ @Test public void countByColor() throws UnknownHostException{ //進行聚合查詢,terms按照某個欄位中進行分組 TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders .terms("popular_colors") //aggs的名稱,自定義,取資料的時候與其一致即可 .field("color") //需要分組的欄位 //.order(Terms.Order.term(true)) //true表示按照term分組的asc排序 .order(Terms.Order.count(true)); //true表asc按照doc_count排序,false表示desc排序 //查詢資料進行聚合分析 SearchResponse response = EsClient.client().prepareSearch("tvs").setTypes("sales") .addAggregation(termsAggregationBuilder) .setSize(0) .execute() .actionGet(); //獲取聚合 Terms terms = response.getAggregations().get("popular_colors"); //遍歷處理資料 for (Terms.Bucket entry : terms.getBuckets()) { Map<String,Object> map = new HashMap<String,Object>(); String key = (String) entry.getKey(); // Term long docCount = entry.getDocCount(); // Doc count //將結果放到Map中 map.put("color",key); map.put("docCount",docCount); System.out.println(map); } }
在Terms Aggregation涉及到一個Order分組問題,在上面的程式碼上我已經做了簡單的註釋,但是還存在一種排序方式,就是我們按照某一個統計出來欄位進行排序,比如:按照height欄位的平均值進行排序,fasle代表了desc排序
AggregationBuilders .terms("genders") .field("gender") .order(Terms.Order.aggregation("avg_height", false)) .subAggregation( AggregationBuilders.avg("avg_height").field("height") )
2. Histogram Aggregation 按照區間進行分組
按照區間分組有Histogram Aggregation 和 Date Histogram Aggregation,我們以兩個簡單的需求進行編寫具體的程式碼
2.1Histogram Aggregation:以2000位區間單位統計電視的銷量
/**
* 以2000位區間單位統計電視的銷量
* @Title: HistogramTest
* @param @throws UnknownHostException
* @return void
* @autor lpl
* @date 2017年11月6日
* @throws
*/
@Test
public void HistogramTest() throws UnknownHostException{
//histogram:類似於terms,也是進行bucket分組操作,interval:2000,
//劃分範圍,0~2000,2000~4000,4000~6000,6000~8000,8000~10000
HistogramAggregationBuilder histogramAggregationBuilder = AggregationBuilders
.histogram("his_price")
.field("price")
.interval(2000);
//查詢資料進行聚合分析
SearchResponse response = EsClient.client()
.prepareSearch("tvs").setTypes("sales")
.addAggregation(histogramAggregationBuilder)
.setSize(0)
.execute()
.actionGet();
Histogram agg = response.getAggregations().get("his_price");
// For each entry
for (Histogram.Bucket entry : agg.getBuckets()) {
Map<String,Object> map = new HashMap<String,Object>();
Number key = (Number) entry.getKey(); // 區間key
long docCount = entry.getDocCount(); // Doc count
//資料封裝
map.put("key",key);
map.put("doc_count",docCount);
System.out.println(map);
}
}
2.2Date Histogram Aggregation:統計每個月份的電視銷量,這個需求中還涉及了一些其他知識點,在程式碼註釋中有體現/**
* 統計每個月份的電視銷量
* @Title: dateHistogramTest
* @Description: TODO(這裡用一句話描述這個方法的作用)
* @param @throws UnknownHostException
* @return void
* @autor lpl
* @date 2017年11月6日
* @throws
*/
@Test
public void dateHistogramTest() throws UnknownHostException{
//Date histogram的用法與histogram差不多,只不過區間上支援了日期的表示式。
DateHistogramAggregationBuilder dateHistogramInterval = AggregationBuilders
.dateHistogram("date_hist") //聚合的名稱,可以隨意去取,下邊取資料一致即可
.field("sold_date") //需要進行分組的欄位
.dateHistogramInterval(DateHistogramInterval.MONTH) //設定時間區間
.minDocCount(0) //設定最小的 doc_count,
.format("yyyy-MM-dd") //將返回的時間結果進行格式化
.missing("2016-05-01"); //預設欄位:當遇到沒有值的欄位,就會按照該欄位的值來計算
//查詢資料進行聚合分析
SearchResponse response = EsClient.client()
.prepareSearch("tvs").setTypes("sales")
.addAggregation(dateHistogramInterval)
.setSize(0)
.execute()
.actionGet();
//獲得我們設定的聚合
Histogram agg = response.getAggregations().get("date_hist");
//迴圈資料輸出
for (Histogram.Bucket entry : agg.getBuckets()) {
Map<String,Object> map = new HashMap<String,Object>();
//處理
DateTime key = (DateTime) entry.getKey(); // Key
String keyAsString = entry.getKeyAsString(); // Key as String
long docCount = entry.getDocCount(); // Doc count
//封裝
map.put("date",keyAsString);
map.put("doc_count",docCount);
System.out.println(map);
}
}
3.Range Aggregation按照範圍進行分組
這個功能介紹中,我會減少程式碼量,只把關鍵程式碼貼出來,程式碼中會有註釋
3.1.數字範圍:
AggregationBuilder rangAgg = AggregationBuilders
.range("rang_price")
.field("price")
.addUnboundedTo(1000) //從負無窮到1000
.addRange(1001, 1500) //從1001-1500
.addRange(1501, 2000) //從1501-2000
.addUnboundedFrom(2001);//2000到正無窮
// 獲得我們設定的聚合
Range agg = response.getAggregations().get("rang_price");
// 迴圈資料輸出
for (Range.Bucket entry : agg.getBuckets()) {
Map<String, Object> map = new HashMap<String, Object>();
// 處理
String key = entry.getKeyAsString(); // Range as key
Number from = (Number) entry.getFrom(); // Bucket from
Number to = (Number) entry.getTo(); // Bucket to
long docCount = entry.getDocCount(); // Doc count
// 封裝
map.put("key", key);
map.put("doc_count", docCount);
System.out.println(map);
}
3.2.Date Range Aggregation日期範圍AggregationBuilder aggregation =
AggregationBuilders
.dateRange("agg")
.field("dateOfBirth") //field名稱
.format("yyyy") //日期格式
.addUnboundedTo("1950") // from -infinity to 1950 (excluded)
.addRange("1950", "1960") // from 1950 to 1960 (excluded)
.addUnboundedFrom("1960"); // from 1960 to +infinity
for (Range.Bucket entry : agg.getBuckets()) {
String key = entry.getKeyAsString(); // Date range as key
DateTime fromAsDate = (DateTime) entry.getFrom(); // Date bucket from as a Date
DateTime toAsDate = (DateTime) entry.getTo(); // Date bucket to as a Date
long docCount = entry.getDocCount(); // Doc count
}
3.3 IP Range Aggregation ip範圍分組
IpRangeAggregationBuilder aggregation1 =
AggregationBuilders
.ipRange("agg")
.field("ip")
.addUnboundedTo("192.168.1.0") // 從負無窮到192.168.1.0
.addRange("192.168.1.0", "192.168.2.0") // 從 192.168.1.0 到 192.168.2.0 (excluded)
.addUnboundedFrom("192.168.2.0"); // 從 192.168.2.0 到 +infinity
for (Range.Bucket entry : aggregation1.getBuckets()) {
String key = entry.getKeyAsString(); // Ip 範圍 as key
String fromAsString = entry.getFromAsString(); // Ip bucket from as a String
String toAsString = entry.getToAsString(); // Ip bucket to as a String
long docCount = entry.getDocCount(); // Doc count
}
4. Filters Aggregation 過濾分組:根據過濾條件進行分組AggregationBuilder aggregation =
AggregationBuilders
.filters("agg",
new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),
new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));
// For each entry
for (Filters.Bucket entry : agg.getBuckets()) {
String key = entry.getKeyAsString(); // bucket key
long docCount = entry.getDocCount(); // Doc count
logger.info("key [{}], doc_count [{}]", key, docCount);
}
key [men], doc_count [4982]
key [women], doc_count [5018]
三、Metrics aggregations 聚合分析的使用
官方參考文件:
3.1常用的:Avg,Max,Sum等等
我通過一個需求將metric的javaAPI進行說明,需求是:查詢電視銷售每個顏色的平均價格、最高價格、總價格並按照平均價格降序排序,對應的sql語句為:
select avg(price) avg_price,max(price) max_price,sum(price) sum_price from tvs.sales group by color order by avg_price
/**
* 查詢電視銷售每個顏色的平均價格、最高價格、總價格並按照平均價格降序排序
* @Title: metrics
* @Description: TODO(這裡用一句話描述這個方法的作用)
* @param @throws UnknownHostException
* @return void
* @autor lpl
* @date 2017年11月6日
* @throws
*/
@Test
public void metrics() throws UnknownHostException{
// 建立查詢索引和type
SearchRequestBuilder srBuilder =EsClient.client()
.prepareSearch("tvs")
.setTypes("sales");
// colorAggs
// 如果需要進行排序的話,可以使用order(Order.aggregation("avg_price",true))
// "avg_price"對應下班的字聚合的名稱,true表示升序排序。false表示倒敘排序
TermsAggregationBuilder colorsAgg = AggregationBuilders
.terms("colors")
.field("color")
.order(Order.aggregation("avg_price", true));
// 平均值
AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_price").field("price");
// 最大值a
MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_price").field("price");
// 總和
SumAggregationBuilder sumAgg = AggregationBuilders.sum("sum_price").field("price");
SearchResponse response = srBuilder.setSize(0)
.addAggregation(colorsAgg
.subAggregation(avgAgg)
.subAggregation(maxAgg)
.subAggregation(sumAgg))
.execute()
.actionGet();
Terms terms = response.getAggregations().get("colors");
//處理結果
for (Terms.Bucket entry : terms.getBuckets()) {
Map<String, Object> map = new HashMap<String, Object>();
// 獲得按照顏色進行分組的桶
String keyAsString = entry.getKeyAsString();
// 獲得每個分組的數量
long docCount = entry.getDocCount();
// 獲得平均價格
Avg avg = entry.getAggregations().get("avg_price");
// 獲得最大價格
Max max = entry.getAggregations().get("max_price");
// 獲得價格總和
Sum sum = entry.getAggregations().get("sum_price");
map.put("key", keyAsString);
map.put("docCount", docCount);
map.put("avg", avg.getValue());
map.put("max", max.getValue());
map.put("sum", sum.getValue());
System.out.println(map);
}
}
3.2 Percentile Aggregation百分比聚合分析PercentilesAggregationBuilder aggregation =
AggregationBuilders
.percentiles("agg")
.field("height")
.percentiles(1.0, 5.0, 10.0, 20.0, 30.0, 75.0, 95.0, 99.0);
// sr is here your SearchResponse object
Percentiles agg = response.getAggregations().get("agg");
// For each entry
for (Percentile entry : agg) {
double percent = entry.getPercent(); // Percent
double value = entry.getValue(); // Value
}
percent [1.0], value [0.814338896154595]
percent [5.0], value [0.8761912455821302]
percent [25.0], value [1.173346540141847]
percent [50.0], value [1.5432023318692198]
percent [75.0], value [1.923915462033674]
percent [95.0], value [2.2273644908535335]
percent [99.0], value [2.284989339108279]
上邊就是我對elasticsearch的聚合分析的一些javaAPI使用的總結,當然我只寫出了一部分,強大的elasticsearch還有很多的介面,大家可以通過上邊我提供的連線進入elasticsearch的官網進行查詢。在前面的文章中我也對elasticsearch的常用搜索、curd、高亮顯示、搜尋建議進行了記錄,如果大家有興趣可以去翻閱。文章中有哪些地方有誤請大家及時指出,感激不盡,相互交流,共同進步!