1. 程式人生 > >es lucene搜尋及聚合流程原始碼分析

es lucene搜尋及聚合流程原始碼分析

本文以TermQuery,GlobalOrdinalsStringTermsAggregator為例,通過程式碼,分析es,lucene搜尋及聚合流程。
1:協調節點收到請求後,將search任務發到相關的各個shard。

相關程式碼:

TransportSearchAction.executeSearch
TransportSearchAction.searchAsyncAction.start
AbstractSearchAsyncAction.executePhase(SearchQueryThenFetchAsyncAction)
InitialSearchPhase.performPhaseOnShard
SearchQueryThenFetchAsyncAction.executePhaseOnShard

2:資料節點查詢及聚合一個shard。

相關程式碼:

SearchService.executeQueryPhase

 2.1:根據request構造SearchContext。

SearchContext
包含Query,Aggregator等重要資訊。並將記錄查詢,聚合結果。
Query
根據request建立具體的query,如:
TermQuery:用於keyword,text欄位。索引結構為倒排。
PointRangeQuery:用於數字,日期,ip,point等欄位。索引結構為k-d tree。
Aggregator
此時僅根據request建立AggregatorFactory,用於後續建立Aggregator。

相關程式碼:

SearchService.createAndPutContext

2.2:根據SearchContext構造Aggregator。

根據SearchContext構造具體的Aggregator,如:
GlobalOrdinalsStringTermsAggregator:用於keyword欄位,開啟global ordinal的term聚合。
StringTermsAggregator:用於keyword欄位,關閉global ordinal的term聚合。
LongTermsAggregator:用於long欄位的term聚合。
TopScoreDocCollector:用於為doc 評分並取topN。

相關程式碼:
AggregationPhase.preProcess

2.3:建立GlobalOrdinalsStringTermsAggregator,如果cache中沒有GlobalOrdinals,將建立GlobalOrdinals,並cache。當shard下資料發生變化時,應當清空cache。

GlobalOrdinals
將所有segment ,指定field的所有term排序,合併成一個GlobalOrdinals,並建立OrdinalMap。collect時,使用doc的segment ord獲取global ord。
OrdinalMap
為每一個segmentValueCount小於globalValueCount的segment,儲存了一份segment ord到global ord的mapping(LongValues)。對於segment valueCount等於globalValueCount的segment,原本的segment ord就是global ord,後續獲取ord時,直接從SortedSetDV(dvd)中讀取。
value count
指的是不同term數量(term集合的大小)。使用globalValueCount 用來在collect時,確定結果集的大小。

舉例
segment 1:{sorted terms: [aa, bb, cc],ord:[0, 1, 2]}。
segment 2:{sorted terms: [bb, cc, dd],ord:[0, 1, 2]}。
segment 3:{sorted terms: [aa, bb, cc, dd],ord:[0, 1, 2, 3]}。
GlobalOrdinals:{sorted terms: [aa, bb, cc, dd],ord:[0, 1, 2, 3]}。
ordinalMap:segment1:[0, 1, 2]->[0, 1, 2],segment2:[0, 1, 2]->[1, 2, 3]。segment3則使用原始的segment ord。

docCounts
int[globalValueCount],用來記錄ord對應的count。
注:經查詢條件過濾後,有些ord可能沒有對應doc。

bucketOrds
稀疏(value count多,但doc少)時使用,縮減docCounts size。
LongHash:globalOrd與 id (size)對映。collect時在id處++,build agg時取出id對應的count。
當父聚合是BucketAggregator聚合時,子聚合只對父的某個term聚合,所以doc會減少,使用bucketOrds。
注:按照此邏輯,如果query本身有term過濾條件,也應該啟用bucketOrds(global_ordinals_hash)。

相關程式碼:

TermsAggregatorFactory.doCreateInternal。
//獲取globalValueCount決定是否global_ordinals_low_cardinality, global_ordinals_low_cardinality中又因不是ValuesSource.Bytes.FieldData,建立global_ordinals。
ValuesSource$WithOrdinals.globalMaxOrd。
//通過獲取一個segment的globalOrdinals,觸發如果cache中沒有一個shardId+field對應的globalOrdinals,load 所有segment ord,建立global ords。
ValuesSource$FieldData.globalOrdinalsValues。
SortedSetDVOrdinalsIndexFieldData.loadGlobal。
IndicesFieldDataCache$IndexFieldCache.load
SortedSetDVOrdinalsIndexFieldData.localGlobalDirect。
GlobalOrdinalsBuilder.build。
//globalOrdinals主要類
GlobalOrdinalsIndexFieldData。
MultiDocValues$OrdinalMap

 2.3.1:從docValues中讀取單個segment,指定field的ordinals,term等。

相關程式碼:

SortedSetDVOrdinalsIndexFieldData.load。
SortedSetDVBytesAtomicFieldData.getOrdinalsValues。
//獲取segment指定field的SortedSetDocValues
DocValues.getSortedSet。
//獲取segment的docValuesReader
SegmentReader.getDocValuesReader。
//讀取field的SortedDocValues
Lucene54DocValuesProducer.getSortedSet。

 2.3.2:對多個segment的SortedSetDocValues排序,建立OrdinalMap。

具體為獲取每個segment的SortedDocValuesTermsEnum。使用多個SortedDocValuesTermsEnum構建成小頂堆,合併成一個。

相關程式碼:

MultiDocValues$OrdinalMap.build。
MultiTermsEnum
TermMergeQueue
//獲取一個segment的segment ord到global ord的mapping。
MultiDocValues$OrdinalMap.getGlobalOrds

 2.4:查詢及聚合資料。

相關程式碼:

QueryPhase.execute。

2.4.1:根據Query建立具體的weight。
weigth將用於query segment,並建立scorer。
scorer將用於評分和collect。
如果需要評分,讀取field的fst,查詢term,定位postings將提前到這裡執行。

相關程式碼:

IndexSearcher.createNormalizedWeight。
TermQuery.createWeight。

 2.4.2:為每個leafReader(segment)建立leafCollector。

建立LeafBucketCollector,獲取該segment的globalOrds。
globalOrds
如果segment的value count等於global value count,則返回segment ords(從dvd中讀取);

如果不等,則從OrdinalMap中獲取該segment的GlobalOrdinalMapping,且該segment的value count改為獲取global value count。
singleValues
並判斷該field的docValues是否為singleValues(keyword single ord,text則為多term多ord)。

相關程式碼:

//序列查詢及聚合一個分片下的所有segment。
IndexSearcher.search。
IndexSearcher.search.collector.getLeafCollector。
GlobalOrdinalsStringTermsAggregator.getLeafCollector。
//獲取指定segment的globalOrdinals,如果cache中沒有該shardId+field對應的globalOrdinals,load 所有segment ord,建立global ords。
ValuesSource$FieldData.globalOrdinalsValues
//獲取一個segment的global ords。
GlobalOrdinalsIndexFieldData$Atomic.getOrdinalsValues
//提供獲取該segment ord對應的global ord,使用globalOrd獲取termBytes等方法。
GlobalOrdinalMapping
//singleValues
SingletonSortedSetDocValues

 2.4.3:query該segment, 獲取DocIdSetIterator,並構造scorer。

DocIdSetIterator即查詢出的docId集合,對於倒排是PostingsEnum,對於數字使用的是BitSetIterator。

相關程式碼:

IndexSearcher.search.weight.bulkScorer。
Weight.bulkScorer。
//構造bulkScorer。
TermQuery$TermWeight.scorer。
//查詢segment,獲取TermsEnum,並根據搜尋關鍵字,定位PostingsEnum位置。
TermQuery$TermWeight.getTermsEnum。

 query segment流程如下:

1:根據field讀取.tip(fst索引結構,term index)檔案,獲取該field下所有term字首構造的索引,並快取。

FST(Finite State Transducer,有限狀態感測器)其他用途:阿里對hbase rowkey索引定位block(類似lucene tip索引term),

自然語言處理中一個單詞或漢字下一個狀態各個狀態的概率。

相關程式碼:
BlockTreeTermsReader.terms。
FieldReader。
//Load a previously saved FST
FST。

注:官方lucene在open IndexReader(es recovery shard)時,就要通過構造SegmentReader,BlockTreeTermsReader,構造FieldReader,讀取FST。

相關程式碼:

DirectoryReader.open 

2:從fst中查詢term,如果能找到的value(fst正常結束),value記錄了

該term字首對應的term dict所在的block(.tim,term dictionary)位置,讀取該block,查詢具體的term,獲取posting所在.doc(postings)的位置。

相關程式碼:

TermQuery$TermWeight.getTermsEnum.termsEnum.seekExact。
SegmentTermsEnum.seekExact。
SegmentTermsEnumFrame.scanToTerm。
//根據termsEnum(已經設定term)讀取postings。
TermQuery$TermWeight.scorer.termsEnum.postings。
SegmentTermsEnum.postings。
//根據termsEnum中的term,設定postings在.doc中位置。
SegmentTermsEnum.postings.currentFrame.decodeMetaData。

3:從.doc中讀取postings,返回PostingsEnum(BlockDocsEnum)。

相關程式碼:

Lucene50PostingsReader.postings。

上述流程如下圖:

postings
(docID, termFreq, positions), (docID, termFreq, positions),.....
termFreq
term在該文件出現的次數。
用於對文件頻分。
positions
term在該文件中每次的位置。
用於短語查詢時,多個term是否連續出現,或者小於指定位置。

2.4.4:遍歷PostingsEnum(過濾deleted doc),評分及collect資料。

相關程式碼:

acceptDocs:getLiveDocs
IndexSearcher.search.scorer.score。
BulkScorer.score。
DefaultBulkScorer.score。
//在查詢結果中前進到>=target的docID,並返回docID。
Lucene50PostingsReader$BlockDocsEnum.advance(target)。
//遍歷BlockDocsEnum(PostingsEnum)中的查詢結果,collect doc。
DefaultBulkScorer.scoreRange。
//collect一個doc。
MultiCollector$MultiLeafCollector.collect。

TopScoreDocCollector對doc評分,並取topN的流程如下:
為該doc評分,並基於score構建N節點的小頂堆,用於保留TopN。

相關程式碼:

TopScoreDocCollector$SimpleTopScoreDocCollector.collect。

1:根據設定的Similarity,使用BM25或TFIDF等演算法為doc評分。

BM25,TFIDF都使用freq,norms(NumericDocValues),演算法不同,可能使用的NumericDocValues也不同。

相關程式碼:

TermScorer.score。
BM25Similarity$BM25DocScorer.score。
TFIDFSimilarity$TFIDFSimScorer.score。
IndexWriterConfig.setSimilarity。
IndexSearcher.setSimilarity。
NumericDocValues。

2:根據doc得到的score構建N節點的小頂堆。

相關程式碼:

TopScoreDocCollector$SimpleTopScoreDocCollector.collect。
PriorityQueue.updateTop/downHeap/insertWithOverflow。

GlobalOrdinalsStringTermsAggregator統計各term doc數的流程如下:
1:根據doc是否為singleValues,獲取doc的ord或ords。

相關程式碼:

//singleValues獲取ord
singleValues.getOrd(doc)。
//獲取ords
//設定doc。
GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.setDocument(doc)
AbstractRandomAccessOrds.setDocument(doc)。
//獲取doc對應的term基數。
GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.cardinality()。
GlobalOrdinalMapping.cardinality()。
//遍歷doc ords。
GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.ordAt(i)。
GlobalOrdinalMapping.ordAt(i)。

2:docCounts(IntArray)對應的ord count++。
如果啟用bucketOrds(稀疏處理,見2.3),則將ord對映到bucketOrd,docCounts的bucketOrd位置 count++。

相關程式碼:

//將ord對應count++。傳入doc,用於sub collect。
GlobalOrdinalsStringTermsAggregator.collectGlobalOrd。

2.4.5:取topDocs。TopScoreDocCollector collect時僅保留topN。在此每次取堆頂元素,得到逆序的topN。

相關程式碼:

TopDocsCollector.topDocs。

2.4.6:根據聚合資料,按docCount取topN,排序。
根據aggregator的資料,按docCount構建小頂堆。
每次取走堆頂元素,逆序放入陣列,得到降序的topN。
設定termBytes。

相關程式碼:

AggregationPhase.execute。
GlobalOrdinalsStringTermsAggregator.buildAggregation。
PriorityQueue.updateTop/downHeap/insertWithOverflow。
//根據globalOrd從所有segment中獲取第一個含有該globalOrd的segment,並從該segment中讀取term值BytesRef。
GlobalOrdinalMapping.lookupOrd。

3:協調節點reduce 各個shard返回的結果。
使用各shard返回的有序結果,構造堆,合併聚合,合併TopDocs。

相關程式碼:

InitialSearchPhase.onShardResult。
InitialSearchPhase.onShardFailure。
//reduce結果
FetchSearchPhase.innerRun.resultConsumer.reduce。
SearchPhaseController.reducedQueryPhase。
SearchPhaseController.sortDocs。
//mergeTopDocs
SearchPhaseController.mergeTopDocs。
TopDocs.merge。
TopDocs.mergeAux。
PriorityQueue。

4:fetch資料。
協調發送fecth請求到相關shard,資料節點從stored field中fetch結果。

相關程式碼:

FetchSearchPhase.innerRun。

參考:
source code: elasticsearch 5.6.12, lucene 6.6.1。
https://www.elastic.co/blog/lucene-points-6.0

PointRangeQuery:abstract class竟然可以有構造方