1. 程式人生 > >在es中用scroll查詢與completableFuture

在es中用scroll查詢與completableFuture

一般而言,es返回資料的上限是10000條,如果超過這個數量,就必須使用scroll查詢。

所謂scroll查詢就類似DBMS中的遊標,或者快照吧,利用查詢條件,在第一次查詢時,在所有的結果上形成了一個快照,然後再分批分次的讀取出來。

要完成一個scroll查詢分兩個階段:

階段一:帶查詢引數

1 POST /twitter/_search?scroll=1m
2 {
3     "size": 100,
4     "query": {
5         "match" : {
6             "title" : "elasticsearch"
7         }
8     }
9 }

這個查詢條件比較簡單,只是示意。

關鍵是有兩點:1.post路徑中的scroll關鍵字,指明是一個scroll查詢;2,scroll=1m意味著查詢結果資料在es的伺服器有效期是一分鐘。

在查詢結果的返回值中會帶有一個scroll id的引數,這個引數在第二次查詢的時候需要。

階段二:不帶引數查詢

POST  /_search/scroll 
{
    "scroll" : "1m", 
    "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==" 
}

這個查詢請求中,會帶上第一次請求得到的scroll_id這個欄位。

然後迴圈往復,第三次查詢需要帶上第二次查詢返回的scroll_id,以此類推,就這個例子而言,當判斷返回的資料條數小於100條的時候,就可以結束請求了。

使用scroll查詢的兩個優勢是:

1.無論查詢的資料量是多大,都能夠查詢成功。

2.準確反映了第一次查詢當時的查詢結果,第一次查詢之後的查詢請求不會包含新的資料。

但也有一個缺點:

1.因為查詢的遞進的,第二次依賴於第一次,第三次依賴於第二次,所以如果資料量很多,查詢的耗時就比較長。

如何解決耗時長這個問題了?就不能使用scroll來查詢了,使用常規的查詢,但是啟用多執行緒去查。

GET /_search
{
    "query
" : { "term" : { "user" : "kimchy" } } }

加入在常規的查詢中有timestamp這樣的自動,我們可以預先對timestamp進行劃分,比如分出10份,當然前提是我們假設資料在時間上是均勻的,然後每個時間切分啟用一個執行緒去查詢。在java中有completableFuture能夠比較好的支援這種查詢場景。

CompletableFuture<JSONArray>[] futures = (CompletableFuture<JSONArray>[]) new CompletableFuture[count];
                for (int i = 0; i < count; i++) {
                    CompletableFuture<JSONArray> future = CompletableFuture.
                            supplyAsync(new JSONAarrySupplier(this.queryString,timestamp[i])
                            .exceptionally(ex -> {
                                logger.error(ex.getMessage());
                                return null;});
                    futures[i] = future;
                }
                CompletableFuture<List<JSONArray>> allFuture = myAllOf(futures);
                result = allFuture.get();

如上述,在一開始建立了一個future陣列,然後根據時間切片,構建查詢請求,並放入completableFuture中。

在最後呼叫get方法,拿到所有執行緒執行完的結果。

這裡有一個點要注意,就是completableFuture.allOf方法本身返回的是void,如果我們的future是有返回值的話,就不能直接呼叫java自身提供的,需要改下一下,如上其實呼叫了下面的方法:

 public static CompletableFuture<List<JSONArray>> myAllOf(CompletableFuture<?>... futures) {
        return CompletableFuture.allOf(futures)
                .thenApply(x -> Arrays.stream(futures)
                        .map(f -> (JSONArray) f.join())
                        .collect(toList())
                ).exceptionally(ex -> {
                    logger.error(ex.getMessage());
                    return null;});
    }

這個方法中實現了返回值的轉換。

這種多執行緒的查詢,相對於scroll去查詢,在網路不是瓶頸的前提下,效能還是有很大提升。

 

綜上所述:

1.如果對時間不敏感,還是推薦使用scroll查詢,畢竟反映了查詢時間點的實際情況。

2.如果對時間敏感,則需要合理挑選查詢分片條件,形成合理的多執行緒查詢。

參考https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html