A Quick Look at the elasticsearch 2.3.5 Source Code
modules.add(new ActionModule(true)); loads the classes that handle each kind of request.
ActionModule maps every action to its handler class and effectively serves as the entry point:
registerAction(SearchAction.INSTANCE, TransportSearchAction.class); // search requests map to this class
registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class); // delete requests map to this class
Let's focus on the TransportSearchAction class.
switch (searchRequest.searchType()) {
    case DFS_QUERY_THEN_FETCH:
        searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService,
                indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
        break;
    case QUERY_THEN_FETCH:
        searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
                indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
        break;
    case DFS_QUERY_AND_FETCH:
        searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService,
                indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
        break;
    case QUERY_AND_FETCH:
        searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService,
                indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
        break;
}
Depending on the search type, a different branch is taken:
if (shardCount == 1) {
// if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_AND_FETCH);
}
When there is only one shard, QUERY_AND_FETCH is used directly. With multiple shards this mode returns n*size results, but with a single shard that is not a problem, and it is faster; more on this later.
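The single-shard shortcut can be sketched as follows. This is a minimal illustration, not Elasticsearch code; the enum and class names here are hypothetical.

```java
// Sketch of the single-shard shortcut (hypothetical names, not from the
// Elasticsearch source). With one shard there is nothing to merge across
// shards, so DFS statistics and a separate fetch round trip buy nothing:
// the coordinator rewrites the request to QUERY_AND_FETCH.
enum SketchSearchType { DFS_QUERY_THEN_FETCH, QUERY_THEN_FETCH, DFS_QUERY_AND_FETCH, QUERY_AND_FETCH }

class SearchTypeShortcut {
    static SketchSearchType effectiveType(SketchSearchType requested, int shardCount) {
        return shardCount == 1 ? SketchSearchType.QUERY_AND_FETCH : requested;
    }
}
```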
First, let's walk through the QUERY_AND_FETCH flow.
SearchQueryAndFetchAsyncAction is the entry point. Its start method is defined in its parent class:
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
For each shard involved, performFirstPhase is executed in turn. It builds an internal shard-level search request (targeting a single shard) and eventually calls the following code:
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard, result, shardIt);
}
@Override
public void onFailure(Throwable t) {
onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
}
});
The main job of onFirstPhaseResult is to invoke the subclass's moveToSecondPhase, which only runs after executeFetchPhase has completed, so we will come back to it later. Execution continues with:
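The bookkeeping behind "move to the second phase once every shard has answered" can be sketched with a countdown. This is a simplification with illustrative names; the real AbstractSearchAsyncAction also tracks successes and failures separately.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the first-phase bookkeeping: every shard response
// (success or failure) decrements a counter; whichever thread handles the
// last response triggers the second phase.
class FirstPhaseCountdown {
    private final AtomicInteger remaining;
    volatile boolean secondPhaseStarted = false;

    FirstPhaseCountdown(int shardCount) {
        this.remaining = new AtomicInteger(shardCount);
    }

    // Called from onFirstPhaseResult for each shard response.
    void onShardResult() {
        if (remaining.decrementAndGet() == 0) {
            secondPhaseStarted = true; // in ES: moveToSecondPhase()
        }
    }
}
```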
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QueryFetchSearchResult> listener) {
searchService.sendExecuteFetch(node, request, listener);
}
sendExecuteFetch is defined in SearchServiceTransportAction; it forwards the shard-level search request to the node that holds the shard. Here is its source:
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
@Override
public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult();
}
});
}
Note the handler registered for this action:
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler());
class SearchQueryFetchTransportHandler extends TransportRequestHandler<ShardSearchTransportRequest> {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
}
Next, look at sendRequest:
if (node.equals(localNode)) {
sendLocalRequest(requestId, action, request);
} else {
transport.sendRequest(node, requestId, action, request, options);
}
For a local target, sendLocalRequest eventually calls SearchQueryFetchTransportHandler's messageReceived method, which starts executeFetchPhase.
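The dispatch decision above can be sketched like this; the types are heavily simplified and the string return values merely stand in for the two real code paths.

```java
// Sketch of the dispatch decision in TransportService.sendRequest:
// when the target node is the local node, the registered handler is
// invoked directly; otherwise the request goes over the network.
class DispatchSketch {
    static String dispatch(String targetNodeId, String localNodeId) {
        if (targetNodeId.equals(localNodeId)) {
            // sendLocalRequest: look up the registered handler (here,
            // SearchQueryFetchTransportHandler) and call messageReceived
            // directly, with no serialization over the wire.
            return "local";
        }
        // otherwise serialize the request and send it to the remote node
        return "network";
    }
}
```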
Now look at the source of executeFetchPhase. It first calls loadOrExecuteQueryPhase(request, context, queryPhase), whose body is:
if (canCache) {
indicesQueryCache.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
queryPhase.execute is where the actual query runs, including the parsing of the various parameters; we will skip those details for now.
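The cache-or-execute pattern in loadOrExecuteQueryPhase can be sketched as below. This is an assumption-laden simplification: the real cache keys on the shard plus the serialized request, and the stored value is the serialized query result, both of which are omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of loadOrExecuteQueryPhase: when the request is cacheable, the
// shard query cache is consulted first, and the (expensive) query phase
// only runs on a cache miss.
class QueryCacheSketch {
    private final Map<String, String> cache = new HashMap<>();
    int queryPhaseExecutions = 0; // exposed only so the sketch is observable

    private String executeQueryPhase(String cacheKey) { // queryPhase.execute(context)
        queryPhaseExecutions++;
        return "result-for-" + cacheKey;
    }

    String loadOrExecute(String cacheKey, boolean canCache) {
        if (!canCache) {
            return executeQueryPhase(cacheKey);
        }
        // indicesQueryCache.loadIntoContext(request, context, queryPhase)
        return cache.computeIfAbsent(cacheKey, this::executeQueryPhase);
    }
}
```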
Continuing through executeFetchPhase, we find:
fetchPhase.execute(context);
and it finally returns:
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
As this code shows, queryPhase and fetchPhase run back to back; that is exactly what query_and_fetch means.
The returned result contains both the query result and the fetch result, represented by the QuerySearchResult and FetchSearchResult classes respectively.
The QuerySearchResult class:
private long id;
private SearchShardTarget shardTarget;
private int from;
private int size;
private TopDocs topDocs;
private InternalAggregations aggregations;
private List<SiblingPipelineAggregator> pipelineAggregators;
private Suggest suggest;
private boolean searchTimedOut;
private Boolean terminatedEarly = null;
private List<ProfileShardResult> profileShardResults;
Now look at the definition of TopDocs:
/** The total number of hits for the query. */
public int totalHits;
/** The top hits for the query. */
public ScoreDoc[] scoreDocs;
/** Stores the maximum score value encountered, needed for normalizing. */
private float maxScore;
It holds the total hit count, the document numbers, the per-document match scores, and the maximum score. Next, how ScoreDoc is defined:
/** The score of this document for the query. */
public float score;
/** A hit document's number.
* @see IndexSearcher#doc(int) */
public int doc;
/** Only set by {@link TopDocs#merge} */
public int shardIndex;
Here doc is the internal document number; the corresponding document content can be retrieved via IndexSearcher#doc(int).
So a QuerySearchResult contains only internal document numbers and match scores.
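This separation between "doc number plus score" and the actual document content can be sketched as follows. The in-memory map merely stands in for the Lucene index; the class is illustrative, not Lucene's ScoreDoc.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the query phase produces (docId, score) pairs, like ScoreDoc;
// resolving a docId to stored content is a separate lookup, analogous to
// IndexSearcher#doc(int).
class ScoreDocSketch {
    final int doc;     // internal document number
    final float score; // match score for the query

    ScoreDocSketch(int doc, float score) {
        this.doc = doc;
        this.score = score;
    }

    static String fetchSource(Map<Integer, String> storedFields, ScoreDocSketch hit) {
        return storedFields.get(hit.doc); // indexSearcher.doc(hit.doc) in Lucene
    }
}
```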
Now look at FetchSearchResult:
private long id;
private SearchShardTarget shardTarget;
private InternalSearchHits hits;
// client side counter
private transient int counter;
InternalSearchHits is defined as:
private InternalSearchHit[] hits;
public long totalHits;
private float maxScore;
And the InternalSearchHit source:
private transient int docId;
private float score = Float.NEGATIVE_INFINITY;
private Text id;
private Text type;
private InternalNestedIdentity nestedIdentity;
private long version = -1;
private BytesReference source;
private Map<String, SearchHitField> fields = ImmutableMap.of();
private Map<String, HighlightField> highlightFields = null;
private Object[] sortValues = EMPTY_SORT_VALUES;
private String[] matchedQueries = Strings.EMPTY_ARRAY;
private Explanation explanation;
@Nullable
private SearchShardTarget shard;
private Map<String, Object> sourceAsMap;
private byte[] sourceAsBytes;
private Map<String, InternalSearchHits> innerHits;
It contains both the document's raw source and its parsed form.
The moveToSecondPhase flow
Once executeFetchPhase above has produced both the query result and the fetch result, moveToSecondPhase runs. The two key lines:
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
firstResults, request);
As the code shows, the second phase of the search submits a task on the search thread pool: it first sorts the shard results globally, then merges them, calling searchPhaseController.sortDocs and searchPhaseController.merge respectively.
Leaving the sorting aside for now, merge combines the hits, suggest results, and aggregations, which is why every search response has a hits section and an aggregations section.
So if a search specifies search_type=query_and_fetch with size=10 against an index with 5 shards, it returns 50 results: each shard returns 10 documents, and the merge does not trim them down.
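This n*size behavior is easy to demonstrate with a concatenating merge. This is only a sketch of that one aspect; the real SearchPhaseController.merge also handles sorting, suggest, and aggregations.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: with QUERY_AND_FETCH each shard already returns its own top
// `size` hits, and the coordinator concatenates them without a global
// trim, so the client can see up to shardCount * size hits.
class QueryAndFetchMergeSketch {
    static List<String> merge(List<List<String>> perShardHits) {
        List<String> merged = new ArrayList<>();
        for (List<String> shardHits : perShardHits) {
            merged.addAll(shardHits); // no trim back down to `size`
        }
        return merged;
    }
}
```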
query_then_fetch executes similarly.
Its executeQueryPhase looks like this:
if (canCache) {
indicesQueryCache.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
There is only a query, no fetch. This is the important difference: the result contains only document numbers and the values needed for sorting.
When queryPhase finishes, the second phase begins, starting from moveToSecondPhase, defined in TransportSearchQueryThenFetchAction. Its main job is to sort the document numbers obtained in the first phase, and then fetch the actual document content for those sorted document numbers. The relevant code:
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
sortDocs was covered above; it orders the document numbers by each document's match score.
executeFetch then asks searchService to start an async task that retrieves the concrete document content for those document numbers and stores the results in fetchResults. A counter tracks the outstanding fetch tasks; once they have all completed, finishHim is called to produce the final result.
finishHim merges the per-shard query results and then notifies the listener with the merged result, completing the search. searchPhaseController.merge was covered earlier.
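The heart of the query_then_fetch second phase can be sketched end to end: sort the per-shard (docId, score) tuples globally, keep only the top `size`, and group the surviving doc ids by shard so that each shard is asked to fetch only its own winners. The names below are illustrative; this is what sortDocs plus fillDocIdsToLoad accomplish, not their actual code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the query_then_fetch second phase: globally sort the
// first-phase (shard, docId, score) results, keep the top `size`, and
// build the per-shard doc-id lists that the fetch requests are made from.
class QueryThenFetchSketch {
    static class ShardDoc {
        final int shardIndex, docId;
        final float score;
        ShardDoc(int shardIndex, int docId, float score) {
            this.shardIndex = shardIndex; this.docId = docId; this.score = score;
        }
    }

    /** Returns shardIndex -> doc ids to fetch from that shard. */
    static Map<Integer, List<Integer>> docIdsToLoad(List<ShardDoc> firstPhase, int size) {
        List<ShardDoc> sorted = new ArrayList<>(firstPhase);
        sorted.sort(Comparator.comparingDouble((ShardDoc d) -> d.score).reversed());
        Map<Integer, List<Integer>> perShard = new HashMap<>();
        for (ShardDoc d : sorted.subList(0, Math.min(size, sorted.size()))) {
            perShard.computeIfAbsent(d.shardIndex, k -> new ArrayList<>()).add(d.docId);
        }
        return perShard;
    }
}
```

Unlike the query_and_fetch merge shown earlier, the global trim to `size` happens here, before any document content is fetched, which is why query_then_fetch returns exactly `size` hits regardless of the shard count.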
The remaining details will be covered next time. Reference:
https://blog.csdn.net/caipeichao2/article/details/46418413