ES 5.6.4 Source Code Analysis: The ES Search Process
Code entry point
The entry point of the search code path in ES 5.6.4 is TransportSearchAction#doExecute:
@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    final long absoluteStartMillis = System.currentTimeMillis();
    final long relativeStartNanos = System.nanoTime();
    final SearchTimeProvider timeProvider =
        new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
    // get the current cluster state
    final ClusterState clusterState = clusterService.state();
    // group the requested indices by cluster
    final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
        searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
    OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
    if (remoteClusterIndices.isEmpty()) {
        executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
            (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener,
            clusterState.getNodes().getDataNodes().size());
    } else {
        remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
            searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
                List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
                    remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                int numNodesInvovled = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
                    + clusterState.getNodes().getDataNodes().size();
                executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
                    remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvovled);
            }, listener::onFailure));
    }
}
This method does two things:
1. Group the indices in the request by cluster name.
Because a single ES search request supports cross-cluster search, all indices contained in the request must first be grouped by cluster name, and the request is then executed per cluster.
2. Execute the search request.
If the request targets indices on remote clusters, the shard information of those clusters is fetched first, and then executeSearch is called to run the query.
Index grouping
public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
    Map<String, List<String>> perClusterIndices = new HashMap<>();
    Set<String> remoteClusterNames = getRemoteClusterNames();
    for (String index : requestIndices) {
        int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
        if (i >= 0) {
            String remoteClusterName = index.substring(0, i);
            List<String> clusters = clusterNameResolver.resolveClusterNames(remoteClusterNames, remoteClusterName);
            if (clusters.isEmpty() == false) {
                if (indexExists.test(index)) {
                    // we use : as a separator for remote clusters. might conflict if there is an index that is actually named
                    // remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
                    // if that happens
                    throw new IllegalArgumentException("Can not filter indices; index " + index +
                        " exists but there is also a remote cluster named: " + remoteClusterName);
                }
                String indexName = index.substring(i + 1);
                for (String clusterName : clusters) {
                    perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName);
                }
            } else {
                perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
            }
        } else {
            perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
        }
    }
    return perClusterIndices;
}
The method iterates over the requested index names. A remote index name has a different format from a local one: cluster_name:index_name. Local indices are stored in the map under the key LOCAL_CLUSTER_GROUP_KEY; remote indices are stored under their cluster name as the key, with the list of index names as the value.
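For intuition, here is a minimal standalone sketch of that grouping (not ES code: the cluster alias cluster_two and the index names are made up, and the empty string stands in for LOCAL_CLUSTER_GROUP_KEY):

import java.util.*;

public class GroupIndicesDemo {
    // simplified stand-in for RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY
    static final String LOCAL = "";

    public static void main(String[] args) {
        Set<String> remoteClusters = new HashSet<>(Arrays.asList("cluster_two"));
        String[] requested = {"logs-2018", "cluster_two:metrics-*"};

        Map<String, List<String>> perCluster = new HashMap<>();
        for (String index : requested) {
            int i = index.indexOf(':');
            // only treat the prefix as a cluster alias if it names a known remote cluster
            if (i >= 0 && remoteClusters.contains(index.substring(0, i))) {
                perCluster.computeIfAbsent(index.substring(0, i), k -> new ArrayList<>())
                          .add(index.substring(i + 1));
            } else {
                perCluster.computeIfAbsent(LOCAL, k -> new ArrayList<>()).add(index);
            }
        }
        // two groups: local -> [logs-2018], cluster_two -> [metrics-*]
        System.out.println(perCluster);
    }
}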
Once the indices are grouped by cluster name, the search itself begins.
Search flow
1. Check the cluster blocks: if reads are blocked at the cluster level (for example while the cluster state has not yet recovered), an exception is thrown.
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
2. The index names in the request may be expressions containing wildcards, e.g. hik_mac*, so they must be resolved into concrete index names.
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), timeProvider.getAbsoluteStartMillis(), localIndices.indices());
ES has two index-name resolvers: DateMathExpressionResolver resolves index names written as date math expressions, and WildcardExpressionResolver resolves index names containing wildcards.
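As a rough illustration of what wildcard resolution ends up doing (a simplified sketch; the real WildcardExpressionResolver matches against the cluster metadata and honours IndicesOptions, both ignored here):

import java.util.*;
import java.util.stream.Collectors;

public class WildcardResolveDemo {
    // approximate wildcard matching: '*' matches any character sequence
    static boolean simpleMatch(String pattern, String value) {
        return value.matches(pattern.replace(".", "\\.").replace("*", ".*"));
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("hik_mac_2018_01", "hik_mac_2018_02", "other_index");
        String expression = "hik_mac*";
        List<String> concrete = existing.stream()
            .filter(name -> simpleMatch(expression, name))
            .collect(Collectors.toList());
        // prints [hik_mac_2018_01, hik_mac_2018_02]
        System.out.println(concrete);
    }
}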
3. Resolve the routing values for each index.
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
In routingMap, the key is the index name and the value is the set of routing values for that index.
4. Use routingMap to find all target shards for this request.
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference());
The algorithm that maps a routing value to a shard id is no longer the one used in 2.x. ES 5.6.4:
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
// the calculation no longer takes the modulo against numberOfShards, because of the shard shrink feature added in 5.x
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
ES 2.3.5
if (routing == null) {
if (!useType) {
hash = hash(hashFunction, id);
} else {
hash = hash(hashFunction, type, id);
}
} else {
hash = hash(hashFunction, routing);
}
if (createdVersion.onOrAfter(Version.V_2_0_0_beta1)) {
return MathUtils.mod(hash, indexMetaData.getNumberOfShards());
5.x formula: shardId = (hash % routingNumShards) / routingFactor
2.x formula: shardId = hash % numberOfShards
The routing algorithm was changed in 5.x because 5.x introduced the shard shrink feature:
Take an index created with 4 shards whose documents doc0..doc3 hash onto shards 0..3:

doc0--------> shard0
doc1--------> shard1
doc2--------> shard2
doc3--------> shard3

When the index is shrunk to 2 shards, the source shards are merged into the target shards according to the shrink rule targetShardId = sourceShardId / factor, where factor = sourceNumShards / targetNumShards = 4 / 2 = 2:

shard0-----> shard'0
shard1-----> shard'0
shard2-----> shard'1
shard3-----> shard'1

So after the shrink the documents physically live in:

doc0--------> shard'0
doc1--------> shard'0
doc2--------> shard'1
doc3--------> shard'1

If routing after the shrink still used the 2.x-style formula hash % numberOfShards (with numberOfShards now equal to 2), the lookup would give:

doc0--------> shard'0
doc1--------> shard'1   (wrong: doc1 actually lives in shard'0)
doc2--------> shard'0   (wrong: doc2 actually lives in shard'1)
doc3--------> shard'1

With the 5.x formula (hash % routingNumShards) / routingFactor, where routingNumShards = 4 and routingFactor = routingNumShards / numberOfShards = 4 / 2 = 2:

doc0--------> shard'0
doc1--------> shard'0
doc2--------> shard'1
doc3--------> shard'1

which matches where the documents actually are.
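The arithmetic above can be checked with a small standalone sketch (the hashes 0..3 stand in for real Murmur3HashFunction values, which are not used here):

public class ShrinkRoutingDemo {
    public static void main(String[] args) {
        int routingNumShards = 4;          // shard count the index was created with (kept after the shrink)
        int numberOfShards = 2;            // shard count after shrinking
        int routingFactor = routingNumShards / numberOfShards; // 4 / 2 = 2

        // pretend the routing hash of doc_i is simply i (real code uses Murmur3HashFunction)
        for (int hash = 0; hash < 4; hash++) {
            int before = Math.floorMod(hash, routingNumShards);                  // shard before the shrink
            int actual = before / routingFactor;                                 // where the doc really lives after the shrink
            int v2Style = Math.floorMod(hash, numberOfShards);                   // 2.x-style: hash % numberOfShards
            int v5Style = Math.floorMod(hash, routingNumShards) / routingFactor; // 5.x: (hash % routingNumShards) / routingFactor
            System.out.printf("doc%d: before=shard%d, lives in shard'%d, 2.x formula=%d, 5.x formula=%d%n",
                hash, before, actual, v2Style, v5Style);
        }
    }
}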
5. Check whether the number of target shards exceeds the limit controlled by the action.search.shard_count.limit setting.
failIfOverShardCountLimit(clusterService, shardIterators.size());
6. Decide whether the target shards should be pre-filtered before the query runs.
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
All three of the following conditions must hold for it to return true:
searchRequest.searchType() == QUERY_THEN_FETCH &&               // 1. the search type is QUERY_THEN_FETCH
SearchService.canRewriteToMatchNone(source) &&                   // 2. the query can be rewritten so that an empty result can be detected per shard
searchRequest.getPreFilterShardSize() < shardIterators.size();   // 3. the actual number of target shards is greater than preFilterShardSize (default 128)
7. If shard pre-filtering is needed, the request first goes through the CAN_MATCH phase.
public boolean canMatch(ShardSearchRequest request) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
try (DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout, null)) {
SearchSourceBuilder source = context.request().source();
if (canRewriteToMatchNone(source)) {
QueryBuilder queryBuilder = source.query();
return queryBuilder instanceof MatchNoneQueryBuilder == false;
}
return true; // null query means match_all
}
}
The query is first rewritten while the search context is built via createSearchContext; shards are then filtered based on the rewrite result: if the rewritten query is a MatchNoneQueryBuilder, the shard can be skipped.
8. Execute the query request on each target shard.
if (shardsIts.size() > 0) {
int maxConcurrentShardRequests = Math.min(this.maxConcurrentShardRequests, shardsIts.size());
final boolean success = shardExecutionIndex.compareAndSet(0, maxConcurrentShardRequests);
assert success;
for (int index = 0; index < maxConcurrentShardRequests; index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}
}
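The loop above only fires the first maxConcurrentShardRequests shard requests; remaining shards are dispatched as earlier ones complete. A minimal sketch of that sliding-window pattern (the class and names below are illustrative, not the actual InitialSearchPhase code):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentShardDispatch {
    private final AtomicInteger nextShardIndex = new AtomicInteger();
    private final List<String> shards;   // stand-in for the list of SearchShardIterator
    private final int maxConcurrent;

    ConcurrentShardDispatch(List<String> shards, int maxConcurrent) {
        this.shards = shards;
        this.maxConcurrent = Math.min(maxConcurrent, shards.size());
    }

    void start() {
        // reserve the first maxConcurrent slots, then kick them off
        nextShardIndex.set(maxConcurrent);
        for (int i = 0; i < maxConcurrent; i++) {
            performPhaseOnShard(i);
        }
    }

    // called when a shard request finishes (successfully or not)
    void onShardDone() {
        int next = nextShardIndex.getAndIncrement();
        if (next < shards.size()) {
            performPhaseOnShard(next);
        }
    }

    private void performPhaseOnShard(int index) {
        System.out.println("querying " + shards.get(index));
    }

    public static void main(String[] args) {
        ConcurrentShardDispatch d = new ConcurrentShardDispatch(java.util.Arrays.asList("s0", "s1", "s2", "s3"), 2);
        d.start();          // queries s0 and s1
        d.onShardDone();    // queries s2
        d.onShardDone();    // queries s3
    }
}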
9. Load the result from the request cache or execute the query.
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
final boolean canCache = indicesService.canCache(request, context);
context.getQueryShardContext().freezeContext();
if (canCache) {
indicesService.loadIntoContext(request, context, queryPhase);
} else {
queryPhase.execute(context);
}
}
The request cache is only consulted for QUERY_THEN_FETCH searches. If the request explicitly sets its cache flag, that flag must be true; if it is not set, the index setting index.requests.cache.enable (default true) must be enabled. When the cache may be used and there is a hit, the cached bytes are deserialized into the query result; otherwise the query phase is executed.
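A sketch of that decision as a standalone helper (an approximation of the conditions described above, not the actual IndicesService#canCache logic):

import org.elasticsearch.action.search.SearchType;

public class RequestCacheDecision {
    // requestCacheFlag is the request-level cache parameter (null when not set);
    // indexCacheEnabled is the index.requests.cache.enable setting (defaults to true)
    static boolean canUseRequestCache(SearchType searchType, Boolean requestCacheFlag, boolean indexCacheEnabled) {
        if (searchType != SearchType.QUERY_THEN_FETCH) {
            return false;                // only QUERY_THEN_FETCH requests are eligible
        }
        if (requestCacheFlag != null) {
            return requestCacheFlag;     // an explicit request-level flag wins
        }
        return indexCacheEnabled;        // otherwise fall back to the index setting
    }
}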
10. Check whether the request is a suggest-only query. If so, only the suggest phase runs; otherwise the regular search query runs first. Suggest example: for the input "unexpacted" the suggester returns "unexpected". It works by returning terms within a small edit distance of the input, guessing what the user actually meant to search for.
if (searchContext.hasOnlySuggest()) {
suggestPhase.execute(searchContext);
// TODO: fix this once we can fetch docs for suggestions
searchContext.queryResult().topDocs(
new TopDocs(0, Lucene.EMPTY_SCORE_DOCS, 0),
new DocValueFormat[0]);
return;
}
The suggest flow:
- Use an analyzer to break the input text into a stream of source tokens.
- Read the stream of target tokens for the suggest field from the Lucene index.
- For each source token, keep the target tokens that are similar to it.
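The "similar" test is essentially an edit-distance comparison. A minimal sketch with plain Levenshtein distance (the real term suggester relies on Lucene's spell-checking machinery, not this code):

public class EditDistanceDemo {
    // classic dynamic-programming Levenshtein distance
    static int levenshtein(String a, String b) {
        int[][] d = new int[a.length() + 1][b.length() + 1];
        for (int i = 0; i <= a.length(); i++) d[i][0] = i;
        for (int j = 0; j <= b.length(); j++) d[0][j] = j;
        for (int i = 1; i <= a.length(); i++) {
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                d[i][j] = Math.min(Math.min(d[i - 1][j] + 1, d[i][j - 1] + 1), d[i - 1][j - 1] + cost);
            }
        }
        return d[a.length()][b.length()];
    }

    public static void main(String[] args) {
        // "unexpacted" is one substitution away from "unexpected",
        // so a suggester with a max edit distance of 1 or 2 would propose it
        System.out.println(levenshtein("unexpacted", "unexpected")); // prints 1
    }
}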
11. If the request is not suggest-only, the search query is executed first, then the results are rescored, then the suggest phase runs, and finally aggregations are executed.
final ContextIndexSearcher searcher = searchContext.searcher();
boolean rescore = execute(searchContext, searcher, searcher::setCheckCancelled);
if (rescore) { // only if we do a regular search
rescorePhase.execute(searchContext);
}
suggestPhase.execute(searchContext);
aggregationPhase.execute(searchContext);
12. If profiling was requested, collect the profiling results for the query execution.
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults
.buildShardResults(searchContext.getProfilers());
searchContext.queryResult().profileResults(shardResults);
}
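Profiling is opt-in per request. A minimal sketch of enabling it from the Java API (the field name, value and index pattern are placeholders, and sending the request through a client is omitted):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class ProfileSearchDemo {
    // builds a search request with profiling enabled; executing it via a Client is out of scope here
    static SearchRequest buildProfiledRequest() {
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(QueryBuilders.matchQuery("field", "value"));  // hypothetical field and value
        source.profile(true);                                      // each shard will then return a ProfileShardResult
        return new SearchRequest("hik_mac*").source(source);
    }
}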