1. 程式人生 > >Elasticsearch原始碼分析十四--搜尋型別

Elasticsearch原始碼分析十四--搜尋型別

  • 簡介
  • query_then_fetch
  • query_and_fetch
  • dfs_query_and_fetch
  • dfs_query_then_fetch
  • count
  • scan

簡介

Elasticsearch允許通過指定搜尋型別來選擇查詢在內部如何處理。不同的搜尋型別適合不同的情況;
可以只在乎效能,但有時查詢的關聯性可能是最重要的因素。使用search_type請求引數指定搜尋型別,
其各種取值介紹參考下文,其中size引數指定一次查詢中返回的最大文件數。預設值為0。在Elasticsearch程式碼中,會將上述幾種搜尋型別分隔成幾個階段phrases順序執行,
前提是可以分成兩個階段。比如query_and_fetch
不能分解成兩個階段,只需一步就能完成查詢。
而dfs_query_then_fetch需要三個階段。這幾種搜尋型別最終會呼叫具體的搜尋類SearchService,
該類的介紹參考另一篇文章。

query_then_fetch

此搜尋型別分兩步,第一步:執行查詢得到對文件進行排序和分級所需資訊。這一步在所有的分片上執行。然後,只在相關分片上
查詢文件的實際內容。不同於query_and_fetch,此查詢型別返回結果的最大數量等於size引數的值。預設使用這個型別。
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction
{
'''執行請求''' @Override protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) { new AsyncAction(searchRequest, listener).start(); } private class AsyncAction extends BaseAsyncAction<QuerySearchResult> { ''
'第一階段是queryphrase''' @Override protected String firstPhaseName() { return "query"; } @Override protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) { '''呼叫SearchServiceTransportAction傳送query請求''' searchService.sendExecuteQuery(node, request, listener); } '''在其父類TransportSearchTypeAction的onFirstPhaseResult函式中呼叫 在收到第一階段的處理結果後轉移到第二階段''' @Override protected void moveToSecondPhase() { ... } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

query_and_fetch

這通常是最快也最簡單的搜尋型別實現。查詢在所有分片上並行執行(當然,任意一個主分片,只查詢一個副本),所有分片
返回等於size值的結果數。返回文件的最大數量等於size的值乘以分片的數量。
public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction {

        '''執行請求'''
        @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }

    private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {

                '''只有query_fetch'''
        @Override
        protected String firstPhaseName() {
            return "query_fetch";
        }

        @Override
        protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QueryFetchSearchResult> listener) {
            '''呼叫SearchServiceTransportAction傳送fetch請求'''
            searchService.sendExecuteFetch(node, request, listener);
        }

                '''在其父類TransportSearchTypeAction的onFirstPhaseResult函式中呼叫
                     在收到第一階段的處理結果後轉移到第二階段,對於該搜尋型別,
                     第二階段無須再查詢'''
        @Override
        protected void moveToSecondPhase() throws Exception {
            try {
                innerFinishHim();
            } catch (Throwable e) {
                ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
                if (logger.isDebugEnabled()) {
                    logger.debug("failed to reduce search", failure);
                }
                listener.onFailure(failure);
            }
        }

                '''在其父類TransportSearchTypeAction的onFirstPhaseResult函式中呼叫
                     在收到第一階段的處理結果後轉移到第二階段'''
        @Override
        protected void moveToSecondPhase() 
        { ... }
    }
}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

dfs_query_and_fetch

跟query_and_fetch類似,但相比query_and_fetch,它包含一個額外階段,在初始查詢中執行分散式詞頻的計算,
以得到返回檔案的更精確的得分,從而讓查詢結果更相關。
public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction {

        '''執行請求'''
        @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }

    private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {

                '''第一階段是dfs'''
        @Override
        protected String firstPhaseName() {
            return "dfs";
        }

        @Override
        protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<DfsSearchResult> listener) {
            '''呼叫SearchServiceTransportAction傳送dfs請求'''
            searchService.sendExecuteDfs(node, request, listener);
        }

                '''在其父類TransportSearchTypeAction的onFirstPhaseResult函式中呼叫
                     在收到第一階段的處理結果後轉移到第二階段'''
        protected void moveToSecondPhase() {

            for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
                DfsSearchResult dfsResult = entry.value;
                DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
                if (node.id().equals(nodes.localNodeId())) {
                    localOperations++;
                } else {
                    QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
                    '''執行第二階段fetch'''
                    executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
                }
            }
            ...
        }
    }
}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

dfs_query_then_fetch

與前一個dfs_query_and_ fetch一樣,dfs_query_then_fetch類似於相應的query_then_fetch,
但比query_ then_fetch多了一個額外的階段,就像dfs_query_and_fetch一樣。
public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction {

        '''執行請求'''
        @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }

    private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {

                '''第一階段是dfs'''
        @Override
        protected String firstPhaseName() {
            return "dfs";
        }

        @Override
        protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<DfsSearchResult> listener) {
            '''呼叫SearchServiceTransportAction傳送dfs請求'''
            searchService.sendExecuteDfs(node, request, listener);
        }

                '''在其父類TransportSearchTypeAction的onFirstPhaseResult函式中呼叫
                     在收到第一階段的處理結果後轉移到第二階段'''
                protected void moveToSecondPhase() {
            final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
            final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());

            int localOperations = 0;
            for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
                DfsSearchResult dfsResult = entry.value;
                DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
                if (node.id().equals(nodes.localNodeId())) {
                    localOperations++;
                } else {
                    QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
                    '''執行第二階段query'''
                    executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
                }
            }
           ...
        }
        void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, DiscoveryNode node) {
            searchService.sendExecuteQuery(node, querySearchRequest, new SearchServiceListener<QuerySearchResult>() {
                @Override
                public void onResult(QuerySearchResult result) {
                    result.shardTarget(dfsResult.shardTarget());
                    queryResults.set(shardIndex, result);
                    if (counter.decrementAndGet() == 0) {
                            '''執行第三個階段fetch'''
                        executeFetchPhase();
                    }
                }         
        }      
    }
}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

count

一個特殊的搜尋,只返回匹配查詢的文件數。如果你只需要結果數量,而不關心文件,
應該使用這個搜尋型別。
public class TransportSearchCountAction extends TransportSearchTypeAction {

        '''執行請求'''
        @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }

    private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {

                '''第一階段是query'''
        @Override
        protected String firstPhaseName() {
            return "query";
        }

        @Override
        protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
            '''呼叫SearchServiceTransportAction傳送query請求'''
            searchService.sendExecuteQuery(node, request, listener);
        }

                '''在其父類TransportSearchTypeAction的onFirstPhaseResult函式中呼叫
                     在收到第一階段的處理結果後轉移到第二階段,此搜尋型別第二個階段無需查詢'''
        protected void moveToSecondPhase() {            
            protected void moveToSecondPhase() throws Exception {
            // no need to sort, since we know we have no hits back
            final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
        }
    }
}  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

scan

另一個特殊的搜尋型別,只有在要讓查詢返回大量結果時才用。它跟一般的查詢有點不同,因為在傳送第一個請
求之後, Elasticsearch響應一個滾動識別符號,類似於關係型資料庫中的遊標。
所有查詢需要在_search/scroll REST端點執行,並需要在請求主體中傳送返回的滾動識別符號。
public class TransportSearchScanAction extends TransportSearchTypeAction {

        '''執行請求'''
        @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }

    private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {

                '''第一階段是init_scan'''
        @Override
        protected String firstPhaseName() {
            return "init_scan";
        }

        @Override
        protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
            '''呼叫SearchServiceTransportAction傳送scan請求'''
            searchService.sendExecuteScan(node, request, listener);
        }

                '''在其父類TransportSearchTypeAction的onFirstPhaseResult函式中呼叫
                     在收到第一階段的處理結果後轉移到第二階段,此搜尋型別第二個階段無需查詢'''
        @Override
        protected void moveToSecondPhase() throws Exception {
            final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
        }
    }
}