lucene原始碼分析---14
lucene原始碼分析—刪除索引
本章介紹lucene中索引的刪除,主要介紹IndexWriter的deleteDocuments函式,該函式可以介紹兩種引數,一種是Term,將所有包含該Term的文件都刪除,另一種是Query,刪除所有根據該Query查詢得到的文件。本章只介紹第一種情況,以下面的一個例子開始。再次申明一下,博文裡的基本上所有程式碼都在不影響整體功能的情況下進行了或多或少的修改或刪除以方便閱讀。
indexWriter.deleteDocuments(new Term("id", "value"));
indexWriter.commit();
該例子首先建立一個Term,域名為“id”,值為“value”,然後通過IndexWriter的deleteDocuments函式新增刪除操作,最後通過commit函式真正執行刪除。
IndexWriter::deleteDocuments
public void deleteDocuments(Term... terms) throws IOException {
if (docWriter.deleteTerms(terms)) {
processEvents(true, false);
}
}
IndexWriter的成員變數docWriter是在其建構函式中建立的DocumentsWriter。deleteDocuments函式首先通過DocumentsWriter的deleteTerms函式執行主要的刪除工作,然後呼叫processEvents函式處理刪除過程中產生的事件,例如ApplyDeletesEvent、MergePendingEvent、ForcedPurgeEvent。
1. DocumentsWriter::deleteTerms
IndexWriter::deleteDocuments->DocumentsWriter::deleteTerms
synchronized boolean deleteTerms(final Term... terms) throws IOException {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(terms);
flushControl.doOnDelete();
return applyAllDeletes(deleteQueue);
}
DocumentsWriter的成員變數deleteQueue被初始化為DocumentsWriterDeleteQueue佇列,接下來通過addDelete函式將待刪除的Term列表新增到該佇列中。doOnDelete和applyAllDeletes函式會根據條件將佇列中新增的刪除資訊直接新增到快取中,本文不考慮這種情況,即如果是先刪除的詞,再新增的文件,則不會對後新增的文件進行操作操作。
IndexWriter::deleteDocuments->DocumentsWriter::deleteTerms->DocumentsWriterDeleteQueue::addDelete
void addDelete(Term... terms) {
add(new TermArrayNode(terms));
tryApplyGlobalSlice();
}
首先將待刪除的Term陣列封裝成TermArrayNode,TermArrayNode繼承自Node實現連結串列操作。
add函式最終將Term陣列新增到連結串列中,其函式內部實現了原子新增操作。tryApplyGlobalSlice函式用於將新增的節點寫入DocumentsWriterDeleteQueue的globalBufferedUpdates快取中。
IndexWriter::deleteDocuments->DocumentsWriter::deleteTerms->DocumentsWriterDeleteQueue::addDelete->add
void add(Node<?> item) {
final Node<?> currentTail = this.tail;
if (currentTail.casNext(null, item)) {
tailUpdater.compareAndSet(this, currentTail, item);
return;
}
}
tail成員變數是連結串列中的尾節點,也即最新節點currentTail。首先通過casNext函式在尾節點currentTail的下一個節點位置上插入item,然後設定DocumentsWriterDeleteQueue的tail指向最新插入的節點item。
IndexWriter::deleteDocuments->DocumentsWriter::deleteTerms->DocumentsWriterDeleteQueue::addDelete->tryApplyGlobalSlice
void tryApplyGlobalSlice() {
if (updateSlice(globalSlice)) {
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
}
boolean updateSlice(DeleteSlice slice) {
if (slice.sliceTail != tail) {
slice.sliceTail = tail;
return true;
}
return false;
}
DocumentsWriterDeleteQueue的成員變數globalSlice被設定為DeleteSlice,其內部的成員變數sliceHead和sliceTail分別指向當前Slice的頭節點和尾節點,updateSlice函式將sliceTail更新為最新插入的節點,也即上面add函式中最後插入的item。然後通過DeleteSlice的apply函式將資料寫入globalBufferedUpdates中。
IndexWriter::deleteDocuments->DocumentsWriter::deleteTerms->DocumentsWriterDeleteQueue::addDelete->tryApplyGlobalSlice->DeleteSlice::apply
void apply(BufferedUpdates del, int docIDUpto) {
Node<?> current = sliceHead;
do {
current = current.next;
current.apply(del, docIDUpto);
} while (current != sliceTail);
reset();
}
void reset() {
sliceHead = sliceTail;
}
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
for (Term term : item) {
bufferedUpdates.addTerm(term, docIDUpto);
}
}
首先獲得整個Slice的頭節點,即sliceHead,然後依次遍歷直至尾節點sliceTail,依次呼叫每個node的apply函式將要刪除的Term資訊設定進del即globalBufferedUpdates中,最後通過reset函式重置slice。
2. IndexWriter::commit
根據deleteTerms函式的分析,刪除的Term資訊最終會被儲存在DocumentsWriter的DocumentsWriterDeleteQueue的globalBufferedUpdates中,接下來通過IndexWriter的commit的函式要取出這部分資訊執行刪除操作了,下面來看。
IndexWriter::commit->commitInternal->prepareCommitInternal
public final void commit() throws IOException {
commitInternal(config.getMergePolicy());
}
private final void commitInternal(MergePolicy mergePolicy) throws IOException {
prepareCommitInternal(mergePolicy);
finishCommit();
}
IndexWriter的成員變數config在其建構函式中被建立為IndexWriterConfig,getMergePolicy返回預設的合併策略TieredMergePolicy,後面會介紹該類的函式。
commitInternal函式首先通過prepareCommitInternal函式執行文件刪除的最主要操作,再呼叫finishCommit函式執行一些收尾工作,下面一一來看。
IndexWriter::commit->commitInternal->prepareCommitInternal
private void prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
boolean anySegmentsFlushed = docWriter.flushAllThreads();
processEvents(false, true);
maybeApplyDeletes(true);
SegmentInfos toCommit = segmentInfos.clone();
...
if (anySegmentsFlushed) {
maybeMerge(mergePolicy, MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
startCommit(toCommit);
}
flushAllThreads函式執行最主要的flush操作,processEvents函式監聽flush產生的事件,maybeApplyDeletes進行具體的刪除操作,最終會在有刪除的段建立.liv檔案,省略的部分是進行一些初始化工作。
anySegmentsFlushed標誌位表示在flush時有一些新的段產生,此時呼叫maybeMerge操作找出需要合併的段並對其進行合併,lucene的合併操作將在下一章介紹。
2.1 flushAllThreads
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads
boolean flushAllThreads() {
flushControl.markForFullFlush();
boolean anythingFlushed = false;
DocumentsWriterPerThread flushingDWPT;
while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
anythingFlushed |= doFlush(flushingDWPT);
}
ticketQueue.forcePurge(writer);
return anythingFlushed;
}
DocumentsWriter的成員變數flushControl被建立為DocumentsWriterFlushControl,其markForFullFlush函式從執行緒池中選擇對應的DocumentsWriterPerThread新增到DocumentsWriterFlushControl的flushQueue中等待flush。
flushAllThreads函式接下來通過nextPendingFlush函式遍歷DocumentsWriterFlushControl中的flushQueue,依次取出在markForFullFlush函式中新增的DocumentsWriterPerThread,然後呼叫doFlush函式進行處理。
doFlush函式是執行flush的主要函式,該函式將新增的文件寫入lucene的各個檔案中,doFlush函式還將DocumentsWriterPerThread中關聯的刪除資訊封裝成SegmentFlushTicket再新增到DocumentsWriterFlushQueue佇列中等待處理。
最後的forcePurge函式從將DocumentsWriterFlushQueue佇列中依次取出SegmentFlushTicket並新增到IndexWriter的BufferedUpdatesStream快取中等待最後的處理。
這裡值得注意的是刪除資訊再各個類的各個結構之間傳遞來傳遞去是有意義的,因為flush操作會產生段的增加和更新操作,容易和刪除操作產生衝突。
2.1.1 markForFullFlush
markForFullFlush函式的主要任務是從執行緒池DocumentsWriterPerThreadPool中選擇和當前刪除佇列DocumentsWriterDeleteQueue一致的ThreadState,將其中的DocumentsWriterPerThread新增到flushQueue中等待處理。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->DocumentsWriterFlushControl::markForFullFlush
void markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue = documentsWriter.deleteQueue;;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
documentsWriter.deleteQueue = newQueue;
final int limit = perThreadPool.getActiveThreadStateCount();
for (int i = 0; i < limit; i++) {
final ThreadState next = perThreadPool.getThreadState(i);
if (next.dwpt.deleteQueue != flushingQueue) {
continue;
}
addFlushableState(next);
}
flushQueue.addAll(fullFlushBuffer);
fullFlushBuffer.clear();
}
markForFullFlush函式首先獲取前面建立的DocumentsWriterDeleteQueue,其內部儲存了刪除的Term資訊,然後建立一個新的DocumentsWriterDeleteQueue用於重置DocumentsWriter中原來的deleteQueue。
成員變數perThreadPool預設為DocumentsWriterPerThreadPool執行緒池,getActiveThreadStateCount函式獲取執行緒池中可用執行緒的數量,然後通過getThreadState從中獲取ThreadState,通過其成員變數dwpt即DocumentsWriterPerThread的deleteQueue是否與前面建立的DocumentsWriterDeleteQueue一致來判斷是否為對應的執行緒,如果不是,就繼續遍歷執行緒池尋找,如果找到,就通過addFlushableState函式將其中的DocumentsWriterPerThread新增到DocumentsWriter的成員變數fullFlushBuffer和flushingWriters中,並重置該ThreadState。
最後將fullFlushBuffer中剛剛新增的所有DocumentsWriterPerThread新增到flushQueue中,並清空fullFlushBuffer。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->DocumentsWriterFlushControl::markForFullFlush->addFlushableState
void addFlushableState(ThreadState perThread) {
final DocumentsWriterPerThread dwpt = perThread.dwpt;
if (dwpt.getNumDocsInRAM() > 0) {
if (!perThread.flushPending) {
setFlushPending(perThread);
}
final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
fullFlushBuffer.add(flushingDWPT);
}
}
如果當前DocumentsWriterPerThread在記憶體中的文件數量getNumDocsInRAM()大於0,就通過setFlushPending將該ThreadState的flushPending設定為true,表示該DocumentsWriterPerThread需要flush。
internalTryCheckOutForFlush函式將ThreadState中的DocumentsWriterPerThread儲存在成員變數flushingWriters中並返回,同時重置該ThreadState。
最後將該DocumentsWriterPerThread新增到fullFlushBuffer中,fullFlushBuffer是一個DocumentsWriterPerThread列表。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->DocumentsWriterFlushControl::markForFullFlush->addFlushableState->internalTryCheckOutForFlush
private DocumentsWriterPerThread internalTryCheckOutForFlush(ThreadState perThread) {
final long bytes = perThread.bytesUsed;
DocumentsWriterPerThread dwpt = perThreadPool.reset(perThread);
flushingWriters.put(dwpt, Long.valueOf(bytes));
return dwpt;
}
DocumentsWriterPerThread reset(ThreadState threadState) {
final DocumentsWriterPerThread dwpt = threadState.dwpt;
threadState.reset();
return dwpt;
}
internalTryCheckOutForFlush函式通過DocumentsWriterPerThreadPool的reset函式重置DocumentsWriterPerThread,並獲取其中的DocumentsWriterPerThread。然後將該DocumentsWriterPerThread新增到flushingWriters中,flushingWriters是一個map。
2.1.2 doFlush
doFlush函式在《lucene原始碼分析—5》中已經重點介紹過了,該函式和刪除操作並沒有直接聯絡,這裡為了完整性,只看其中和本章相關的部分程式碼。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->doFlush
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
boolean hasEvents = false;
while (flushingDWPT != null) {
hasEvents = true;
SegmentFlushTicket ticket = ticketQueue.addFlushTicket(flushingDWPT);
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
final FlushedSegment newSegment = flushingDWPT.flush();
ticketQueue.addSegment(ticket, newSegment);
subtractFlushedNumDocs(flushingDocsInRam);
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT = flushControl.nextPendingFlush();
}
return hasEvents;
}
ticketQueue被建立為DocumentsWriterFlushQueue,代表flush的佇列,其addFlushTicket函式將DocumentsWriterPerThread中的待刪除資訊封裝成一個SegmentFlushTicket,儲存在queue列表中並返回。
getNumDocsInRAM函式返回DocumentsWriterPerThread中的文件數,該數量是新新增的文件數量,在其finishDocument函式中遞增。
DocumentsWriterPerThread的flush函式完成最主要的flush操作,該函式向lucene的各個檔案中儲存更新的文件資訊,並返回新建立的段資訊FlushedSegment。
addSegment函式將flush後新的段資訊FlushedSegment新增到SegmentFlushTicket中。
subtractFlushedNumDocs函式將待flush的文件數減去剛剛通過flush函式儲存到檔案中的文件數,並更新numDocsInRAM變數。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->doFlush->DocumentsWriterFlushQueue::addFlushTicket
synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
queue.add(ticket);
return ticket;
}
DocumentsWriterPerThread的prepareFlush函式將待刪除的資訊封裝成FrozenBufferedUpdates並返回。然後再將該FrozenBufferedUpdates封裝成SegmentFlushTicket,最後新增到queue列表中並返回。
FrozenBufferedUpdates是對刪除資訊的一種更高效的封裝。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->doFlush->DocumentsWriterFlushQueue::addFlushTicket->DocumentsWriterPerThread::prepareFlush
FrozenBufferedUpdates prepareFlush() {
final FrozenBufferedUpdates globalUpdates = deleteQueue.freezeGlobalBuffer(deleteSlice);
return globalUpdates;
}
FrozenBufferedUpdates freezeGlobalBuffer(DeleteSlice callerSlice) {
...
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
globalBufferedUpdates.clear();
return packet;
}
freezeGlobalBuffer函式的主要任務是將DocumentsWriterDeleteQueue刪除佇列中的刪除資訊globalBufferedUpdates封裝成FrozenBufferedUpdates並返回,然後清空globalBufferedUpdates。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->doFlush->DocumentsWriterPerThread::flush
FlushedSegment flush() throws IOException, AbortingException {
segmentInfo.setMaxDoc(numDocsInRAM);
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(), pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
final double startMBUsed = bytesUsed() / 1024. / 1024.;
consumer.flush(flushState);
pendingUpdates.terms.clear();
segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));
final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, -1L, -1L, -1L);
pendingUpdates.clear();
BufferedUpdates segmentDeletes= null;
FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos, segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
sealFlushedSegment(fs);
return fs;
}
flush函式首先向segmentInfo中新增更新的文件數量資訊。FieldInfos.Builder的finish函式返回FieldInfos,內部封裝了所有域Field的資訊。接下來建立FlushInfo,包含了文件數和記憶體位元組資訊,然後建立IOContext,進而建立SegmentWriteState。
成員變數consumer為DefaultIndexingChain,其flush函式最終向.doc、.tim、.nvd、.pos、.fdx、.nvm、.fnm、.fdt、.tip檔案寫入資訊。
完成資訊的寫入後,接下來清空pendingUpdates中的terms資訊,並向段segmentInfo中設定新建立的檔名,然後清空pendingUpdates。
flush最後建立FlushedSegment,然後通過sealFlushedSegment函式建立新的.si檔案並向其中寫入段資訊。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->doFlush->DocumentsWriterFlushControl::doAfterFlush
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
Long bytes = flushingWriters.remove(dwpt);
flushBytes -= bytes.longValue();
perThreadPool.recycle(dwpt);
}
doAfterFlush函式從flushingWriters中移除剛剛處理過的DocumentsWriterPerThread,然後將flushBytes減去剛剛flush的位元組數,recycle函式回收DocumentsWriterPerThread,便於重複利用。
2.1.3 forcePurge
回到flushAllThreads函式中,markForFullFlush函式將執行緒池中的DocumentsWriterPerThread新增到flushQueue中等待處理;doFlush函式依次處理flushQueue中的每個DocumentsWriterPerThread,將更新的文件flush到各個檔案中,並將DocumentsWriterDeleteQueue中的刪除資訊封裝成SegmentFlushTicket再新增到DocumentsWriterFlushQueue佇列中等待處理;本小節分析的forcePurge函式最終將DocumentsWriterFlushQueue中的刪除資訊新增到IndexWriter的BufferedUpdatesStream中等待最終的處理,下面來看。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->DocumentsWriterFlushQueue::forcePurge
int forcePurge(IndexWriter writer) throws IOException {
return innerPurge(writer);
}
private int innerPurge(IndexWriter writer) throws IOException {
int numPurged = 0;
while (true) {
final FlushTicket head = queue.peek();
numPurged++;
head.publish(writer);
queue.poll();
ticketCount.decrementAndGet();
}
return numPurged;
}
doFlush函式將待刪除的Term資訊封裝成SegmentFlushTicket新增到DocumentsWriterFlushQueue的queue成員變數,innerPurge函式依次從該佇列中獲取SegmentFlushTicket,呼叫其publish函式將其寫入IndexWriter的BufferedUpdatesStream中。操作成功後通過poll函式從佇列中刪除該SegmentFlushTicket。
IndexWriter::commit->commitInternal->prepareCommitInternal->DocumentsWriter::flushAllThreads->DocumentsWriterFlushQueue::forcePurge->innerPurge->SegmentFlushTicket::publish
protected void publish(IndexWriter writer) throws IOException {
finishFlush(writer, segment, frozenUpdates);
}
protected final void finishFlush(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates bufferedUpdates) throws IOException {
publishFlushedSegment(indexWriter, newSegment, bufferedUpdates);
}
protected final void publishFlushedSegment(IndexWriter indexWriter, FlushedSegment newSegment, FrozenBufferedUpdates globalPacket) throws IOException {
indexWriter.publishFlushedSegment(newSegment.segmentInfo, segmentUpdates, globalPacket);
}
void publishFlushedSegment(SegmentCommitInfo newSegment, FrozenBufferedUpdates packet, FrozenBufferedUpdates globalPacket) throws IOException {
bufferedUpdatesStream.push(globalPacket);
nextGen = bufferedUpdatesStream.getNextGen();
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
}
public synchronized long push(FrozenBufferedUpdates packet) {
packet.setDelGen(nextGen++);
updates.add(packet);
numTerms.addAndGet(packet.numTermDeletes);
bytesUsed.addAndGet(packet.bytesUsed);
return packet.delGen();
}
SegmentFlushTicket的publish函式最終會呼叫IndexWriter的publishFlushedSegment函式,傳入的第一個引數為新增的段資訊,第二個引數在Term刪除時為null,不管它,最後一個引數就是前面建立的FrozenBufferedUpdates ,封裝了Term的刪除資訊。
publishFlushedSegment通過BufferedUpdatesStream的push函式新增packet即FrozenBufferedUpdates,最終新增到BufferedUpdatesStream的updates列表中並更新相應資訊。
publishFlushedSegment函式接下來將新增的段newSegment新增到SegmentInfos中,最後通過checkpoint函式更新檔案的引用次數,在必要時刪除檔案,本章最後會分析該函式。
2.2 processEvents
回到prepareCommitInternal函式中,下面簡單介紹一下processEvents函式,flushAllThreads函式操作過後會產生各類事件,processEvents函式監聽這些事件並執行相應的操作。
IndexWriter::commit->commitInternal->prepareCommitInternal->processEvents
private boolean processEvents(boolean triggerMerge, boolean forcePurge) throws IOException {
return processEvents(eventQueue, triggerMerge, forcePurge);
}
private boolean processEvents(Queue<Event> queue, boolean triggerMerge, boolean forcePurge) throws IOException {
boolean processed = false;
if (tragedy == null) {
Event event;
while((event = queue.poll()) != null) {
processed = true;
event.process(this, triggerMerge, forcePurge);
}
}
return processed;
}
lucene預設實現的event包括ApplyDeletesEvent、MergePendingEvent、ForcedPurgeEvent,processEvents函式從事件佇列queue中依次取出這些事件,並呼叫process函式執行操作。這些事件和本章介紹的內容沒有直接關係,這裡就不往下看了。
2.3 maybeApplyDeletes
maybeApplyDeletes是lucene執行刪除的最主要函式,下面重點分析該函式。
IndexWriter::commit->commitInternal->prepareCommitInternal->maybeApplyDeletes
final synchronized boolean maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
return applyAllDeletesAndUpdates();
}
final synchronized boolean applyAllDeletesAndUpdates() throws IOException {
final BufferedUpdatesStream.ApplyDeletesResult result;
result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
if (result.anyDeletes) {
checkpoint();
}
if (!keepFullyDeletedSegments && result.allDeleted != null) {
for (SegmentCommitInfo info : result.allDeleted) {
if (!mergingSegments.contains(info)) {
segmentInfos.remove(info);
pendingNumDocs.addAndGet(-info.info.maxDoc());
readerPool.drop(info);
}
}
checkpoint();
}
bufferedUpdatesStream.prune(segmentInfos);
return result.anyDeletes;
}
BufferedUpdatesStream的applyDeletesAndUpdates執行主要的刪除操作,最終將刪除的文件ID標記在對應段的.liv檔案中。
如果有文件被刪除,則呼叫checkpoint函式遞減對應段的引用字數,如果引用計數到達0,則刪除該檔案。
keepFullyDeletedSegments標記表示當一個段的文件被全部刪除時,是否要刪除對應的段,如果此時有的段文件被全部刪除了,則遍歷對應的段,從segmentInfos中刪除該段,pendingNumDocs刪除對應段的所有文件數,再從ReaderPool中刪除該段。
最後的BufferedUpdatesStream的prune函式繼續做一些收尾工作,刪除前面建立的FrozenBufferedUpdates。
IndexWriter::commit->commitInternal->prepareCommitInternal->applyAllDeletesAndUpdates->maybeApplyDeletes->BufferedUpdatesStream::applyDeletesAndUpdates
public synchronized ApplyDeletesResult applyDeletesAndUpdates(IndexWriter.ReaderPool pool, List<SegmentCommitInfo> infos) throws IOException {
SegmentState[] segStates = null;
long totDelCount = 0;
long totTermVisitedCount = 0;
boolean success = false;
ApplyDeletesResult result = null;
infos = sortByDelGen(infos);
CoalescedUpdates coalescedUpdates = null;
int infosIDX = infos.size()-1;
int delIDX = updates.size()-1;
while (infosIDX >= 0) {
final FrozenBufferedUpdates packet = delIDX >= 0 ? updates.get(delIDX) : null;
final SegmentCommitInfo info = infos.get(infosIDX);
final long segGen = info.getBufferedDeletesGen();
if (packet != null && segGen < packet.delGen()) {
if (!packet.isSegmentPrivate && packet.any()) {
if (coalescedUpdates == null) {
coalescedUpdates = new CoalescedUpdates();
}
coalescedUpdates.update(packet);
}
delIDX--;
} else if (packet != null && segGen == packet.delGen()) {
...
} else {
if (coalescedUpdates != null) {
segStates = openSegmentStates(pool, infos);
SegmentState segState = segStates[infosIDX];
int delCount = 0;
delCount += applyQueryDeletes(coalescedUpdates.queriesIterable(), segState);
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
applyDocValuesUpdatesList(coalescedUpdates.numericDVUpdates, segState, dvUpdates);
applyDocValuesUpdatesList(coalescedUpdates.binaryDVUpdates, segState, dvUpdates);
if (dvUpdates.any()) {
segState.rld.writeFieldUpdates(info.info.dir, dvUpdates);
}
totDelCount += delCount;
}
infosIDX--;
}
}
if (coalescedUpdates != null && coalescedUpdates.totalTermCount != 0) {
if (segStates == null) {
segStates = openSegmentStates(pool, infos);
}
totTermVisitedCount += applyTermDeletes(coalescedUpdates, segStates);
}
result = closeSegmentStates(pool, segStates, success, gen);
return result;
}
applyDeletesAndUpdates函式首先進入迴圈,遍歷所有的段,然後從成員變數updates列表中獲取在forcePurge函式中新增的FrozenBufferedUpdates,並獲取段資訊SegmentCommitInfo,再通過getBufferedDeletesGen函式獲取該段的bufferedDeletesGen變數,用來表示操作的時間順序,這裡暫時叫做更新度。
下面的三個條件語句,第一個if表示要刪除的FrozenBufferedUpdates存在,並且段的更新度小於要刪除資料的更新度,表示可以刪除,此時建立CoalescedUpdates,用來封裝刪除的資訊,例如有些刪除是通過Term,有些刪除通過Query,這裡全部封裝起來。
第二個if語句表示更新度相同,這裡不考慮這種情況。
第三個if語句表示對應的段有需要刪除的資料,首先通過openSegmentStates函式將段資訊封裝成SegmentState,再通過applyQueryDeletes刪除Query指定的刪除資訊,然後呼叫applyDocValuesUpdatesList函式檢查是否有更新,如果有,則通過writeFieldUpdates進行更新,這裡假設沒有更新。
退出迴圈後,coalescedUpdates封裝了待刪除的Term資訊,如果不為null,則通過applyTermDeletes執行刪除操作。
刪除完成後通過closeSegmentStates函式獲取是否某段中的所有檔案都被刪除了,將該結果封裝成ApplyDeletesResult並返回,該函式還會呼叫SegmentState的finish函式將applyTermDeletes函式中的標記寫入到.liv檔案中去,這裡就不往下看了。
IndexWriter::commit->commitInternal->prepareCommitInternal->maybeApplyDeletes->BufferedUpdatesStream::applyAllDeletesAndUpdates->CoalescedUpdates::update
void update(FrozenBufferedUpdates in) {
totalTermCount += in.terms.size();
terms.add(in.terms);
...
}
CoalescedUpdates的update函式用於封裝刪除資訊,如果是通過Term刪除,則直接新增到成員變數terms列表中。
IndexWriter::commit->commitInternal->prepareCommitInternal->maybeApplyDeletes->BufferedUpdatesStream::applyAllDeletesAndUpdates->applyTermDeletes
private synchronized long applyTermDeletes(CoalescedUpdates updates, SegmentState[] segStates) throws IOException {
long startNS = System.nanoTime();
int numReaders = segStates.length;
long delTermVisitedCount = 0;
long segTermVisitedCount = 0;
FieldTermIterator iter = updates.termIterator();
String field = null;
SegmentQueue queue = null;
BytesRef term;
while ((term = iter.next()) != null) {
if (iter.field() != field) {
field = iter.field();
queue = new SegmentQueue(numReaders);
long segTermCount = 0;
for(int i=0;i<numReaders;i++) {
SegmentState state = segStates[i];
Terms terms = state.reader.fields().terms(field);
if (terms != null) {
segTermCount += terms.size();
state.termsEnum = terms.iterator();
state.term = state.termsEnum.next();
if (state.term != null) {
queue.add(state);
}
}
}
}
delTermVisitedCount++;
long delGen = iter.delGen();
while (queue.size() != 0) {
SegmentState state = queue.top();
segTermVisitedCount++;
int cmp = term.compareTo(state.term);
if (cmp < 0) {
break;
} else if (cmp == 0) {
} else {
TermsEnum.SeekStatus status = state.termsEnum.seekCeil(term);
if (status == TermsEnum.SeekStatus.FOUND) {
} else {
if (status == TermsEnum.SeekStatus.NOT_FOUND) {
state.term = state.termsEnum.term();
queue.updateTop();
} else {
queue.pop();
}
continue;
}
}
if (state.delGen < delGen) {
final Bits acceptDocs = state.rld.getLiveDocs();
state.postingsEnum = state.termsEnum.postings(state.postingsEnum, PostingsEnum.NONE);
while (true) {
final int docID = state.postingsEnum.nextDoc();
if (docID == DocIdSetIterator.NO_MORE_DOCS) {
break;
}
if (acceptDocs != null && acceptDocs.get(docID) == false) {
continue;
}
if (!state.any) {
state.rld.initWritableLiveDocs();
state.any = true;
}
state.rld.delete(docID);
}
}
state.term = state.termsEnum.next();
if (state.term == null) {
queue.pop();
} else {
queue.updateTop();
}
}
}
return delTermVisitedCount;
}
applyTermDeletes函式進行具體的Term刪除操作,首先通過termIterator函式獲得Term的迭代器TermIterator。
第一個if語句表示field域有變化,首先通過field函式獲得要刪除的Term所在的域,然後建立SegmentQueue用來排序,再針對每個段,獲取其中的FieldReader,並新增到SegmentQueue中。
接下來通過compareTo函式比較待刪除的Term(term)和該段中儲存的Term(state.term),因為段中的詞是經過排序的,因此比較的結果cmp小於0代表該段沒有要找的詞,直接break返回。如果相等,則什麼也不做,進行下一步,如果大於0,則需要通過seekCeil函式繼續在該段尋找詞,如果找到了,則也繼續進行下一步,如果沒找到,則要通過updateTop函式更新SegmentQueue佇列,等待查詢下一個詞,如果是其他情況,則直接退出該段的查詢過程。
函式到達第二個if迴圈表示在該段找到了待刪除的詞,如果delGen表示更新度小於刪除Term的更新度,則表示該詞建立的時間要早於刪除詞的時間,此時要進行刪除操作,進入while迴圈,迴圈讀取包含該次的下一個文件id,獲得文件id後,就要將其記錄在.liv檔案中,如果還未初始化,就先要呼叫initWritableLiveDocs函式初始化對應段的.liv檔案。初始化完成後就通過delete函式在.liv檔案對應的快取中標記刪除。
再往下再檢查是否段沒有詞了,沒有就通過pop函式從queue中刪除該段,如果還有,則呼叫updateTop函式更新佇列,等待下一個詞的咋找。最後如果兩個段都從佇列queue刪除了,則退出while迴圈。
最後返回刪除的詞的數量delTermVisitedCount。
IndexWriter::commit->commitInternal->prepareCommitInternal->maybeApplyDeletes->BufferedUpdatesStream::applyAllDeletesAndUpdates->applyTermDeletes ->ReadersAndUpdates::initWritableLiveDocs
public synchronized void initWritableLiveDocs() throws IOException {
if (liveDocsShared) {
LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat();
if (liveDocs == null) {
liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc());
} else {
liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
}
liveDocsShared = false;
}
}
liveDocsFormat函式最後返回Lucene50LiveDocsFormat,然後呼叫其newLiveDocs函式進行初始化,返回一個FixedBitSet,該結構用bit位記錄文件id用於判斷該文件是否未被刪除。
IndexWriter::commit->commitInternal->prepareCommitInternal->maybeApplyDeletes->BufferedUpdatesStream::applyAllDeletesAndUpdates->applyTermDeletes ->ReadersAndUpdates::delete
public synchronized boolean delete(int docID) {
final boolean didDelete = liveDocs.get(docID);
if (didDelete) {
((MutableBits) liveDocs).clear(docID);
pendingDeleteCount++;
}
return didDelete;
}
delete函式根據文件id,在前面建立的FixedBitSet裡標記位置表示刪除,注意這裡是將刪除後保留的文件id對應的bit位置上標記為1。
IndexWriter::commit->commitInternal->prepareCommitInternal->maybeApplyDeletes->BufferedUpdatesStream::applyAllDeletesAndUpdates->closeSegmentStates
private ApplyDeletesResult closeSegmentStates(IndexWriter.ReaderPool pool, SegmentState[] segStates, boolean success, long gen) throws IOException {
int numReaders = segStates.length;
Throwable firstExc = null;
List<SegmentCommitInfo> allDeleted = new ArrayList<>();
long totDelCount = 0;
for (int j=0;j<numReaders;j++) {
SegmentState segState = segStates[j];
totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
segState.reader.getSegmentInfo().setBufferedDeletesGen(gen);
int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
if (fullDelCount == segState.rld.info.info.maxDoc()) {
allDeleted.add(segState.reader.getSegmentInfo());
}
segStates[j].finish(pool);
}
return new ApplyDeletesResult(totDelCount > 0, gen, allDeleted);
}
closeSegmentStates函式在刪除操作執行後檢查是否刪除了某個段的所有文件,如果有,就將其新增到allDeleted列表中,最終在返回時封裝成ApplyDeletesResult。closeSegmentStates函式也會遍歷每個段,對前面建立的SegmentState執行finish函式,將對應的FixedBitSet結構寫入.liv檔案中去,因為涉及到檔案格式,這裡就不往下看了。
2.4 startCommit
IndexWriter::commit->commitInternal->prepareCommitInternal->startCommit
private void startCommit(final SegmentInfos toSync) throws IOException {
...
toSync.prepareCommit(directory);
...
filesToSync = toSync.files(false);
directory.sync(filesToSync);
...
}
執行到這裡時,有一些段由於flush操作新生成,有一些段有資料生成,有一些段進行了合併操作,執行到這裡,需要對segments段檔案執行更新操作。
首先呼叫SegmentInfos的prepareCommit函式,建立pending_segments檔案,向其寫入基本資訊。
然後通過files函式獲得建立後索引目錄下最終的所有檔案,再通過sync函式將檔案同步到硬碟中去。
2.5 finishCommit
執行到這裡,主要的刪除任務已經結束,最終刪除的文件ID會被標記在對應段的.liv檔案中,finishCommit函式完成接下來的收尾工作。
IndexWriter::commit->commitInternal->finishCommit
private final void finishCommit() throws IOException {
pendingCommit.finishCommit(directory);
deleter.checkpoint(pendingCommit, true);
segmentInfos.updateGeneration(pendingCommit);
rollbackSegments = pendingCommit.createBackupSegmentInfos();
}
finishCommit函式重新命名前面建立的臨時段檔案。成員變數deleter被初始化為IndexFileDeleter,其checkpoint函式檢查是否有待刪除的檔案並將其刪除。
updateGeneration函式更新段的generation資訊。createBackupSegmentInfos函式備份當前段的最新資訊儲存在rollbackSegments中。
IndexWriter::commit->commitInternal->finishCommit->SegmentInfos::finishCommit
final String finishCommit(Directory dir) throws IOException {
final String src = IndexFileNames.fileNameFromGeneration(IndexFileNames.PENDING_SEGMENTS, "", generation);
String dest = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation);
dir.renameFile(src, dest);
pendingCommit = false;
lastGeneration = generation;
return dest;
}
finishCommit函式主要完成將索引目錄下的臨時段檔案重新命名為正式的段檔案,例如將pending_segments_1檔案重新命名為segments_1檔案。
IndexWriter::commit->commitInternal->finishCommit->IndexFileDeleter::checkpoint
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
incRef(segmentInfos, isCommit);
commits.add(new CommitPoint(commitsToDelete, directoryOrig, segmentInfos));
policy.onCommit(commits);
deleteCommits();
}
checkpoint函式首先通過incRef遞增段中每個檔案的引用次數,然後將待刪除檔案的資訊封裝成CommitPoint並新增到commits列表中。
policy是在LiveIndexWriterConfig中預設的KeepOnlyLastCommitDeletionPolicy,onCommit函式用來儲存最新的CommitPoint。deleteCommits函式降低檔案的引用次數,可能執行最終的刪除操作。
IndexWriter::commit->commitInternal->finishCommit->IndexFileDeleter::checkpoint->incRef
void incRef(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
for(final String fileName: segmentInfos.files(isCommit)) {
incRef(fileName);
}
}
public Collection<String> files(boolean includeSegmentsFile) throws IOException {
HashSet<String> files = new HashSet<>();
if (includeSegmentsFile) {
final String segmentFileName = getSegmentsFileName();
if (segmentFileName != null) {
files.add(segmentFileName);
}
}
final int size = size();
for(int i=0;i<size;i++) {
final SegmentCommitInfo info = info(i);
files.addAll(info.files());
}
return files;
}
getSegmentsFileName函式獲得對應段的檔名,例如segments_1,然後將段檔名新增到files集合中。
size返回段資訊SegmentCommitInfo的數量。info函式從SegmentCommitInfo列表中獲取對應的SegmentCommitInfo,然後呼叫其files函式獲取該段對應的諸如.doc、.tim、.si、.nvd、.pos、.fdx、.nvm、.fnm、.fdt、.tip等檔名,然後將這些檔名新增到files集合中並返回。
IndexWriter::commit->commitInternal->finishCommit->IndexFileDeleter::checkpoint->incRef->incRef
void incRef(String fileName) {
RefCount rc = getRefCount(fileName);
rc.IncRef();
}
private RefCount getRefCount(String fileName) {
RefCount rc;
if (!refCounts.containsKey(fileName)) {
rc = new RefCount(fileName);
refCounts.put(fileName, rc);
} else {
rc = refCounts.get(fileName);
}
return rc;
}
getRefCount從IndexFileDeleter的成員變數refCounts中獲得當前每個檔案的引用次數,然後將其加1。
IndexWriter::commit->commitInternal->finishCommit->IndexFileDeleter::checkpoint->KeepOnlyLastCommitDeletionPolicy::onCommit
public void onCommit(List<? extends IndexCommit> commits) {
int size = commits.size();
for(int i=0;i<size-1;i++) {
commits.get(i).delete();
}
}
public void delete() {
if (!deleted) {
deleted = true;
commitsToDelete.add(this);
}
}
onCommit會將最新新增的IndexCommit繼續儲存在commits列表中,並將其餘的IndexCommit新增到commitsToDelete列表中。
IndexWriter::commit->commitInternal->finishCommit->IndexFileDeleter::checkpoint->deleteCommits
private void deleteCommits() {
int size = commitsToDelete.size();
for(int i=0;i<size;i++) {
CommitPoint commit = commitsToDelete.get(i);
decRef(commit.files);
}
commitsToDelete.clear();
size = commits.size();
int readFrom = 0;
int writeTo = 0;
while(readFrom < size) {
CommitPoint commit = commits.get(readFrom);
if (!commit.deleted) {
if (writeTo != readFrom) {
commits.set(writeTo, commits.get(readFrom));
}
writeTo++;
}
readFrom++;
}
while(size > writeTo) {
commits.remove(size-1);
size--;
}
}
遍歷commitsToDelete中的CommitPoint,降低其對應檔案的引用次數,如果等於0,就將其刪除。
刪除完成後,再通過一個while迴圈更新commits列表,取出刪除的CommitPoint。
IndexWriter::commit->commitInternal->finishCommit->IndexFileDeleter::checkpoint->deleteCommits->decRef
void decRef(Collection<String> files) throws IOException {
Set<String> toDelete = new HashSet<>();
for(final String file : files) {
if (decRef(file)) {
toDelete.add(file);
}
}
deleteFiles(toDelete);
}
decRef函式和前面介紹的incRef類似,用於降低該檔案的引用次數,如果等於0,就返回true,表示要刪除該檔案,將其新增到待刪除的檔案列表toDelete中,最後呼叫deleteFiles刪除這些檔案。