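ElasticSearch Aggregation Analysis
Aggregations compute statistics over the result set of a query. Using watch-log analysis as a running example, this post walks through the most common ElasticSearch aggregation operations.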
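Contents:
- Querying a user's view count and watch duration
- Paginating aggregations
- Querying video UV
  - UV of a single video
  - UV of multiple videos
- HAVING queries
  - Filtering by count
  - Filtering by other metrics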
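First, here is the structure of the documents we are going to analyze:
{
  "video_id": 1289643545120062253, // video id
  "video_uid": 3931482202390368051, // id of the user who published the video
  "uid": 47381776787453866, // id of the viewing user
  "time": 1533891263224, // event timestamp
  "watch_duration": 30 // watch duration
}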
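Each document records one watch event, and we use aggregations to analyze users' viewing behavior.
ElasticSearch aggregations rely on two related concepts:
- Buckets: sets of documents that satisfy a given condition
- Metrics: statistics computed over the documents in a bucket, such as the average of a particular field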
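To make the two terms concrete, here is a minimal in-memory sketch in Python. It only models the idea and is not how ElasticSearch implements aggregations; the sample events and the helper name bucket_by are made up for illustration.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical sample events with the same fields as the documents above.
events = [
    {"uid": 1, "video_id": 10, "watch_duration": 30},
    {"uid": 1, "video_id": 11, "watch_duration": 50},
    {"uid": 2, "video_id": 10, "watch_duration": 20},
]


def bucket_by(docs, field):
    """Group documents into buckets keyed by the value of `field`."""
    buckets = defaultdict(list)
    for doc in docs:
        buckets[doc[field]].append(doc)
    return dict(buckets)


# Buckets: one list of documents per uid.
buckets = bucket_by(events, "uid")

# Metrics: statistics computed per bucket.
metrics = {
    uid: {
        "doc_count": len(docs),
        "avg_duration": mean(d["watch_duration"] for d in docs),
    }
    for uid, docs in buckets.items()
}
```

Here buckets plays the role of a terms aggregation on uid, and metrics plays the role of the doc_count and avg values attached to each bucket.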
Querying a user's view count and watch duration
First, the query expressed in SQL:
SELECT uid, count(*) as view_count, avg(watch_duration) as avg_duration
FROM view_log
WHERE time >= #{since} AND time <= #{to}
GROUP BY uid;
GET /view_log/_search
{
  "size": 0,
  "query": {
    "range": {
      "time": {
        "gte": 0, // since
        "lte": 0 // to
      }
    }
  },
  "aggs": {
    "agg": { // "agg" is the name of the aggregation
      "terms": { // bucket together documents with the same uid
        "field": "uid"
      },
      "aggs": { // add metrics
        "avg_duration": {
          "avg": { // average of watch_duration
            "field": "watch_duration"
          }
        }
      }
    }
  }
}
response:
{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 100000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "agg": {
      "buckets": [
        {
          "key": 21836334489858688,
          "doc_count": 4026,
          "avg_duration": {
            "value": 12778.882352941177
          }
        },
        {
          "key": 31489302390368051,
          "doc_count": 2717,
          "avg_duration": {
            "value": 2652.5714285714284
          }
        }
      ]
    }
  }
}
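The result.aggregations.agg.buckets list contains the query results.
Because we aggregate with terms on uid, each bucket is the set of documents that share the same uid, and the key field holds that uid.
The doc_count field is the number of documents in the bucket, i.e. count(*) as view_count in the SQL statement.
avg_duration.value is the average of watch_duration, i.e. the user's average watch duration.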
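The mapping from buckets back to SQL-style rows can be sketched in Python as follows. The function name buckets_to_rows is hypothetical, and the response literal is a trimmed copy of the response above (only the aggregations part is kept).

```python
def buckets_to_rows(response, agg_name="agg"):
    """Flatten a terms-aggregation response into SQL-like rows."""
    rows = []
    for bucket in response["aggregations"][agg_name]["buckets"]:
        rows.append({
            "uid": bucket["key"],                             # GROUP BY uid
            "view_count": bucket["doc_count"],                # count(*)
            "avg_duration": bucket["avg_duration"]["value"],  # avg(watch_duration)
        })
    return rows


# Trimmed copy of the response shown above.
response = {
    "aggregations": {
        "agg": {
            "buckets": [
                {"key": 21836334489858688, "doc_count": 4026,
                 "avg_duration": {"value": 12778.882352941177}},
                {"key": 31489302390368051, "doc_count": 2717,
                 "avg_duration": {"value": 2652.5714285714284}},
            ]
        }
    }
}

rows = buckets_to_rows(response)
```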
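Paginating aggregations
In real applications the number of users can be enormous, and a terms aggregation only returns the top buckets (10 by default), so we cannot get everything in a single query. Instead we paginate and fetch the results in batches: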
GET /view_log/_search
{
  "size": 0,
  "query": {
    "range": {
      "time": {
        "gte": 0, // since
        "lte": 0 // to
      }
    }
  },
  "aggs": {
    "agg": {
      "terms": {
        "field": "uid",
        "size": 10000, // maximum number of buckets returned
        "include": { // split the results into 10 pages numbered [0, 9] and fetch the first one
          "partition": 0,
          "num_partitions": 10
        }
      },
      "aggs": {
        "avg_duration": {
          "avg": {
            "field": "watch_duration"
          }
        }
      }
    }
  }
}
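This query is almost identical to the one in the previous section. The only change is the include field added under aggs.agg.terms to paginate the results.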
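Fetching every page means sending the same request once per partition number. A minimal Python sketch that only builds the request bodies is shown here; the helper name partitioned_bodies is an assumption, and actually sending each body to GET /view_log/_search is left out because it depends on the client you use.

```python
import copy


def partitioned_bodies(base_body, agg_name, num_partitions):
    """Yield one request body per partition of the named terms aggregation."""
    for partition in range(num_partitions):
        body = copy.deepcopy(base_body)  # never mutate the caller's template
        body["aggs"][agg_name]["terms"]["include"] = {
            "partition": partition,
            "num_partitions": num_partitions,
        }
        yield body


base_body = {
    "size": 0,
    "query": {"range": {"time": {"gte": 0, "lte": 0}}},  # since / to placeholders
    "aggs": {
        "agg": {
            "terms": {"field": "uid", "size": 10000},
            "aggs": {"avg_duration": {"avg": {"field": "watch_duration"}}},
        }
    },
}

bodies = list(partitioned_bodies(base_body, "agg", 10))
```

Each element of bodies corresponds to one page; the buckets from all pages together form the full result.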
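Querying video UV
UV of a single video
UV (user view) is the number of distinct users who watched a video. The corresponding count without per-user deduplication is called PV (page view).
In SQL: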
SELECT video_id, count(*) as pv, count(distinct uid) as uv
FROM view_log
WHERE video_id = #{video_id};
ElasticSearch makes count(distinct) queries easy:
GET /view_log/_search
{
  "size": 0,
  "query": {
    "term": {
      "video_id": 0 // video_id
    }
  },
  "aggs": {
    "uv": {
      "cardinality": {
        "field": "uid"
      }
    }
  }
}
response:
{
  "took": 255,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 17579,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "uv": {
      "value": 11
    }
  }
}
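One detail worth knowing: the cardinality aggregation returns an approximate distinct count (it is based on the HyperLogLog++ algorithm), and its precision_threshold option trades memory for accuracy. The sketch below builds a single-video UV request in Python. The helper name uv_request and the threshold value in the usage line are illustrative assumptions, and the term filter on video_id mirrors the WHERE clause of the SQL above.

```python
def uv_request(video_id, precision_threshold=None):
    """Build a search body that counts distinct uids for one video."""
    cardinality = {"field": "uid"}
    if precision_threshold is not None:
        # Optional accuracy/memory trade-off of the cardinality aggregation.
        cardinality["precision_threshold"] = precision_threshold
    return {
        "size": 0,  # we only need the aggregation, not the matching documents
        "query": {"term": {"video_id": video_id}},
        "aggs": {"uv": {"cardinality": cardinality}},
    }


# The video id is the one from the sample document; 1000 is an arbitrary example value.
body = uv_request(1289643545120062253, precision_threshold=1000)
```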
UV of multiple videos
ElasticSearch can also run count(distinct) for many videos at once. First, in SQL:
SELECT video_id, count(*) as pv, count(distinct uid) as uv
FROM view_log
GROUP BY video_id;
Query:
GET /view_log/_search
{
  "size": 0,
  "aggs": {
    "video": {
      "terms": {
        "field": "video_id"
      },
      "aggs": {
        "uv": {
          "cardinality": {
            "field": "uid"
          }
        }
      }
    }
  }
}
response:
{
  "took": 313,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 16940,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "video": {
      "buckets": [
        {
          "key": 25417499722062, // video id
          "doc_count": 427, // number of views of the video (PV)
          "uv": {
            "value": 124 // number of users who watched the video (UV)
          }
        },
        {
          "key": 72446898144,
          "doc_count": 744,
          "uv": {
            "value": 233
          }
        }
      ]
    }
  }
}
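HAVING queries
SQL can filter on aggregation results with a HAVING clause. ElasticSearch can achieve the same effect with pipeline aggregations, although the syntax is more verbose.
Filtering by count
In SQL, querying the videos that were watched more than 200 times: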
SELECT video_id, count(*) as view_count
FROM view_log
GROUP BY video_id
HAVING count(*) > 200;
GET /view_log/_search
{
  "size": 0,
  "aggs": {
    "view_count": {
      "terms": {
        "field": "video_id"
      },
      "aggs": {
        "having": {
          "bucket_selector": {
            "buckets_path": { // filter on the doc_count of the view_count aggregation
              "view_count": "_count"
            },
            "script": {
              "source": "params.view_count > 200"
            }
          }
        }
      }
    }
  }
}
response:
{
  "took": 83,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 775,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "view_count": {
      "buckets": [
        {
          "key": 35025417499764062,
          "doc_count": 529
        },
        {
          "key": 19913672446898144,
          "doc_count": 759
        }
      ]
    }
  }
}
The key to a HAVING-style query in ElasticSearch is bucket_selector, which picks the buckets to keep based on their aggregated values.
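Conceptually, bucket_selector is a filter applied to buckets that already exist. The Python sketch below reproduces that step on the client side to show the semantics. The function name having_count is hypothetical, and the third bucket is made-up data added so that something gets filtered out. As far as we understand, the selector only sees the buckets produced by its parent terms aggregation, so with a small terms size the filter only considers those top buckets; treat that as something to verify against your ElasticSearch version.

```python
def having_count(buckets, min_exclusive):
    """Keep only buckets whose doc_count is greater than `min_exclusive`."""
    return [b for b in buckets if b["doc_count"] > min_exclusive]


# The first two buckets come from the response above; the third is invented.
buckets = [
    {"key": 35025417499764062, "doc_count": 529},
    {"key": 19913672446898144, "doc_count": 759},
    {"key": 1, "doc_count": 12},  # hypothetical video with few views
]

kept = having_count(buckets, 200)
```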
Filtering by other metrics
Next, let's query the videos whose average watch duration is longer than 5 minutes. In SQL:
SELECT video_id FROM view_log
GROUP BY video_id
HAVING avg(watch_duration) > 300;
GET /view_log/_search
{
  "size": 0,
  "aggs": {
    "video": {
      "terms": {
        "field": "video_id"
      },
      "aggs": {
        "avg_duration": {
          "avg": {
            "field": "watch_duration"
          }
        },
        "avg_duration_filter": {
          "bucket_selector": {
            "buckets_path": {
              "avg_duration": "avg_duration"
            },
            "script": {
              "source": "params.avg_duration > 300"
            }
          }
        }
      }
    }
  }
}
response:
{
  "took": 137,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 255,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "video": {
      "buckets": [
        {
          "key": 5417499764062,
          "doc_count": 91576,
          "avg_duration": {
            "value": 103
          }
        },
        {
          "key": 19913672446898144,
          "doc_count": 15771,
          "avg_duration": {
            "value": 197
          }
        }
      ]
    }
  }
}
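Both HAVING examples follow the same pattern: a terms aggregation, a metric, and a bucket_selector that references the metric through buckets_path. A small Python helper can generate that body for any metric. This is only a sketch: the function name having_metric_body and the aggregation name "groups" are assumptions, not part of any ElasticSearch client API.

```python
def having_metric_body(group_field, metric_name, metric_type, metric_field, condition):
    """Build a terms aggregation filtered by a metric, similar to SQL HAVING.

    `condition` is a script expression that refers to the metric as
    params.<metric_name>.
    """
    return {
        "size": 0,
        "aggs": {
            "groups": {  # hypothetical aggregation name
                "terms": {"field": group_field},
                "aggs": {
                    metric_name: {metric_type: {"field": metric_field}},
                    metric_name + "_filter": {
                        "bucket_selector": {
                            "buckets_path": {metric_name: metric_name},
                            "script": {"source": condition},
                        }
                    },
                },
            }
        },
    }


# HAVING avg(watch_duration) > 300
body = having_metric_body(
    "video_id", "avg_duration", "avg", "watch_duration",
    "params.avg_duration > 300",
)
```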