1. 程式人生 > 實用技巧 >Elasticsearch 搜尋數量不能超過10000的解決方案

Elasticsearch 搜尋數量不能超過10000的解決方案

目錄

一. 問題描述

開發環境:JDK1.8、Elasticsearch7.3.1、RestHighLevelClient

問題:最近在通過Java客戶端操作ES進行分頁查詢(from+size)時,需要返回滿足條件的資料總數。我發現滿足條件的資料總數一旦超過10000條,使用SearchResponse的getHits().getTotalHits().value返回的結果永遠是10000。為什麼會被限制只能搜尋10000條資料呢?如何查詢精確的資料總數呢?

Tips: 本文側重點在如何精確的獲取資料總數,如果想知道如何深度搜索,請參考我的另一篇部落格Elasticsearch from+size與scroll混合使用實現深度分頁搜尋

二. 問題分析

檢視官方文件:Elasticsearch 7.3

Elasicsearch通過index.max_result_window引數控制了能夠獲取的資料總數from+size的最大值,預設是10000條。但是,由於資料需要從其它節點分別上報到協調節點,因此搜尋請求的資料越多,會導致在協調節點佔用分配給Elasticsearch的堆記憶體和搜尋、排序時間越大。針對這種滿足條件數量較多的深度搜索,官方建議我們使用Scroll。

三. 解決方案

3.1 調大index.max_result_window(不推薦)

既然知道了是index.max_result_window引數限制了搜尋數量,我們可以通過適當調高index.max_result_window的值,以此來滿足需求。設定方法如下:

  • kibana上執行
新建索引: 
PUT your_index
{
  "settings": {
    "max_result_window": "100000"
  }
}

在原有索引的基礎上,調大index.max_result_window的預設值:
PUT your_index/_settings?preserve_existing=true
{
  "max_result_window": "100000"
}

  • 伺服器上執行
curl -H "Content-Type: application/json" -X PUT 'http://127.0.0.1:9200/your_index/_settings?preserve_existing=true' -d '{"max_result_window" : "100000"}'

這個方案我個人不太推薦,除非能預估出生產環境中索引內資料總量可能達到的上限,否則在未來實際資料量可能會超過設定的值,仍然會再次引發搜尋數量受限的問題。

3.2 cardinality(不推薦)

cardinality字面意思是基數,作為聚合函式,它的作用與Mysql中的distinct類似,用於統計給定欄位的不同值的數量。值得注意的是,cardinality獲取的僅僅是估計值。使用方式如下:

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

// 設定聚合函式
AggregationBuilder aggregationBuilder = AggregationBuilders.cardinality("distinct_id").field("_id");
sourceBuilder.aggregation(aggregationBuilder);

// 呼叫ES客戶端,發起請求,得到響應結果
response = search("INDEX_NAME索引名稱", sourceBuilder);

// 獲取總記錄數
total = ((ParsedCardinality)response.getAggregations().getAsMap().get("distinct_id")).getValue();

其中,“distinct_id"是我為聚合函式隨便起的名稱,可以任意指定,”_id"是希望進行分組統計的欄位名稱。上方這一段程式碼實際上可以翻譯成以下執行語句:

GET index_name/_search
{
  "aggs": {
    "distinct_id": {
      "cardinality": {
        "field": "_id"
      }
    }
  }
}

3.3 track_total_hits(推薦)

文件:track_total_hits
使用方式:

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.trackTotalHits(true);
// 省略查詢方法...
SearchResponse sumResponse = search(sourceBuilder);
if(sumResponse != null) {
    // 滿足條件的總記錄數
    long total = sumResponse.getHits().getTotalHits().value;
}



Elasticsearch from+size與scroll混合使用實現深度分頁搜尋

目錄

一. 需求

環境準備: JDK1.8 Elasticsearch7.3.1 RestHighLevelClient客戶端
對Elasticsearch做深度分頁,比如第1500頁,每頁20條記錄,且需要支援前後翻頁。

二. 思考

由於index.max_result_window的限制,直接使用from+size無法搜尋滿足條件10000條以上的記錄。如果貿然增大index.max_result_window值,那麼你怎麼知道系統未來會在索引記憶體多少條資料?

就算這一次設定值暫時解決了問題,那麼未來又陷入瓶頸了怎麼辦?重新設值嗎?調大後會增大記憶體壓力的問題難道就不需要考慮嗎?

這時就需要使用scroll了,但scroll不能盲目的使用,它雖然支援深度分頁,純粹的使用scroll只能不斷地向後翻頁,我們還需要考慮如何向前翻頁。

三. 實現方案

不改變index.max_result_window的預設值,但搜尋手段根據搜尋數量劃分為以下兩種:

  1. 搜尋數量<=10000
    使用from+size的方式分頁和搜尋資料。
  2. 搜尋數量>10000
    使用scroll的方式搜尋資料。針對對每次分頁查詢請求,我都會建立遊標,接著手動滾動到包含請求資料的那一屏,最後取出請求頁面中的目標資料。

比如現在準備查詢第1413頁,頁面容量為10條資料,遊標每次移動1000條記錄,總記錄數為1000000(這個值不重要了)。如果以1作為第一條資料的下標,則有以下規律:

滾屏次數資料的下標範圍
1 1~1000
2 1001~2000
15 14001 ~ 15000
n (n-1) * 1000 + 1 ~ n*1000

第1413頁的第一條資料的下標=(1413-1)*10+1=14121
第1413頁的最後一條資料的下標=14121+10-1=14130
只需要移動15次遊標,則在第15次遊標查詢返回的1000條資料中,一定包含了第1413頁的所有資料。

但我們還需要考慮另一種情況,比如現在準備查詢第934頁,頁面容量為15條資料,遊標仍然保持每次移動1000條記錄。
第934頁的第一條資料的下標=(934-1)*15+1=13996
第934頁的最後一條資料的下標=13996+15-1=14010
注意,我們的遊標只能獲取13001~14000和14001~15000範圍內的資料,第934頁會橫跨兩次遊標執行結果,針對這種情況,我在程式碼中做了特殊處理。

接下來是程式碼:

  • 定義搜尋條件
// 自定義搜尋條件
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchQuery("name", "麥當勞"));

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(boolQueryBuilder);
// 設定請求超時時間
sourceBuilder.timeout(new TimeValue(20, TimeUnit.SECONDS));
// 排序
sourceBuilder.sort("salary", SortOrder.ASC);
  • 與ES客戶端互動的底層邏輯
    esClient就是RestHighLevelClient的物件
protected SearchResponse search(String requestIndexName, SearchSourceBuilder sourceBuilder) throws Exception {
    SearchRequest searchRequest = new SearchRequest(requestIndexName);
    searchRequest.source(sourceBuilder);
    return esClient.search(searchRequest, RequestOptions.DEFAULT);
}

protected SearchResponse search(String requestIndexName,SearchSourceBuilder searchSourceBuilder,
                                        TimeValue timeValue) throws IOException {
    SearchRequest searchRequest = new SearchRequest(requestIndexName);
    searchSourceBuilder.size(ElasticsearchConstant.MAX_SCROLL_NUM);
    searchRequest.source(searchSourceBuilder);
    searchRequest.scroll(timeValue);
    return esClient.search(searchRequest, RequestOptions.DEFAULT);
}

protected SearchResponse searchScroll(String scrollId, TimeValue timeValue) throws IOException {
    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
    searchScrollRequest.scroll(timeValue);
    return esClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
}
  • 搜尋邏輯(核心程式碼)
// 本次搜尋滿足條件的資料總數
long total = 0;
// 精度
int accuracy = 1;
// 希望被忽略的記錄條數
int ignoreLogNum = (pageNum - 1) * pageSize;
// 待查詢頁面內第一條記錄的下標
int firstSelectLogNum = 1;
// 待查詢頁面內最後一條記錄的下標
int lastSelectLogNum = -1;
// 當前遊標查詢返回結果中最後一條記錄的下標
int lastAllowLogNum = -1;
// 遊標Id
String scrollId = null;
// Elasticsearch 搜尋返回結果物件
SearchResponse response = null;

try {
    firstSelectLogNum = ignoreLogNum + 1;
    lastSelectLogNum = firstSelectLogNum + pageSize - 1;
    String indexName = ElasticsearchConstant.SUB_INDEX_NAME_PREFIX + bizSubLogQuery.getProductNum().toLowerCase();
    if(firstSelectLogNum > ElasticsearchConstant.MAX_RESULT_WINDOW) {
        // 構建遊標查詢 此時遊標已經移動了1次
        response = search(indexName, sourceBuilder, TimeValue.timeValueMinutes(1));
        if(response != null && response.getHits().getHits().length > 0) {
            // 遊標總共需要移動的次數
            int scrollNum = firstSelectLogNum / ElasticsearchConstant.MAX_SCROLL_NUM + 1;
            lastAllowLogNum = scrollNum * ElasticsearchConstant.MAX_SCROLL_NUM;
            accuracy = firstSelectLogNum - (firstSelectLogNum / ElasticsearchConstant.MAX_SCROLL_NUM) * ElasticsearchConstant.MAX_SCROLL_NUM;
            // 遊標Id
            scrollId = response.getScrollId();
            // 遊標還需移動scrollNum-1次
            while(--scrollNum > 0 && scrollId != null) {
                response = searchScroll(scrollId, TimeValue.timeValueMinutes(1));
                scrollId = response.getScrollId();
            }
        }
    } else {
        // 分頁引數
        sourceBuilder.from((pageNum - 1) * pageSize);
        sourceBuilder.size(pageSize);

        // 獲取滿足記錄的總條數
        response = search(indexName, sourceBuilder);
    }

    // 查詢總數
    sourceBuilder.size(0);
    sourceBuilder.trackTotalHits(true);
    SearchResponse sumResponse = search(indexName, sourceBuilder);
    if(sumResponse != null) {
        total = sumResponse.getHits().getTotalHits().value;
    }
} catch (ElasticsearchStatusException ese) {
    if (RestStatus.NOT_FOUND == ese.status()) {
        log.error("待搜尋的產品不存在");
    } else {
        log.error(ese.getMessage());
    }
} catch (IOException ioe) {
    log.error("搜尋失敗,網路連接出現異常", ioe);
} catch (Exception e) {
    log.error("搜尋失敗,未知異常", e);
}

if (response == null) {
    return new PageInfo<>();
}

// 搜尋結果,使用集合來存放
List<Map<String, String>> list = new ArrayList<>();

// 遊標一次性最高可能返回1000條資料,需要通過頁面容量來約束
int maxPageSize = pageSize;

for (int i = 0; i < response.getHits().getHits().length; i++) {
    if(i+1 >= accuracy) {
        SearchHit hit = response.getHits().getAt(i);
        if(--maxPageSize < 0) {
            break;
        }
        try {
            list.add(JacksonUtils.jsonStrToMap(hit.getSourceAsString(), String.class, String.class));
        } catch (JsonProcessingException e) {
            log.error("jackson轉換異常", e);
        }
    }
}

if(scrollId != null && maxPageSize>0 && lastAllowLogNum!=-1 && lastSelectLogNum>lastAllowLogNum) {
    // 存在目標資料不在本次遊標查詢的結果範圍內
    // 需要再次移動遊標 (務必保證遊標移動的步長大於頁面容量)
    try {
        response = searchScroll(scrollId, TimeValue.timeValueMinutes(1));
        for(int i = 0; i < maxPageSize && i < response.getHits().getHits().length; i++) {
            SearchHit hit = response.getHits().getAt(i);
            try {
                list.add(JacksonUtils.jsonStrToMap(hit.getSourceAsString(), String.class, String.class));
            } catch (JsonProcessingException e) {
                log.error("jackson轉換異常", e);
            }
        }
    } catch (IOException ioe) {
        log.error("搜尋失敗,網路連接出現異常", ioe);
    }
}