Elasticsearch Source Code Analysis: Search Queries (Part 11)
Distributed search
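Elasticsearch search falls into two broad categories: structured search and full-text search.
Structured search is the process of querying data that has inherent structure. Dates, times, and numbers are all structured: they have a precise format, and logical operations can be applied to them. Common operations include comparing ranges of numbers or times, or determining which of two values is larger. Put simply, it is SQL-like retrieval.
Full-text search is about finding the most relevant documents in full-text fields.
Because our main concern is solving OLAP problems, only structured search is covered here.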
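The overall Elasticsearch query follows the scatter/gather pattern, which is also the standard approach for most distributed query systems:
1. The node that receives the client request acts as the coordinating node (this article calls it the master side; it does not have to be a node configured with node.master: true). It resolves the indices and shards involved and dispatches the request to the data nodes (node.data: true) that hold them.
2. Each data node runs the query on its shards and returns the result to the coordinating node.
3. The coordinating node merges the results.
The flow above is a logical one. In practice, es distinguishes several search types, such as QUERY_THEN_FETCH and QUERY_AND_FETCH (deprecated), each with its own sequence of actions.
Because QUERY_AND_FETCH was deprecated in 5.x (QUERY_THEN_FETCH is used instead), only the QUERY_THEN_FETCH flow is described here.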
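The logical scatter/gather flow can be illustrated with a minimal sketch. It is not Elasticsearch code: ScatterGatherSketch and scatterGather are hypothetical names, each shard is modelled as a plain function from a query string to a list of partial results, and the merge is a simple concatenation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration of scatter/gather; none of these names are Elasticsearch classes.
final class ScatterGatherSketch {

    // Sends the query to every shard asynchronously, then waits for and merges the partial results.
    static <R> List<R> scatterGather(List<Function<String, List<R>>> shards, String query) {
        // Scatter: start one asynchronous task per shard.
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (Function<String, List<R>> shard : shards) {
            futures.add(CompletableFuture.supplyAsync(() -> shard.apply(query)));
        }
        // Gather: wait for each shard in turn and concatenate its partial result.
        List<R> merged = new ArrayList<>();
        for (CompletableFuture<List<R>> future : futures) {
            merged.addAll(future.join());
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Function<String, List<String>>> shards = new ArrayList<>();
        shards.add(q -> Arrays.asList(q + "@shard0"));
        shards.add(q -> Arrays.asList(q + "@shard1", q + "@shard1b"));
        System.out.println(scatterGather(shards, "q"));
    }
}
```

The real flow differs in that the gather step is driven by callbacks rather than by blocking on futures, and the merge has to respect sorting and aggregations, as the following sections show.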
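Coordinating node (the "master" side)
1. Receive the search request and perform the read-block check. Based on the indices in the request, build the corresponding shard iterators: shardIterators is the merge of localShardsIterator and remoteShardIterators and is used to iterate over all shards. When the shard iterators are generated, the preference setting selects a strategy that controls the order in which the copies of each shard are tried and the conditions they must meet.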
preferenceType = Preference.parse(preference);
switch (preferenceType) {
    case PREFER_NODES:
        final Set<String> nodesIds =
            Arrays.stream(
                preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
            ).collect(Collectors.toSet());
        return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
    case LOCAL:
        return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
    case PRIMARY:
        return indexShard.primaryActiveInitializingShardIt();
    case REPLICA:
        return indexShard.replicaActiveInitializingShardIt();
    case PRIMARY_FIRST:
        return indexShard.primaryFirstActiveInitializingShardsIt();
    case REPLICA_FIRST:
        return indexShard.replicaFirstActiveInitializingShardsIt();
    case ONLY_LOCAL:
        return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
    case ONLY_NODES:
        String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
        return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
    default:
        throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
}
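The substring(... + 1) calls in the switch skip the preference type plus one separator character before splitting the rest on commas. A minimal sketch of that parsing step, assuming a preference string of the form "_prefer_nodes:nodeA,nodeB" (the class PreferenceSketch, the method nodeIds, and the example node ids are hypothetical, not part of Elasticsearch):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified model of preference parsing; not the Elasticsearch implementation.
final class PreferenceSketch {

    // Extracts the node ids from a preference such as "_prefer_nodes:nodeA,nodeB".
    // The type prefix plus one character (the ':' separator) is skipped, as in the switch above.
    static Set<String> nodeIds(String preference, String typePrefix) {
        if (!preference.startsWith(typePrefix + ":")) {
            return Collections.emptySet(); // a different preference type, nothing to extract
        }
        String ids = preference.substring(typePrefix.length() + 1);
        return new LinkedHashSet<>(Arrays.asList(ids.split(",")));
    }

    public static void main(String[] args) {
        System.out.println(nodeIds("_prefer_nodes:nodeA,nodeB", "_prefer_nodes")); // [nodeA, nodeB]
        System.out.println(nodeIds("_local", "_prefer_nodes"));                    // []
    }
}
```

A LinkedHashSet is used here only so the printed order is stable; the excerpt above collects into an unordered set.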
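2. Set the search type according to the request, and construct an AbstractSearchAsyncAction (which extends InitialSearchPhase), the asynchronous search action, for that type. For QUERY_THEN_FETCH this is SearchQueryThenFetchAsyncAction. Its start method launches the asynchronous search.
QUERY phase
3. Query-shard phase. If there are no shards to query, return immediately. Otherwise iterate over the shard iterators and send a query request for each shard: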
for (final SearchShardIterator shardIt : shardsIts) {
    shardIndex++;
    final ShardRouting shard = shardIt.nextOrNull();
    if (shard != null) {
        performPhaseOnShard(shardIndex, shardIt, shard);
    } else {
        // really, no shards active in this group
        onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
    }
}
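4. Listen for the responses to all shard query requests: a success calls back onShardResult, a failure calls back onShardFailure. onShardResult maintains the shard counter; onShardFailure maintains the counter and handles the failure (it retries the request on the next copy of that shard). Once all shards have responded (as determined by the counter), onPhaseDone runs, which calls executeNextPhase and enters the fetch phase.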
try {
    executePhaseOnShard(shardIt, shard,
        new SearchActionListener<FirstResult>(
            new SearchShardTarget(shard.currentNodeId(), shardIt.shardId(),
                shardIt.getClusterAlias(), shardIt.getOriginalIndices()),
            shardIndex) {
            @Override
            public void innerOnResponse(FirstResult result) {
                onShardResult(result, shardIt);
            }

            @Override
            public void onFailure(Exception t) {
                onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
            }
        });
} catch (ConnectTransportException | IllegalArgumentException ex) {
    onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
}
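The interplay between the shard counter and the retry on the next copy can be sketched as follows. This is a simplified, synchronous model under stated assumptions: ShardPhaseSketch and its methods are hypothetical, node names stand in for shard copies, and a Predicate decides whether a request to a copy succeeds. The real action is asynchronous and callback-driven.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch of per-shard failover plus a completion counter; names are illustrative only.
final class ShardPhaseSketch {
    private final AtomicInteger completedShards = new AtomicInteger();
    private final AtomicInteger successfulShards = new AtomicInteger();
    private final int expectedShards;
    private boolean phaseDone;

    ShardPhaseSketch(int expectedShards) {
        this.expectedShards = expectedShards;
        this.phaseDone = expectedShards == 0; // nothing to query: the phase is done immediately
    }

    // Tries the copies of one shard in order and moves to the next copy when a request fails.
    void performPhaseOnShard(Iterator<String> copies, Predicate<String> queryCopy) {
        while (copies.hasNext()) {
            if (queryCopy.test(copies.next())) {
                successfulShards.incrementAndGet();
                break;
            }
        }
        // Whether a copy answered or all copies failed, the shard is counted exactly once.
        if (completedShards.incrementAndGet() == expectedShards) {
            phaseDone = true; // this is where the next phase (fetch) would be started
        }
    }

    static ShardPhaseSketch run(List<List<String>> shardCopies, Predicate<String> queryCopy) {
        ShardPhaseSketch phase = new ShardPhaseSketch(shardCopies.size());
        for (List<String> copies : shardCopies) {
            phase.performPhaseOnShard(copies.iterator(), queryCopy);
        }
        return phase;
    }

    int successful() { return successfulShards.get(); }

    boolean isDone() { return phaseDone; }

    public static void main(String[] args) {
        List<List<String>> shards = Arrays.asList(
            Arrays.asList("node1", "node2"), // shard 0: primary on node1, replica on node2
            Arrays.asList("node3"));         // shard 1: a single copy
        ShardPhaseSketch phase = run(shards, node -> !node.equals("node1")); // node1 is down
        System.out.println(phase.successful() + " successful, done=" + phase.isDone());
    }
}
```

Counting a shard once regardless of outcome is what lets the phase finish even when some shards fail on every copy; those failures are reported in the response rather than blocking it.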
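FETCH phase
5. FetchSearchPhase, the fetch phase. If the query phase failed on every shard, raisePhaseFailure throws an exception; otherwise FetchSearchPhase.innerRun is executed. If no fetch is needed (for example an aggregation-only query), finishPhase is called directly to merge the data; if a fetch is needed (a query that returns documents), executeFetch is called to fetch the documents, and the data is merged once they have been returned.
6. The merge is mainly done by searchPhaseController.merge, which merges the search hits together with the aggregation and profiling results. The result is then returned to the client: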
context.onResponse(context.buildSearchResponse(response, scrollId));
...
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
    return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
        buildTookInMillis(), buildShardFailures());
}
...
public final void onResponse(SearchResponse response) {
    listener.onResponse(response);
}
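Between the query and fetch phases the coordinating node has to decide which documents to fetch from which shard. The sketch below shows the idea under simplifying assumptions: hits are ranked by score only, and all names (QueryThenFetchReduceSketch, ShardDoc, reduceTopDocs, docIdsToLoad) are hypothetical rather than taken from SearchPhaseController.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the coordinating-node reduce step; not the SearchPhaseController code.
final class QueryThenFetchReduceSketch {

    // What a shard returns in the query phase: no document source, only ids and scores.
    static final class ShardDoc {
        final int shardIndex;
        final int docId;
        final float score;

        ShardDoc(int shardIndex, int docId, float score) {
            this.shardIndex = shardIndex;
            this.docId = docId;
            this.score = score;
        }
    }

    // Merges the per-shard hits and keeps the global top `size`, highest score first.
    static List<ShardDoc> reduceTopDocs(List<List<ShardDoc>> perShard, int size) {
        List<ShardDoc> all = new ArrayList<>();
        for (List<ShardDoc> hits : perShard) {
            all.addAll(hits);
        }
        all.sort((a, b) -> {
            int byScore = Float.compare(b.score, a.score); // descending score
            if (byScore != 0) return byScore;
            int byShard = Integer.compare(a.shardIndex, b.shardIndex); // deterministic tie-break
            return byShard != 0 ? byShard : Integer.compare(a.docId, b.docId);
        });
        return new ArrayList<>(all.subList(0, Math.min(size, all.size())));
    }

    // Groups the winning doc ids by shard; shards absent from the map need no fetch request.
    static Map<Integer, List<Integer>> docIdsToLoad(List<ShardDoc> topDocs) {
        Map<Integer, List<Integer>> byShard = new TreeMap<>();
        for (ShardDoc doc : topDocs) {
            byShard.computeIfAbsent(doc.shardIndex, k -> new ArrayList<>()).add(doc.docId);
        }
        return byShard;
    }

    public static void main(String[] args) {
        List<List<ShardDoc>> perShard = Arrays.asList(
            Arrays.asList(new ShardDoc(0, 1, 3.0f), new ShardDoc(0, 7, 1.0f)),
            Arrays.asList(new ShardDoc(1, 4, 2.5f), new ShardDoc(1, 9, 2.0f)),
            Arrays.asList(new ShardDoc(2, 2, 0.5f)));
        System.out.println(docIdsToLoad(reduceTopDocs(perShard, 3))); // {0=[1], 1=[4, 9]}
    }
}
```

In this example shard 2 contributes nothing to the top three hits, so it receives no fetch request; only its search context has to be released.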
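Data node side
QUERY phase
1. On receiving the query action sent by the coordinating (master) node, the data node runs executeQueryPhase. SearchContext is the context object for the query: it reads the shard through a point-in-time snapshot (IndexReader / ContextIndexSearcher), lives from the query phase through the fetch phase, and is the main object operated on during the search.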
final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
boolean queryPhaseSuccess = false;
try {
    context.setTask(task);
    operationListener.onPreQueryPhase(context);
    long time = System.nanoTime();
    contextProcessing(context);
    loadOrExecuteQueryPhase(request, context);
    if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
        // nothing left to fetch and no scroll: the context can be released right away
        freeContext(context.id());
    } else {
        contextProcessedSuccessfully(context);
    }
    final long afterQueryTime = System.nanoTime();
    queryPhaseSuccess = true;
    operationListener.onQueryPhase(context, afterQueryTime - time);
    if (request.numberOfShards() == 1) {
        // single-shard request: run the fetch phase immediately and save a round trip
        return executeFetchPhase(context, operationListener, afterQueryTime);
    }
    return context.queryResult();
} catch (Exception e) {
    // execution exception can happen while loading the cache, strip it
    if (e instanceof ExecutionException) {
        e = (e.getCause() == null || e.getCause() instanceof Exception) ?
            (Exception) e.getCause() : new ElasticsearchException(e.getCause());
    }
    if (!queryPhaseSuccess) {
        operationListener.onFailedQueryPhase(context);
    }
    logger.trace("Query phase failed", e);
    processFailure(context, e);
    throw ExceptionsHelper.convertToRuntime(e);
} finally {
    cleanContext(context);
}
The context is created as follows:
final DefaultSearchContext searchContext = new DefaultSearchContext(
    idGenerator.incrementAndGet(), request, shardTarget, engineSearcher,
    indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(),
    timeout, fetchPhase);
2. Run the query phase, loadOrExecuteQueryPhase(request, context). It first checks the cache: if a cached result is available, the cached path indicesService.loadIntoContext is taken; if not, queryPhase.execute(context) runs, with the following code:
if (searchContext.hasOnlySuggest()) {
    suggestPhase.execute(searchContext);
    // TODO: fix this once we can fetch docs for suggestions
    searchContext.queryResult().topDocs(
        new TopDocs(0, Lucene.EMPTY_SCORE_DOCS, 0),
        new DocValueFormat[0]);
    return;
}
// Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
// request, preProcess is called on the DFS phase, this is why we pre-process them
// here to make sure it happens during the QUERY phase
aggregationPhase.preProcess(searchContext);
boolean rescore = execute(searchContext, searchContext.searcher());
if (rescore) { // only if we do a regular search
    rescorePhase.execute(searchContext);
}
suggestPhase.execute(searchContext);
aggregationPhase.execute(searchContext);
if (searchContext.getProfilers() != null) {
    ProfileShardResult shardResults = SearchProfileShardResults
        .buildShardResults(searchContext.getProfilers());
    searchContext.queryResult().profileResults(shardResults);
}
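3. Here execute runs the query against the index by calling Lucene's searcher.search(query, collector). Aggregations are also supported through aggregationPhase.execute(searchContext) (covered in the next part).
4. Finally, context.queryResult() is returned.
FETCH phase
1. On receiving the fetch request from the coordinating (master) node, run executeFetchPhase. The SearchContext is first looked up from the request with findContext(request.id(), request).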
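The load-or-execute decision in step 2 is essentially a read-through cache in front of the query phase. A minimal sketch, assuming the cache is keyed by some representation of the request; ShardRequestCacheSketch, loadOrExecute, and the cacheable flag are hypothetical stand-ins for whatever checks and keys the real code uses:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical read-through cache in front of a query phase; not the Elasticsearch request cache.
final class ShardRequestCacheSketch<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final AtomicInteger executions = new AtomicInteger();

    // Returns the cached result for the request if present, otherwise runs the query phase.
    // Non-cacheable requests always run the query phase and never touch the cache.
    V loadOrExecute(K requestKey, boolean cacheable, Function<K, V> queryPhase) {
        if (!cacheable) {
            return execute(requestKey, queryPhase);
        }
        return cache.computeIfAbsent(requestKey, key -> execute(key, queryPhase));
    }

    private V execute(K requestKey, Function<K, V> queryPhase) {
        executions.incrementAndGet(); // counts how often the query actually ran
        return queryPhase.apply(requestKey);
    }

    int executions() { return executions.get(); }

    public static void main(String[] args) {
        ShardRequestCacheSketch<String, Integer> cache = new ShardRequestCacheSketch<>();
        cache.loadOrExecute("count by day", true, String::length);
        cache.loadOrExecute("count by day", true, String::length); // served from the cache
        System.out.println(cache.executions()); // 1
    }
}
```

A cache of this kind only pays off when the same request is repeated against unchanged data, which is typical of dashboard-style OLAP queries.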
final SearchContext context = findContext(request.id(), request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
    context.setTask(task);
    contextProcessing(context);
    if (request.lastEmittedDoc() != null) {
        context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
    }
    context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
    operationListener.onPreFetchPhase(context);
    long time = System.nanoTime();
    fetchPhase.execute(context);
    if (fetchPhaseShouldFreeContext(context)) {
        freeContext(request.id());
    } else {
        contextProcessedSuccessfully(context);
    }
    operationListener.onFetchPhase(context, System.nanoTime() - time);
    return context.fetchResult();
} catch (Exception e) {
    operationListener.onFailedFetchPhase(context);
    logger.trace("Fetch phase failed", e);
    processFailure(context, e);
    throw ExceptionsHelper.convertToRuntime(e);
} finally {
    cleanContext(context);
}
2. The core method is fetchPhase.execute(context). It iterates over the doc ids produced by the preceding query round, builds the SearchHit[] array, and finally stores it in fetchResult.
for (int index = 0; index < context.docIdsToLoadSize(); index++) {
    ...
    final SearchHit searchHit;
    try {
        int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
        if (rootDocId != -1) {
            searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId,
                fieldNames, fieldNamePatterns, subReaderContext);
        } else {
            searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId, subReaderContext);
        }
    } catch (IOException e) {
        throw ExceptionsHelper.convertToElastic(e);
    }
    hits[index] = searchHit;
    hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());
    for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
        fetchSubPhase.hitExecute(context, hitContext);
    }
}
for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
    fetchSubPhase.hitsExecute(context, hits);
}
context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(),
    context.queryResult().getMaxScore()));
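3. Release the SearchContext with freeContext. There are two cases: (1) if the merged query result on the coordinating (master) node includes hits from this shard, so that the shard has to run a fetch, the context is released after the fetch completes (as shown above); (2) if this shard contributed no hits, the coordinating node sends a free-context request to the node in question, which then releases it.
4. The fetch result is returned to the coordinating (master) node. Done.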
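The lifecycle of a search context on the data node (created in the query phase, looked up by id in the fetch phase, then released) can be modelled with a small registry. This is a sketch under stated assumptions: SearchContextRegistrySketch is a hypothetical class, and a plain String stands in for the context and the point-in-time snapshot it holds. The method names mirror those in the excerpts above only for readability.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry of active search contexts on a data node; a String stands in for the context.
final class SearchContextRegistrySketch {
    private final AtomicLong idGenerator = new AtomicLong();
    private final Map<Long, String> activeContexts = new ConcurrentHashMap<>();

    // Query phase: register a context and hand its id back to the coordinating node.
    long createAndPutContext(String snapshot) {
        long id = idGenerator.incrementAndGet();
        activeContexts.put(id, snapshot);
        return id;
    }

    // Fetch phase: the fetch request carries the id, so the same snapshot is read again.
    String findContext(long id) {
        String context = activeContexts.get(id);
        if (context == null) {
            throw new IllegalStateException("No search context found for id [" + id + "]");
        }
        return context;
    }

    // Called after a fetch completes, or on an explicit free request for shards with no hits.
    boolean freeContext(long id) {
        return activeContexts.remove(id) != null;
    }

    int activeCount() { return activeContexts.size(); }

    public static void main(String[] args) {
        SearchContextRegistrySketch registry = new SearchContextRegistrySketch();
        long withHits = registry.createAndPutContext("snapshot of shard 0");
        long withoutHits = registry.createAndPutContext("snapshot of shard 1");

        registry.findContext(withHits);    // case 1: fetch runs against the stored context
        registry.freeContext(withHits);    // and the context is released afterwards
        registry.freeContext(withoutHits); // case 2: explicit free request, no fetch
        System.out.println(registry.activeCount()); // 0
    }
}
```

Holding the context between the two phases is what guarantees that the fetch sees the same index snapshot the query ran against; releasing it promptly matters because an open context keeps that snapshot's resources alive.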
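Summary
The whole ES search process is a scatter/gather process: the coordinating node scatters the query to the relevant shards, gathers the per-shard query results, fetches the documents it needs from the shards that hold them, and merges everything into the response for the client.
The next part takes a closer look at es aggregations (Aggregation).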