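ES 5.6 search flow and scroll
Overview of the ES search flow
Request conversion: the request is handed from RestSearchAction to TransportSearchAction, whose doExecute() method runs it.
1. If the requested indices contain wildcard expressions or aliases, resolve them to concrete indices: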
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
timeProvider.getAbsoluteStartMillis(), localIndices.indices());
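Step 1 can be illustrated with a toy model that expands wildcard and alias expressions into concrete index names. This is a minimal sketch with stated assumptions. The class IndexExpressionSketch, its resolve method and the in-memory maps are all hypothetical. The real IndexNameExpressionResolver also handles exclusions, date math, IndicesOptions and closed indices, and none of that is modeled here.

```java
import java.util.*;
import java.util.regex.Pattern;

/**
 * Hypothetical toy resolver: expands wildcard and alias expressions into concrete index names.
 * Not Elasticsearch's IndexNameExpressionResolver (no exclusions, date math or IndicesOptions).
 */
final class IndexExpressionSketch {

    static SortedSet<String> resolve(List<String> expressions,
                                     Set<String> indices,
                                     Map<String, Set<String>> aliases) {
        SortedSet<String> concrete = new TreeSet<>();
        for (String expr : expressions) {
            if (expr.contains("*")) {
                // in this sketch a wildcard can match index names and alias names alike
                Pattern pattern = Pattern.compile(globToRegex(expr));
                for (String index : indices) {
                    if (pattern.matcher(index).matches()) {
                        concrete.add(index);
                    }
                }
                for (Map.Entry<String, Set<String>> alias : aliases.entrySet()) {
                    if (pattern.matcher(alias.getKey()).matches()) {
                        concrete.addAll(alias.getValue());
                    }
                }
            } else if (aliases.containsKey(expr)) {
                // an alias expands to every index it points to
                concrete.addAll(aliases.get(expr));
            } else if (indices.contains(expr)) {
                concrete.add(expr);
            } else {
                throw new IllegalArgumentException("no such index [" + expr + "]");
            }
        }
        return concrete;
    }

    /** Turns "logs-*" into a regex where '*' matches any run of characters and the rest is literal. */
    private static String globToRegex(String glob) {
        StringJoiner regex = new StringJoiner(".*");
        for (String literal : glob.split("\\*", -1)) {
            regex.add(Pattern.quote(literal));
        }
        return regex.toString();
    }
}
```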
2. Resolve the routing values and use them to select the shards to search:
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState,
    searchRequest.routing(), searchRequest.indices());
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
    concreteIndices, routingMap, searchRequest.preference());
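At its core, step 2 hashes a routing value onto a shard number. The sketch below uses a simplified formula, floorMod(hash(routing), numberOfShards). String.hashCode stands in for the Murmur3 hash that Elasticsearch actually uses. Routing partitions and replica selection through the preference parameter are ignored, and the class and method names are hypothetical.

```java
import java.util.*;

/**
 * Hypothetical sketch of routing-based shard selection. Elasticsearch hashes the routing value
 * with Murmur3; String.hashCode is only a stand-in so the example has no dependencies.
 */
final class SearchRoutingSketch {

    /** Simplified formula: shard = floorMod(hash(routing), numberOfShards). */
    static int shardFor(String routing, int numberOfShards) {
        // floorMod keeps the result in [0, numberOfShards) even for negative hash codes
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }

    /** Without routing every shard is searched; with routing, only the shards the values map to. */
    static SortedSet<Integer> shardsToSearch(Set<String> routingValues, int numberOfShards) {
        SortedSet<Integer> shards = new TreeSet<>();
        if (routingValues == null || routingValues.isEmpty()) {
            for (int shard = 0; shard < numberOfShards; shard++) {
                shards.add(shard);
            }
        } else {
            for (String routing : routingValues) {
                shards.add(shardFor(routing, numberOfShards));
            }
        }
        return shards;
    }
}
```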
3. Resolve the per-index boosts (indices_boost) given in the request:
Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
4. Pick the async action class according to the search type:
switch (searchRequest.searchType()) {
    case DFS_QUERY_THEN_FETCH:
        searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
            aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener,
            shardIterators, timeProvider, clusterStateVersion, task);
        break;
    case QUERY_AND_FETCH: // falls through: handled the same way as QUERY_THEN_FETCH
    case QUERY_THEN_FETCH:
        searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
            aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener,
            shardIterators, timeProvider, clusterStateVersion, task);
        break;
    default:
        throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
}
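A quick aside on how DFS_QUERY_THEN_FETCH differs from QUERY_THEN_FETCH:
- QUERY_THEN_FETCH
The request runs in two phases. In the first phase, all relevant shards are queried: each shard executes the request and returns its top from + size hits, sorted. The coordinating node collects these results, merges and re-sorts them, and then cuts out the window defined by from and size. In the second phase, the documents selected in the first phase are fetched from the shards that hold them.
- DFS_QUERY_THEN_FETCH
Almost the same as QUERY_THEN_FETCH. The only difference is scoring accuracy: it first gathers term and document frequencies from all shards, so every shard scores with distributed (global) term statistics rather than only its own local ones.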
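BM25's idf factor shows the effect of DFS. The formula is log(1 + (N - df + 0.5) / (df + 0.5)), the one Lucene's BM25Similarity uses. The hypothetical sketch below has a term that is rare on one shard and common on another. Under QUERY_THEN_FETCH each shard scores with its own N and df. DFS_QUERY_THEN_FETCH effectively uses the summed statistics instead. Only the idf factor is modeled.

```java
/**
 * Hypothetical sketch: compares shard-local and global idf for one term.
 * Only the BM25 idf factor is modeled, using the formula from Lucene's BM25Similarity.
 */
final class DfsIdfSketch {

    /** BM25 idf: log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5)). */
    static double idf(long docFreq, long docCount) {
        return Math.log(1 + (docCount - docFreq + 0.5) / (docFreq + 0.5));
    }

    /** What DFS effectively does: sum the per-shard statistics before computing idf. */
    static double globalIdf(long[] docFreqPerShard, long[] docCountPerShard) {
        long docFreq = 0;
        long docCount = 0;
        for (int i = 0; i < docFreqPerShard.length; i++) {
            docFreq += docFreqPerShard[i];
            docCount += docCountPerShard[i];
        }
        return idf(docFreq, docCount);
    }
}
```

Take shard A (100 docs, df = 1) and shard B (100 docs, df = 50). The local idf values are about 4.21 and 0.69. The global idf (200 docs, df = 51) is about 1.36, so without DFS a match on shard A gets an idf roughly three times the global one.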
5. SearchQueryThenFetchAsyncAction then runs the standard search flow:
- query
- fetch
- merge
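Plain lists are enough to model the query and merge steps. The sketch uses hypothetical names and scores only, with no real sort values. It shows why each shard must return its top from + size hits: the requested page could come entirely from any one shard. The coordinator then merges and re-sorts, and it keeps only the window [from, from + size). Only those hits would go on to the fetch phase.

```java
import java.util.*;

/** Hypothetical sketch of the query phase (per shard) and the coordinator-side merge. */
final class QueryThenFetchMergeSketch {

    /** What a shard reports in the query phase: shard, shard-local doc id and score, no document body. */
    static final class Hit {
        final int shard;
        final int doc;
        final float score;

        Hit(int shard, int doc, float score) {
            this.shard = shard;
            this.doc = doc;
            this.score = score;
        }

        @Override
        public String toString() {
            return shard + ":" + doc;
        }
    }

    /** Score descending; shard and doc id ascending as a deterministic tie-breaker. */
    static final Comparator<Hit> BY_SCORE = Comparator.<Hit>comparingDouble(h -> -h.score)
            .thenComparingInt(h -> h.shard)
            .thenComparingInt(h -> h.doc);

    /** Shard side: the page could come entirely from this shard, so return its top (from + size) hits. */
    static List<Hit> shardTopHits(List<Hit> shardHits, int from, int size) {
        List<Hit> sorted = new ArrayList<>(shardHits);
        sorted.sort(BY_SCORE);
        return new ArrayList<>(sorted.subList(0, Math.min(from + size, sorted.size())));
    }

    /** Coordinator side: merge every shard's list, re-sort, and keep the window [from, from + size). */
    static List<Hit> mergePage(List<List<Hit>> perShard, int from, int size) {
        List<Hit> all = new ArrayList<>();
        perShard.forEach(all::addAll);
        all.sort(BY_SCORE);
        int end = Math.min(from + size, all.size());
        if (from >= end) {
            return new ArrayList<>();
        }
        return new ArrayList<>(all.subList(from, end));
    }
}
```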
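The standard search flow in SearchQueryThenFetchAsyncAction
1. Query
First, iterate over the shard iterators and send a query request to each shard. If no copy of a shard is active, a shard failure is recorded instead: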
for (final SearchShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performPhaseOnShard(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
Each shard's query request is sent through SearchTransportService:
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
When the shard is on the local node, TransportService's sendLocalRequest method is used instead, which ultimately calls:
handler.messageReceived(request, channel);
Here the channel is a DirectResponseChannel, and the handler is the one registered through SearchTransportService when TransportSearchAction is initialized:
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
});
So what actually runs is SearchService's executeQueryPhase method.
The actual local query execution:
1) Create a search context and parse the request source into it:
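The registration and local-dispatch pattern can be sketched without any transport layer. The model is simplified and every name in it is hypothetical, although DirectResponseChannel mirrors the class mentioned above. Handlers are registered once under an action name. A local request looks up its handler and passes it a channel that simply captures the response in memory. Elasticsearch runs the query handler on the SEARCH thread pool, but this sketch runs it synchronously on the caller's thread.

```java
import java.util.*;

/**
 * Hypothetical model of action-name based request dispatch. Handlers are registered once per
 * action name; a local request skips the network and hands the handler an in-memory channel.
 */
final class LocalDispatchSketch {

    interface Channel {
        void sendResponse(Object response);
    }

    interface Handler {
        void messageReceived(Object request, Channel channel);
    }

    /** In-memory channel: the response is handed straight back to the caller. */
    static final class DirectResponseChannel implements Channel {
        private Object response;

        @Override
        public void sendResponse(Object response) {
            this.response = response;
        }
    }

    private final Map<String, Handler> handlers = new HashMap<>();

    void registerRequestHandler(String action, Handler handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler already registered for [" + action + "]");
        }
    }

    /** Local path: look up the handler and call it directly (synchronously, unlike Elasticsearch). */
    Object sendLocalRequest(String action, Object request) {
        Handler handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler registered for [" + action + "]");
        }
        DirectResponseChannel channel = new DirectResponseChannel();
        handler.messageReceived(request, channel);
        return channel.response;
    }
}
```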
final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase);
parseSource(context, request.source());
2) Pre-process the query phase: for example, a request without a query defaults to match_all, and a missing boost falls back to its default value.
queryPhase.preProcess(context);
3) Execute the query:
loadOrExecuteQueryPhase(request, context);
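loadOrExecuteQueryPhase loads the result from the shard request cache when the request can be cached. Otherwise it calls queryPhase to run the query: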
queryPhase.execute(context);
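QueryPhase performs the actual query:
1) Rewrite the query, for example turning a prefix query into a BooleanQuery of term queries. By the time QueryPhase runs, the query has already been rewritten, as this assertion checks: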
assert query == searcher.rewrite(query); // already rewritten
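A prefix rewrite has to enumerate every indexed term that starts with the prefix. Terms are stored in sorted order, so those terms form one contiguous run. The sketch below uses a TreeSet as a stand-in for Lucene's term dictionary. The maxExpansions limit is a hypothetical parameter, similar in spirit to BooleanQuery's maximum clause count. Lucene's actual PrefixQuery rewrite depends on the configured rewrite method and does not always produce a BooleanQuery.

```java
import java.util.*;

/**
 * Hypothetical sketch of a prefix-query rewrite against a sorted term dictionary
 * (a TreeSet standing in for Lucene's terms). Each returned term would become one
 * SHOULD clause of a BooleanQuery.
 */
final class PrefixRewriteSketch {

    static List<String> expandPrefix(NavigableSet<String> termDictionary, String prefix, int maxExpansions) {
        List<String> terms = new ArrayList<>();
        // terms sharing a prefix are contiguous in sorted order: start at the prefix, stop at the first miss
        for (String term : termDictionary.tailSet(prefix, true)) {
            if (!term.startsWith(prefix)) {
                break;
            }
            if (terms.size() == maxExpansions) {
                // hypothetical limit, similar in spirit to BooleanQuery's maximum clause count
                throw new IllegalStateException("prefix [" + prefix + "] expands to more than " + maxExpansions + " terms");
            }
            terms.add(term);
        }
        return terms;
    }
}
```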
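2) Wrap the collector with extra collectors as needed (terminate_after, post_filter, aggregations, cancellation). The final collector is passed to Lucene, which performs the actual search: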
if (terminateAfterSet) {
final Collector child = collector;
// throws Lucene.EarlyTerminationException when given count is reached
collector = Lucene.wrapCountBasedEarlyTerminatingCollector(collector, searchContext.terminateAfter());
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT,
Collections.singletonList((InternalProfileCollector) child));
}
}
if (searchContext.parsedPostFilter() != null) {
final Collector child = collector;
// this will only get applied to the actual search collector and not
// to any scoped collectors, also, it will only be applied to the main collector
// since that is where the filter should only work
final Weight filterWeight = searcher.createNormalizedWeight(searchContext.parsedPostFilter().query(), false);
collector = new FilteredCollector(collector, filterWeight);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_POST_FILTER,
Collections.singletonList((InternalProfileCollector) child));
}
}
// plug in additional collectors, like aggregations
final List<Collector> subCollectors = new ArrayList<>();
subCollectors.add(collector);
subCollectors.addAll(searchContext.queryCollectors().values());
collector = MultiCollector.wrap(subCollectors);
// finally, wrap the whole chain in a CancellableCollector
if (collector != null) {
final Collector child = collector;
collector = new CancellableCollector(searchContext.getTask()::isCancelled, searchContext.lowLevelCancellation(), collector);
if (doProfile) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_CANCELLED,
Collections.singletonList((InternalProfileCollector) child));
}
}
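The wrapping above forms a decorator chain: each wrapper adds one concern and delegates to the collector inside it. The sketch below uses a hypothetical one-method Collector interface. Lucene's real Collector works per segment through LeafCollector. The chain is built in the same order as the code: terminate_after innermost, then post_filter, then a multi-collector that also feeds aggregations, and cancellation outermost.

```java
import java.util.*;
import java.util.function.BooleanSupplier;
import java.util.function.IntPredicate;

/**
 * Hypothetical sketch of the collector decorator chain built in QueryPhase. The one-method
 * Collector below is a simplification; Lucene's real Collector works per segment.
 */
final class CollectorChainSketch {

    interface Collector {
        void collect(int doc);
    }

    /** Thrown to stop the whole search early, playing the role of Lucene.EarlyTerminationException. */
    static final class EarlyTerminationException extends RuntimeException {
    }

    /** Records collected doc ids; stands in for the top-docs or aggregation collectors. */
    static final class ListCollector implements Collector {
        final List<Integer> docs = new ArrayList<>();

        @Override
        public void collect(int doc) {
            docs.add(doc);
        }
    }

    /** terminate_after: stop once maxCount docs have reached the wrapped collector. */
    static Collector terminateAfter(Collector in, int maxCount) {
        int[] count = {0};
        return doc -> {
            in.collect(doc);
            if (++count[0] >= maxCount) {
                throw new EarlyTerminationException();
            }
        };
    }

    /** post_filter: only docs accepted by the filter reach the wrapped collector. */
    static Collector filtered(Collector in, IntPredicate filter) {
        return doc -> {
            if (filter.test(doc)) {
                in.collect(doc);
            }
        };
    }

    /** MultiCollector: feed every doc to several collectors, e.g. hits plus aggregations. */
    static Collector multi(List<Collector> collectors) {
        return doc -> collectors.forEach(c -> c.collect(doc));
    }

    /** Cancellation: check the task before each doc and abort if it was cancelled. */
    static Collector cancellable(Collector in, BooleanSupplier isCancelled) {
        return doc -> {
            if (isCancelled.getAsBoolean()) {
                throw new IllegalStateException("search cancelled");
            }
            in.collect(doc);
        };
    }

    /** Drives the chain; early termination is treated as a normal end of collection. */
    static void search(int[] matchingDocs, Collector collector) {
        try {
            for (int doc : matchingDocs) {
                collector.collect(doc);
            }
        } catch (EarlyTerminationException e) {
            // terminate_after reached: keep what was collected so far
        }
    }
}
```

As an example, use a post filter that keeps only even docs and set terminate_after = 2, then feed docs 1 to 6. The main collector ends with [2, 4] and the aggregation collector with [1, 2, 3]. Aggregations ignore the post filter, and the early-termination exception stops them as well.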
3) Run the Lucene search:
searcher.search(query, collector);
4) Return the results as TopDocs:
queryResult.topDocs(topDocsCallable.call(), sortValueFormats);
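How topDocsCallable is built depends on the query. Two examples need only a hit count: a match_all query, and a single term query on an index without deletions. That count can be read from index statistics without collecting any documents, which is why collector is set to null and the ScoreDoc array is empty. The hasDeletions() check is needed because docFreq also counts deleted documents: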
if (query.getClass() == MatchAllDocsQuery.class) {
collector = null;
topDocsCallable = new Callable<TopDocs>() {
@Override
public TopDocs call() throws Exception {
int count = searcher.getIndexReader().numDocs();
return new TopDocs(count, Lucene.EMPTY_SCORE_DOCS, 0);
}
};
} else if (query.getClass() == TermQuery.class && searcher.getIndexReader().hasDeletions() == false) {
final Term term = ((TermQuery) query).getTerm();
collector = null;
topDocsCallable = new Callable<TopDocs>() {
@Override
public TopDocs call() throws Exception {
int count = 0;
for (LeafReaderContext context : searcher.getIndexReader().leaves()) {
count += context.reader().docFreq(term);
}
return new TopDocs(count, Lucene.EMPTY_SCORE_DOCS, 0);
}
};
}
2. Fetch phase (plus merge):
This runs in FetchSearchPhase's doRun method.
1) First, reduce the query-phase results:
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
resultConsumer is created by the call below when SearchQueryThenFetchAsyncAction is constructed, and reducing it runs SearchPhaseController's reducedQueryPhase method:
searchPhaseController.newSearchPhaseResults(request, shardsIts.size())
2) Collect the doc IDs selected by the reduce step, grouped by shard:
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
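fillDocIdsToLoad is essentially a group-by on the reduced page. The minimal sketch below uses a hypothetical ScoreDoc holder and plain lists instead of the hppc IntArrayList used in the source. A null slot means the shard contributed nothing to the page, so in this model no fetch request would be sent to it.

```java
import java.util.*;

/** Hypothetical sketch of grouping the reduced page's doc ids by shard before the fetch phase. */
final class DocIdsToLoadSketch {

    /** One hit of the reduced page: the shard it came from and its shard-local doc id. */
    static final class ScoreDoc {
        final int shardIndex;
        final int doc;

        ScoreDoc(int shardIndex, int doc) {
            this.shardIndex = shardIndex;
            this.doc = doc;
        }
    }

    /** Returns one list per shard; a null slot means that shard contributed nothing to the page. */
    static List<List<Integer>> fillDocIdsToLoad(int numShards, ScoreDoc[] page) {
        List<List<Integer>> docIdsToLoad = new ArrayList<>(Collections.nCopies(numShards, (List<Integer>) null));
        for (ScoreDoc scoreDoc : page) {
            if (docIdsToLoad.get(scoreDoc.shardIndex) == null) {
                docIdsToLoad.set(scoreDoc.shardIndex, new ArrayList<>());
            }
            docIdsToLoad.get(scoreDoc.shardIndex).add(scoreDoc.doc);
        }
        return docIdsToLoad;
    }
}
```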
For a scroll search, lastEmittedDocPerShard is also computed so that it can be passed into each ShardFetchSearchRequest:
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
: null;
3) For each shard, build a fetch request and execute it:
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
connection);
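On receiving the request, the node runs SearchService's executeFetchPhase method. This is mostly Lucene internals and is not analyzed further here.
4) Once results have been received from all shards, the following runs: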
final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);
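The finishPhase pattern runs the next step once every shard has answered. It can be sketched with an atomic counter. Each shard response decrements the counter, and the response that brings it to zero runs the completion callback exactly once. The class below is hypothetical. In the source, the counter passed to executeFetch appears to play this role.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch: run a completion callback exactly once, after one response
 * (success or failure) has arrived from every shard.
 */
final class ShardCountDown {
    private final AtomicInteger remaining;
    private final Runnable finishPhase;

    ShardCountDown(int expectedResponses, Runnable finishPhase) {
        if (expectedResponses <= 0) {
            throw new IllegalArgumentException("expectedResponses must be positive");
        }
        this.remaining = new AtomicInteger(expectedResponses);
        this.finishPhase = finishPhase;
    }

    /** Call once per shard response; the call that reaches zero runs finishPhase and returns true. */
    boolean onShardResponse() {
        // decrementAndGet is atomic, so even with concurrent responses exactly one caller sees 0
        int left = remaining.decrementAndGet();
        if (left < 0) {
            throw new IllegalStateException("more responses than expected");
        }
        if (left == 0) {
            finishPhase.run();
            return true;
        }
        return false;
    }
}
```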
moveToNextPhase performs the merge:
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
After the merge, the result is passed on through nextPhaseFactory:
(response, scrollId) -> new ExpandSearchPhase(context, response,
(finalResponse) -> sendResponsePhase(finalResponse, scrollId, context))
private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
return new SearchPhase("response") {
@Override
public void run() throws IOException {
context.onResponse(context.buildSearchResponse(response, scrollId));
}
};
}
Here context is AbstractSearchAsyncAction, the parent class of SearchQueryThenFetchAsyncAction. It builds the response with buildSearchResponse and then calls back the listener:
public final void onResponse(SearchResponse response) {
listener.onResponse(response);
}
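Scroll queries
How scroll works
Each round remembers where the previous one stopped, and the next round picks up from that position.
For example, rewrite order by time offset 0 limit 100 as order by time where time > 0 limit 100, and remember the last value returned, $time_max. The next query, order by time offset 100 limit 100, then becomes order by time where time > $time_max limit 100. Repeating this, each page costs about the same no matter how deep it is, so scroll has roughly constant query latency and overhead per page.
The source code reflects this: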
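The rewrite from offset paging to "greater than the last value" paging can be sketched over an in-memory set of timestamps. The sketch assumes timestamps are unique, whereas real systems need a tie-breaker when sort values repeat. It uses a TreeSet, where tailSet jumps straight to the first value after the cursor instead of skipping from values. The class and method names are hypothetical.

```java
import java.util.*;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch contrasting offset paging with "after the last value" (keyset) paging.
 * Assumes unique timestamps; real systems add a tie-breaker when sort values repeat.
 */
final class KeysetPagingSketch {

    /** ORDER BY time OFFSET from LIMIT size: every page re-sorts and skips `from` values. */
    static List<Long> offsetPage(Collection<Long> times, int from, int size) {
        return times.stream().sorted().skip(from).limit(size).collect(Collectors.toList());
    }

    /** ORDER BY time WHERE time > after LIMIT size: jump straight past the last value seen. */
    static List<Long> keysetPage(NavigableSet<Long> times, long after, int size) {
        return times.tailSet(after, false).stream().limit(size).collect(Collectors.toList());
    }
}
```

To walk the whole set, pass each page's last element back as after for the next call. Each call then starts directly at its cursor.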
if (returnsDocsInOrder(query, searchContext.sort())) {
if (scrollContext.totalHits == -1) {
// first round
assert scrollContext.lastEmittedDoc == null;
// there is not much that we can optimize here since we want to collect all
// documents in order to get the total number of hits
} else {
// now this gets interesting: since we sort in index-order, we can directly
// skip to the desired doc and stop collecting after ${size} matches
if (scrollContext.lastEmittedDoc != null) {
// only match docs whose ID is greater than the last emitted one
BooleanQuery bq = new BooleanQuery.Builder()
.add(query, BooleanClause.Occur.MUST)
.add(new MinDocQuery(scrollContext.lastEmittedDoc.doc + 1), BooleanClause.Occur.FILTER)
.build();
query = bq;
}
// stop collecting once numDocs matches have been found
searchContext.terminateAfter(numDocs);
}
}
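scrollContext.lastEmittedDoc is not null when the next page of a scroll is being fetched. In that case the query is wrapped in a BooleanQuery with a MinDocQuery filter, so only documents after the last emitted doc ID match, and terminateAfter stops collection after numDocs hits. This shortcut applies only when results come back in index order (returnsDocsInOrder), for example a scroll sorted by _doc.
After the local search, scrollContext is updated: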
if (scrollContext != null) {
if (scrollContext.totalHits == -1) {
// first round
scrollContext.totalHits = topDocs.totalHits;
scrollContext.maxScore = topDocs.getMaxScore();
} else {
// subsequent round: the total number of hits and
// the maximum score were computed on the first round
topDocs.totalHits = scrollContext.totalHits;
topDocs.setMaxScore(scrollContext.maxScore);
}
if (searchContext.request().numberOfShards() == 1) {
// if we fetch the document in the same roundtrip, we already know the last emitted doc
if (topDocs.scoreDocs.length > 0) {
// set the last emitted doc
scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
}
}
}
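Apart from the single-shard case, this code only updates totalHits and maxScore. lastEmittedDoc is otherwise updated during the fetch phase.
First, the coordinating node puts lastEmittedDocPerShard into each ShardFetchSearchRequest: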
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
: null;
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
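getLastEmittedDocPerShard walks the page that is actually returned, in merged order, and records for each shard the last doc it contributed. A shard may have returned more hits in the query phase than made it onto the page. Its next round must therefore resume after its last emitted doc, not after its last collected one. The sketch uses a hypothetical ScoreDoc holder. A null entry means the shard's position does not move this round, which matches the null check in executeFetchPhase.

```java
/** Hypothetical sketch of computing, per shard, the last doc actually returned to the user. */
final class LastEmittedDocSketch {

    static final class ScoreDoc {
        final int shardIndex;
        final int doc;

        ScoreDoc(int shardIndex, int doc) {
            this.shardIndex = shardIndex;
            this.doc = doc;
        }
    }

    /**
     * emittedPage is the page in merged (returned) order. The last occurrence of each shard wins;
     * shards with no hit on the page keep null, i.e. their scroll position does not move this round.
     */
    static ScoreDoc[] lastEmittedDocPerShard(ScoreDoc[] emittedPage, int numShards) {
        ScoreDoc[] last = new ScoreDoc[numShards];
        for (ScoreDoc scoreDoc : emittedPage) {
            last[scoreDoc.shardIndex] = scoreDoc;
        }
        return last;
    }
}
```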
Then each node receives the fetch request and runs SearchService's executeFetchPhase method, which copies the value into its scroll context:
if (request.lastEmittedDoc() != null) {
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
}
The scroll ID does not change between scroll requests, as the SearchScrollAsyncAction code shows:
protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryPhase,
final AtomicArray<? extends SearchPhaseResult> fetchResults) {
try {
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(),
fetchResults::get);
// the scroll ID never changes we always return the same ID. This ID contains all the shards and their context ids
// such that we can talk to them again in the next roundtrip.
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
}
}
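The scroll ID can stay constant because it only points at per-shard search contexts that live on the data nodes, and the scroll position itself is kept in those contexts. The sketch below encodes (node ID, context ID) pairs into a URL-safe Base64 string and decodes them again. The binary layout is hypothetical and chosen for illustration. It is not Elasticsearch's actual scroll ID format.

```java
import java.io.*;
import java.util.*;

/** Hypothetical sketch: a scroll-ID-like token that lists which search context lives on which node. */
final class ScrollIdSketch {

    /** One entry per shard: the node holding the search context and the context's id on that node. */
    static final class ShardContext {
        final String nodeId;
        final long contextId;

        ShardContext(String nodeId, long contextId) {
            this.nodeId = nodeId;
            this.contextId = contextId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ShardContext)) {
                return false;
            }
            ShardContext other = (ShardContext) o;
            return nodeId.equals(other.nodeId) && contextId == other.contextId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(nodeId, contextId);
        }
    }

    /** Hypothetical layout: entry count, then (contextId, nodeId) per entry, Base64url without padding. */
    static String encode(List<ShardContext> contexts) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(contexts.size());
            for (ShardContext context : contexts) {
                out.writeLong(context.contextId);
                out.writeUTF(context.nodeId);
            }
            out.flush();
            return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<ShardContext> decode(String scrollId) {
        byte[] raw = Base64.getUrlDecoder().decode(scrollId);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int count = in.readInt();
            List<ShardContext> contexts = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                long contextId = in.readLong();
                contexts.add(new ShardContext(in.readUTF(), contextId));
            }
            return contexts;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The token only names the contexts and never records how far the scroll has progressed. Returning the same ID every round is therefore enough for the next request to reach the same contexts.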
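Author: YG_9013
Link: https://www.jianshu.com/p/f92af1e87100
Source: Jianshu
Copyright belongs to the author. Reproduction in any form requires the author's permission and attribution of the source.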