Lucene Source Code Analysis, Part 5
Lucene Source Code Analysis: flush
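The earlier chapters kept running into the flush operation. Index data is first held in in-memory data structures, and at an appropriate time the flush function writes it out to files. This chapter analyzes flush, starting from DocumentsWriter's doFlush function.
DocumentsWriter::doFlush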
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
boolean hasEvents = false;
while (flushingDWPT != null) {
hasEvents = true;
boolean success = false;
SegmentFlushTicket ticket = null;
try {
try {
ticket = ticketQueue.addFlushTicket(flushingDWPT);
final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
boolean dwptSuccess = false;
try {
final FlushedSegment newSegment = flushingDWPT.flush();
ticketQueue.addSegment(ticket, newSegment);
dwptSuccess = true;
} finally {
subtractFlushedNumDocs(flushingDocsInRam);
if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
hasEvents = true;
}
if (!dwptSuccess) {
putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
hasEvents = true;
}
}
success = true;
} finally {
if (!success && ticket != null) {
ticketQueue.markTicketFailed(ticket);
}
}
if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) {
putEvent(ForcedPurgeEvent.INSTANCE);
break;
}
} finally {
flushControl.doAfterFlush(flushingDWPT);
}
flushingDWPT = flushControl.nextPendingFlush();
}
if (hasEvents) {
putEvent(MergePendingEvent.INSTANCE);
}
final double ramBufferSizeMB = config.getRAMBufferSizeMB();
if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
hasEvents = true;
if (!this.applyAllDeletes(deleteQueue)) {
putEvent(ApplyDeletesEvent.INSTANCE);
}
}
return hasEvents;
}
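The parameter flushingDWPT is a DocumentsWriterPerThread, the per-thread writer that buffered the documents now being flushed. The loop flushes it, then asks flushControl.nextPendingFlush() for the next pending DWPT until none is left. If the number of outstanding tickets reaches the number of active thread states, it posts a ForcedPurgeEvent and leaves the loop early. After the loop, if auto flush is enabled and the RAM used by buffered deletes exceeds half of ramBufferSizeMB, applyAllDeletes is called, and an ApplyDeletesEvent is posted if it returns false.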
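The delete check at the end of doFlush is plain arithmetic. The sketch below reproduces only that condition. The class and method names are hypothetical, and DISABLE_AUTO_FLUSH is modeled as -1 on the assumption that this matches IndexWriterConfig.DISABLE_AUTO_FLUSH.

```java
/**
 * Hypothetical sketch of the condition at the end of doFlush: buffered deletes
 * are applied once they use more than half of the RAM buffer.
 */
final class DeleteRamCheck {
    // Assumed to match IndexWriterConfig.DISABLE_AUTO_FLUSH.
    static final double DISABLE_AUTO_FLUSH = -1;

    /** Threshold in bytes: 1024 * 1024 * ramBufferSizeMB / 2, as in the excerpt. */
    static double thresholdBytes(double ramBufferSizeMB) {
        return 1024 * 1024 * ramBufferSizeMB / 2;
    }

    /** True when doFlush would go on to call applyAllDeletes. */
    static boolean exceedsDeleteBudget(double ramBufferSizeMB, long deleteBytesUsed) {
        return ramBufferSizeMB != DISABLE_AUTO_FLUSH
                && deleteBytesUsed > thresholdBytes(ramBufferSizeMB);
    }

    public static void main(String[] args) {
        System.out.println(thresholdBytes(16.0));                    // 8388608.0
        System.out.println(exceedsDeleteBudget(16.0, 9_000_000L));   // true
        System.out.println(exceedsDeleteBudget(-1, Long.MAX_VALUE)); // false: auto flush disabled
    }
}
```

With a 16 MB RAM buffer, buffered deletes are applied once they use more than 8 MB.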
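ticketQueue is a DocumentsWriterFlushQueue, which coordinates multiple flushing threads so that the segments they produce are published in ticket order. Its addFlushTicket function is defined as follows:
DocumentsWriter::doFlush->DocumentsWriterFlushQueue::addFlushTicket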
synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
incTickets();
boolean success = false;
try {
final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
queue.add(ticket);
success = true;
return ticket;
} finally {
if (!success) {
decTickets();
}
}
}
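addFlushTicket first increments the ticket count with incTickets. It then calls prepareFlush, which, before the flush itself starts, handles the documents that have been marked for deletion. The function's main job is to create a SegmentFlushTicket and add it to the internal queue; if that fails, the finally block decrements the count again.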
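The point of the ticket queue is that several threads may flush at once while their segments are still handed on in the order the tickets were taken. A minimal, hypothetical sketch of that idea, not Lucene's DocumentsWriterFlushQueue: segments are plain strings, the ticket count is derived from the queue size instead of incTickets/decTickets, and purge is an illustrative stand-in for the queue's publishing step.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified flush-ticket queue. */
final class TicketQueueSketch {
    static final class Ticket {
        String segment;   // set once this ticket's flush has produced a segment
        boolean failed;   // set by markTicketFailed

        boolean canPublish() {
            return segment != null || failed;
        }
    }

    private final ArrayDeque<Ticket> queue = new ArrayDeque<>();

    /** Taken before the flush starts, so queue order equals flush start order. */
    synchronized Ticket addFlushTicket() {
        Ticket ticket = new Ticket();
        queue.add(ticket);
        return ticket;
    }

    synchronized void addSegment(Ticket ticket, String segment) {
        ticket.segment = segment;
    }

    synchronized void markTicketFailed(Ticket ticket) {
        ticket.failed = true;
    }

    synchronized int getTicketCount() {
        return queue.size();
    }

    /** Removes finished tickets from the head only; stops at the first unfinished one. */
    synchronized List<String> purge() {
        List<String> published = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().canPublish()) {
            Ticket head = queue.poll();
            if (!head.failed) {
                published.add(head.segment);
            }
        }
        return published;
    }

    public static void main(String[] args) {
        TicketQueueSketch q = new TicketQueueSketch();
        Ticket first = q.addFlushTicket();
        Ticket second = q.addFlushTicket();
        q.addSegment(second, "_1");      // the second flush happens to finish first
        System.out.println(q.purge());   // [] (the head ticket is still pending)
        q.addSegment(first, "_0");
        System.out.println(q.purge());   // [_0, _1]
    }
}
```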
Back in DocumentsWriter's doFlush, getNumDocsInRAM returns the number of documents buffered in memory, and then DocumentsWriterPerThread's flush function is called to continue:
DocumentsWriter::doFlush->DocumentsWriterPerThread::flush
FlushedSegment flush() throws IOException, AbortingException {
segmentInfo.setMaxDoc(numDocsInRAM);
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(), pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
final double startMBUsed = bytesUsed() / 1024. / 1024.;
if (pendingUpdates.docIDs.size() > 0) {
flushState.liveDocs = codec.liveDocsFormat().newLiveDocs(numDocsInRAM);
for(int delDocID : pendingUpdates.docIDs) {
flushState.liveDocs.clear(delDocID);
}
flushState.delCountOnFlush = pendingUpdates.docIDs.size();
pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.docIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
pendingUpdates.docIDs.clear();
}
if (aborted) {
return null;
}
long t0 = System.nanoTime();
try {
consumer.flush(flushState);
pendingUpdates.terms.clear();
segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));
final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, -1L, -1L, -1L);
final BufferedUpdates segmentDeletes;
if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
pendingUpdates.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingUpdates;
}
FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos, segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
sealFlushedSegment(fs);
return fs;
} catch (Throwable th) {
// exception handling omitted in this excerpt (the DWPT is aborted and the exception rethrown)
}
}
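In flush, pendingUpdates holds the IDs of documents to be deleted or updated. If there is at least one such document, those documents have to be marked. The codec here is a Lucene60Codec; following the calls shows that liveDocsFormat returns a Lucene50LiveDocsFormat, whose newLiveDocs creates a FixedBitSet of numDocsInRAM bits, all initially set (every document live). The loop then clears the bit of each deleted or updated document ID, and delCountOnFlush records how many there were. Further down, consumer is set to a DefaultIndexingChain when the DocumentsWriterPerThread is constructed; the rest of this chapter focuses on DefaultIndexingChain's flush function.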
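The liveDocs step is ordinary bit-set bookkeeping. In this sketch java.util.BitSet stands in for Lucene's FixedBitSet, and the class and method names are hypothetical.

```java
import java.util.BitSet;
import java.util.List;

/** Hypothetical sketch of marking deleted documents in a live-docs bit set. */
final class LiveDocsSketch {
    /** Bit i is set iff document i is still live. */
    static BitSet newLiveDocs(int maxDoc, List<Integer> deletedDocIDs) {
        BitSet live = new BitSet(maxDoc);
        live.set(0, maxDoc);          // every buffered document starts out live
        for (int docID : deletedDocIDs) {
            live.clear(docID);        // same role as flushState.liveDocs.clear(delDocID)
        }
        return live;
    }

    public static void main(String[] args) {
        BitSet live = newLiveDocs(5, java.util.Arrays.asList(1, 3));
        System.out.println(live);                     // {0, 2, 4}
        System.out.println(5 - live.cardinality());   // 2 deleted documents
    }
}
```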
DefaultIndexingChain's flush function
The code of DefaultIndexingChain's flush function is as follows:
DocumentsWriter::doFlush->DocumentsWriterPerThread::flush->DefaultIndexingChain::flush
public void flush(SegmentWriteState state) throws IOException, AbortingException {
int maxDoc = state.segmentInfo.maxDoc();
long t0 = System.nanoTime();
writeNorms(state);
t0 = System.nanoTime();
writeDocValues(state);
t0 = System.nanoTime();
writePoints(state);
t0 = System.nanoTime();
initStoredFieldsWriter();
fillStoredFields(maxDoc);
storedFieldsWriter.finish(state.fieldInfos, maxDoc);
storedFieldsWriter.close();
t0 = System.nanoTime();
Map<String,TermsHashPerField> fieldsToFlush = new HashMap<>();
for (int i=0;i<fieldHash.length;i++) {
PerField perField = fieldHash[i];
while (perField != null) {
if (perField.invertState != null) {
fieldsToFlush.put(perField.fieldInfo.name, perField.termsHashPerField);
}
perField = perField.next;
}
}
termsHash.flush(fieldsToFlush, state);
t0 = System.nanoTime();
docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
}
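The segmentInfo in state is the SegmentInfo created in the DocumentsWriterPerThread constructor; it holds the segment's information, and its maxDoc function returns the number of documents currently buffered in memory. flush then writes, in order, norms, doc values, points and stored fields, then the inverted index (termsHash.flush over every field that has an invertState), and finally the field infos (fieldInfosFormat().write). The first step, writeNorms, writes the norm information to the .nvm and .nvd files.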
DefaultIndexingChain's writeNorms function
DefaultIndexingChain::flush->DefaultIndexingChain::writeNorms
private void writeNorms(SegmentWriteState state) throws IOException {
boolean success = false;
NormsConsumer normsConsumer = null;
try {
if (state.fieldInfos.hasNorms()) {
NormsFormat normsFormat = state.segmentInfo.getCodec().normsFormat();
normsConsumer = normsFormat.normsConsumer(state);
for (FieldInfo fi : state.fieldInfos) {
PerField perField = getPerField(fi.name);
assert perField != null;
if (fi.omitsNorms() == false && fi.getIndexOptions() != IndexOptions.NONE) {
perField.norms.finish(state.segmentInfo.maxDoc());
perField.norms.flush(state, normsConsumer);
}
}
}
success = true;
} finally {
// resource cleanup omitted in this excerpt
}
}
Lucene60Codec's normsFormat ultimately returns a Lucene53NormsFormat, whose normsConsumer function returns a Lucene53NormsConsumer:
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer
public NormsConsumer normsConsumer(SegmentWriteState state) throws IOException {
return new Lucene53NormsConsumer(state, DATA_CODEC, DATA_EXTENSION, METADATA_CODEC, METADATA_EXTENSION);
}
Lucene53NormsConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
boolean success = false;
try {
String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
data = state.directory.createOutput(dataName, state.context);
CodecUtil.writeIndexHeader(data, dataCodec, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
meta = state.directory.createOutput(metaName, state.context);
CodecUtil.writeIndexHeader(meta, metaCodec, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
maxDoc = state.segmentInfo.maxDoc();
success = true;
} finally {
// resource cleanup omitted in this excerpt
}
}
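IndexFileNames.segmentFileName builds a file name from the segment name (e.g. _0), the segment suffix (empty here) and the extension (e.g. nvd), giving _0.nvd. The output stream is then created by the directory's createOutput. That directory is a TrackingDirectoryWrapper around an FSDirectory, so the calls are as follows: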
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer->TrackingDirectoryWrapper::createOutput
public IndexOutput createOutput(String name, IOContext context) throws IOException {
IndexOutput output = in.createOutput(name, context);
createdFileNames.add(name);
return output;
}
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
pendingDeletes.remove(name);
maybeDeletePendingFiles();
return new FSIndexOutput(name);
}
public FSIndexOutput(String name) throws IOException {
this(name, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
}
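TrackingDirectoryWrapper's createOutput delegates to the wrapped FSDirectory and then records the name in createdFileNames; this is the set that DocumentsWriterPerThread's flush later reads through getCreatedFiles() for segmentInfo.setFiles. FSDirectory's createOutput in turn creates and returns an FSIndexOutput for the file, opened with CREATE, TRUNCATE_EXISTING and WRITE.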
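TrackingDirectoryWrapper is a decorator: it forwards createOutput to the wrapped directory and remembers each created name. A minimal single-threaded sketch of that pattern; SimpleDirectory, InMemoryDirectory and TrackingDirectory are hypothetical names, not Lucene's API.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical minimal directory abstraction: only createOutput. */
interface SimpleDirectory {
    OutputStream createOutput(String name);
}

/** In-memory directory used only for the demo. */
final class InMemoryDirectory implements SimpleDirectory {
    final Map<String, ByteArrayOutputStream> files = new HashMap<>();

    @Override
    public OutputStream createOutput(String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        files.put(name, out);
        return out;
    }
}

/** Decorator in the spirit of TrackingDirectoryWrapper (not thread-safe). */
final class TrackingDirectory implements SimpleDirectory {
    private final SimpleDirectory in;
    private final Set<String> createdFileNames = new TreeSet<>();

    TrackingDirectory(SimpleDirectory in) {
        this.in = in;
    }

    @Override
    public OutputStream createOutput(String name) {
        OutputStream out = in.createOutput(name); // create first, as in the excerpt
        createdFileNames.add(name);               // recorded only if creation succeeded
        return out;
    }

    Set<String> getCreatedFiles() {
        return new TreeSet<>(createdFileNames);
    }
}

final class TrackingDirectoryDemo {
    public static void main(String[] args) {
        TrackingDirectory dir = new TrackingDirectory(new InMemoryDirectory());
        dir.createOutput("_0.nvd");
        dir.createOutput("_0.nvm");
        System.out.println(dir.getCreatedFiles()); // [_0.nvd, _0.nvm]
    }
}
```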
Back in the Lucene53NormsConsumer constructor, writeIndexHeader then writes header information to each file:
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer->CodecUtil::writeIndexHeader
public static void writeIndexHeader(DataOutput out, String codec, int version, byte[] id, String suffix) throws IOException {
writeHeader(out, codec, version);
out.writeBytes(id, 0, id.length);
BytesRef suffixBytes = new BytesRef(suffix);
out.writeByte((byte) suffixBytes.length);
out.writeBytes(suffixBytes.bytes, suffixBytes.offset, suffixBytes.length);
}
public static void writeHeader(DataOutput out, String codec, int version) throws IOException {
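BytesRef bytes = new BytesRef(codec);
// (the check that codec is simple ASCII shorter than 128 bytes, which uses bytes, is omitted in this excerpt)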
out.writeInt(CODEC_MAGIC);
out.writeString(codec);
out.writeInt(version);
}
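Together, writeHeader and writeIndexHeader produce a fixed layout: a 4-byte magic number, the codec name as a length-prefixed string, a 4-byte version, the segment id, a 1-byte suffix length and the suffix bytes. The sketch below rebuilds that layout with java.io classes. It assumes, as I understand Lucene 6, that CODEC_MAGIC is 0x3fd76c17, that ints are written big-endian, that writeString writes a VInt byte length followed by UTF-8 bytes, and that segment ids are 16 bytes. It is an illustration, not Lucene's CodecUtil, and "DemoCodec" is a made-up codec name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the index-header byte layout. */
final class IndexHeaderSketch {
    static final int CODEC_MAGIC = 0x3fd76c17; // assumed value of CodecUtil.CODEC_MAGIC

    /** Variable-length int: 7 bits per byte, low bits first, high bit = more bytes follow. */
    static void writeVInt(DataOutputStream out, int i) throws IOException {
        while ((i & ~0x7F) != 0) {
            out.writeByte((i & 0x7F) | 0x80);
            i >>>= 7;
        }
        out.writeByte(i);
    }

    static byte[] indexHeader(String codec, int version, byte[] id, String suffix) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes); // big-endian writeInt
            // writeHeader: magic, codec name (VInt length + UTF-8 bytes), version
            out.writeInt(CODEC_MAGIC);
            byte[] codecBytes = codec.getBytes(StandardCharsets.UTF_8);
            writeVInt(out, codecBytes.length);
            out.write(codecBytes);
            out.writeInt(version);
            // writeIndexHeader: segment id, then one length byte and the suffix bytes
            out.write(id);
            byte[] suffixBytes = suffix.getBytes(StandardCharsets.UTF_8);
            out.writeByte(suffixBytes.length);
            out.write(suffixBytes);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] header = indexHeader("DemoCodec", 1, new byte[16], "");
        System.out.println(header.length); // 35 = 4 + (1 + 9) + 4 + 16 + 1 + 0
    }
}
```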
Back in DefaultIndexingChain's writeNorms, getPerField looks up the PerField of each field; its FieldInfo holds the field's information. The code is:
DefaultIndexingChain::flush->writeNorms->getPerField
private PerField getPerField(String name) {
final int hashPos = name.hashCode() & hashMask;
PerField fp = fieldHash[hashPos];
while (fp != null && !fp.fieldInfo.name.equals(name)) {
fp = fp.next;
}
return fp;
}
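fieldHash is a hash table with separate chaining: name.hashCode() & hashMask selects a bucket (a cheap modulo, valid when the table length is a power of two), and the next links are followed until the name matches. A sketch with hypothetical class names; the add helper, which inserts at the head of a bucket, is an assumption for the demo, and rehashing is left out.

```java
/** Hypothetical sketch of a chained, power-of-two field hash. */
final class FieldHashSketch {
    static final class PerField {
        final String name;
        PerField next; // next entry in the same bucket

        PerField(String name, PerField next) {
            this.name = name;
            this.next = next;
        }
    }

    private final PerField[] fieldHash = new PerField[4]; // length must be a power of two
    private final int hashMask = fieldHash.length - 1;

    /** Same lookup as getPerField in the excerpt; null if the field is unknown. */
    PerField getPerField(String name) {
        int hashPos = name.hashCode() & hashMask;
        PerField fp = fieldHash[hashPos];
        while (fp != null && !fp.name.equals(name)) {
            fp = fp.next;
        }
        return fp;
    }

    /** Hypothetical helper: inserts at the head of the bucket, no rehashing. */
    PerField add(String name) {
        int hashPos = name.hashCode() & hashMask;
        PerField fp = new PerField(name, fieldHash[hashPos]);
        fieldHash[hashPos] = fp;
        return fp;
    }

    public static void main(String[] args) {
        FieldHashSketch h = new FieldHashSketch();
        h.add("Aa");
        h.add("BB"); // same hashCode as "Aa", so same bucket
        System.out.println(h.getPerField("Aa").name); // Aa
        System.out.println(h.getPerField("title"));   // null
    }
}
```

The strings "Aa" and "BB" have the same hashCode (2112), so they land in one bucket and the lookup for "Aa" has to walk past "BB".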
Next, PerField's norms member is created as a NormValuesWriter in the PerField constructor. Its finish function is empty, and its flush function is as follows:
DefaultIndexingChain::flush->writeNorms->NormValuesWriter::flush
public void flush(SegmentWriteState state, NormsConsumer normsConsumer) throws IOException {
final int maxDoc = state.segmentInfo.maxDoc();
final PackedLongValues values = pending.build();
normsConsumer.addNormsField(fieldInfo,
new Iterable<Number>() {
@Override
public Iterator<Number> iterator() {
return new NumericIterator(maxDoc, values);
}
});
}
Here normsConsumer is the Lucene53NormsConsumer, whose addNormsField function is:
DefaultIndexingChain::flush->writeNorms->NormValuesWriter::flush->Lucene53NormsConsumer::addNormsField
public void addNormsField(FieldInfo field, Iterable<Number> values) throws IOException {
meta.writeVInt(field.number);
long minValue = Long.MAX_VALUE;
long maxValue = Long.MIN_VALUE;
int count = 0;
for (Number nv : values) {
final long v = nv.longValue();
minValue = Math.min(minValue, v);
maxValue = Math.max(maxValue, v);
count++;
}
if (minValue == maxValue) {
addConstant(minValue);
} else if (minValue >= Byte.MIN_VALUE && maxValue <= Byte.MAX_VALUE) {
addByte1(values);
} else if (minValue >= Short.MIN_VALUE && maxValue <= Short.MAX_VALUE) {
addByte2(values);
} else if (minValue >= Integer.MIN_VALUE && maxValue <= Integer.MAX_VALUE) {
addByte4(values);
} else {
addByte8(values);
}
}
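addNormsField first writes the field number to the metadata stream, then scans all values for the minimum and maximum and picks the narrowest encoding that fits: a single constant if every value is equal (addConstant), otherwise 1, 2, 4 or 8 bytes per value (addByte1 through addByte8). Those functions write the data through the FSIndexOutput streams created earlier into the .nvd and .nvm files.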
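The width decision in addNormsField depends only on the min/max scan. A sketch of just that decision, with hypothetical names: it returns the number of bytes stored per document, and 0 for the constant case, where (as the addConstant branch suggests) no per-document values are needed.

```java
/** Hypothetical sketch of the width selection in addNormsField. */
final class NormsWidthSketch {
    static int bytesPerValue(long[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("need at least one value");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long v : values) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        if (min == max) return 0;                                            // addConstant
        if (min >= Byte.MIN_VALUE && max <= Byte.MAX_VALUE) return 1;        // addByte1
        if (min >= Short.MIN_VALUE && max <= Short.MAX_VALUE) return 2;      // addByte2
        if (min >= Integer.MIN_VALUE && max <= Integer.MAX_VALUE) return 4;  // addByte4
        return 8;                                                            // addByte8
    }

    public static void main(String[] args) {
        System.out.println(bytesPerValue(new long[] {5, 5, 5}));  // 0
        System.out.println(bytesPerValue(new long[] {-3, 100}));  // 1
        System.out.println(bytesPerValue(new long[] {0, 300}));   // 2
    }
}
```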
DefaultIndexingChain's writeDocValues function
Having covered writeNorms, we now look at writeDocValues:
DefaultIndexingChain::flush->writeDocValues
private void writeDocValues(SegmentWriteState state) throws IOException {
int maxDoc = state.segmentInfo.maxDoc();
DocValuesConsumer dvConsumer = null;
boolean success = false;
try {
for (int i=0;i<fieldHash.length;i++) {
PerField perField = fieldHash[i];
while (perField != null) {
if (perField.docValuesWriter != null) {
if (dvConsumer == null) {
DocValuesFormat fmt = state.segmentInfo.getCodec().docValuesFormat();
dvConsumer = fmt.fieldsConsumer(state);
}
perField.docValuesWriter.finish(maxDoc);
perField.docValuesWriter.flush(state, dvConsumer);
perField.docValuesWriter = null;
}
perField = perField.next;
}
}
success = true;
} finally {
// resource cleanup omitted in this excerpt
}
}
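As in writeNorms, writeDocValues walks over every PerField. Depending on the field's doc values type, a PerField's docValuesWriter is a NumericDocValuesWriter, BinaryDocValuesWriter, SortedDocValuesWriter, SortedNumericDocValuesWriter or SortedSetDocValuesWriter. It is created in indexDocValue while documents are being indexed (not during flush), as the following code shows:
DefaultIndexingChain::processField->indexDocValue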
private void indexDocValue(PerField fp, DocValuesType dvType, IndexableField field) throws IOException {
if (fp.fieldInfo.getDocValuesType() == DocValuesType.NONE) {
fieldInfos.globalFieldNumbers.setDocValuesType(fp.fieldInfo.number, fp.fieldInfo.name, dvType);
}
fp.fieldInfo.setDocValuesType(dvType);
int docID = docState.docID;
switch(dvType) {
case NUMERIC:
if (fp.docValuesWriter == null) {
fp.docValuesWriter = new NumericDocValuesWriter(fp.fieldInfo, bytesUsed);
}
((NumericDocValuesWriter) fp.docValuesWriter).addValue(docID, field.numericValue().longValue());
break;
case BINARY:
if (fp.docValuesWriter == null) {
fp.docValuesWriter = new BinaryDocValuesWriter(fp.fieldInfo, bytesUsed);
}
((BinaryDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
break;
case SORTED:
if (fp.docValuesWriter == null) {
fp.docValuesWriter = new SortedDocValuesWriter(fp.fieldInfo, bytesUsed);
}
((SortedDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
break;
case SORTED_NUMERIC:
if (fp.docValuesWriter == null) {
fp.docValuesWriter = new SortedNumericDocValuesWriter(fp.fieldInfo, bytesUsed);
}
((SortedNumericDocValuesWriter) fp.docValuesWriter).addValue(docID, field.numericValue().longValue());
break;
case SORTED_SET:
if (fp.docValuesWriter == null) {
fp.docValuesWriter = new SortedSetDocValuesWriter(fp.fieldInfo, bytesUsed);
}
((SortedSetDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
break;
default:
throw new AssertionError();
}
}
For ease of analysis, assume from here on that the docValuesWriter is a BinaryDocValuesWriter.
Back in writeDocValues, docValuesFormat returns a PerFieldDocValuesFormat, whose fieldsConsumer provides the DocValuesConsumer:
DefaultIndexingChain::flush->writeDocValues->PerFieldDocValuesFormat::fieldsConsumer
public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
return new FieldsWriter(state);
}
What fieldsConsumer returns is in fact a FieldsWriter.
Back in DefaultIndexingChain's writeDocValues, the flush function of the docValuesWriter, i.e. the BinaryDocValuesWriter assumed above, is called next:
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush
public void flush(SegmentWriteState state, DocValuesConsumer dvConsumer) throws IOException {
final int maxDoc = state.segmentInfo.maxDoc();
bytes.freeze(false);
final PackedLongValues lengths = this.lengths.build();
dvConsumer.addBinaryField(fieldInfo,
new Iterable<BytesRef>() {
@Override
public Iterator<BytesRef> iterator() {
return new BytesIterator(maxDoc, lengths);
}
});
}
BinaryDocValuesWriter's flush mainly calls FieldsWriter's addBinaryField, passing the field's buffered values as an Iterable (a BytesIterator over the maxDoc documents):
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField
public void addBinaryField(FieldInfo field, Iterable<BytesRef> values) throws IOException {
getInstance(field).addBinaryField(field, values);
}
addBinaryField first calls getInstance, which ultimately yields a Lucene54DocValuesConsumer:
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField->getInstance
private DocValuesConsumer getInstance(FieldInfo field) throws IOException {
DocValuesFormat format = null;
if (field.getDocValuesGen() != -1) {
final String formatName = field.getAttribute(PER_FIELD_FORMAT_KEY);
if (formatName != null) {
format = DocValuesFormat.forName(formatName);
}
}
if (format == null) {
format = getDocValuesFormatForField(field.name);
}
final String formatName = format.getName();
String previousValue = field.putAttribute(PER_FIELD_FORMAT_KEY, formatName);
Integer suffix = null;
ConsumerAndSuffix consumer = formats.get(format);
if (consumer == null) {
if (field.getDocValuesGen() != -1) {
final String suffixAtt = field.getAttribute(PER_FIELD_SUFFIX_KEY);
if (suffixAtt != null) {
suffix = Integer.valueOf(suffixAtt);
}
}
if (suffix == null) {
suffix = suffixes.get(formatName);
if (suffix == null) {
suffix = 0;
} else {
suffix = suffix + 1;
}
}
suffixes.put(formatName, suffix);
final String segmentSuffix = getFullSegmentSuffix(segmentWriteState.segmentSuffix,
getSuffix(formatName, Integer.toString(suffix)));
consumer = new ConsumerAndSuffix();
consumer.consumer = format.fieldsConsumer(new SegmentWriteState(segmentWriteState, segmentSuffix));
consumer.suffix = suffix;
formats.put(format, consumer);
} else {
suffix = consumer.suffix;
}
previousValue = field.putAttribute(PER_FIELD_SUFFIX_KEY, Integer.toString(suffix));
return consumer.consumer;
}
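Assuming this is the first call, getDocValuesFormatForField sets format to a Lucene54DocValuesFormat. Because formats has no consumer for it yet, getInstance gives it suffix 0 (the next consumer created for a format of the same name would get 1, and so on), folds the format name and suffix into the segment suffix, and calls Lucene54DocValuesFormat's fieldsConsumer with that state to construct a Lucene54DocValuesConsumer, which it caches in formats and returns. Later fields that use the same format reuse the cached consumer.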
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField->getInstance->Lucene54DocValuesFormat::fieldsConsumer
public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
return new Lucene54DocValuesConsumer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION);
}
public Lucene54DocValuesConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
boolean success = false;
try {
String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
data = state.directory.createOutput(dataName, state.context);
CodecUtil.writeIndexHeader(data, dataCodec, Lucene54DocValuesFormat.VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
meta = state.directory.createOutput(metaName, state.context);
CodecUtil.writeIndexHeader(meta, metaCodec, Lucene54DocValuesFormat.VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
maxDoc = state.segmentInfo.maxDoc();
success = true;
} finally {
// resource cleanup omitted in this excerpt
}
}
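As before, this creates the .dvd and .dvm output streams and writes their headers. Because the state passed in carries the segment suffix computed in getInstance, that suffix becomes part of the file names.
We won't follow Lucene54DocValuesConsumer's addBinaryField any further; it simply writes the corresponding data through these output streams.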
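The suffix logic in getInstance explains the doc values file names. The sketch below mirrors that allocation (keyed by format instance, as formats is) and rebuilds segmentFileName as described for the norms files. getFullSegmentSuffix and the formatName + "_" + suffix form of getSuffix are not shown in the excerpt; their bodies here are assumptions chosen to match the familiar _0_Lucene54_0.dvd naming. The docValuesGen branches are omitted and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-format suffix allocation and file naming. */
final class PerFieldSuffixSketch {
    /** Stand-in for a DocValuesFormat instance; identity-keyed like formats. */
    static final class Format {
        final String name;

        Format(String name) {
            this.name = name;
        }
    }

    private final Map<Format, String> segmentSuffixByFormat = new HashMap<>(); // role of formats
    private final Map<String, Integer> suffixes = new HashMap<>();             // last suffix per name

    String segmentSuffixFor(Format format, String outerSegmentSuffix) {
        String existing = segmentSuffixByFormat.get(format);
        if (existing != null) {
            return existing;                          // consumer already created: reuse it
        }
        Integer suffix = suffixes.get(format.name);
        suffix = (suffix == null) ? 0 : suffix + 1;   // 0 for the first instance, then 1, 2, ...
        suffixes.put(format.name, suffix);
        String full = getFullSegmentSuffix(outerSegmentSuffix, format.name + "_" + suffix);
        segmentSuffixByFormat.put(format, full);
        return full;
    }

    static String getFullSegmentSuffix(String outer, String inner) {
        return outer.isEmpty() ? inner : outer + "_" + inner;
    }

    static String segmentFileName(String segmentName, String segmentSuffix, String ext) {
        if (ext.isEmpty() && segmentSuffix.isEmpty()) {
            return segmentName;
        }
        StringBuilder sb = new StringBuilder(segmentName);
        if (!segmentSuffix.isEmpty()) {
            sb.append('_').append(segmentSuffix);
        }
        if (!ext.isEmpty()) {
            sb.append('.').append(ext);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        PerFieldSuffixSketch writer = new PerFieldSuffixSketch();
        String suffix = writer.segmentSuffixFor(new Format("Lucene54"), "");
        System.out.println(segmentFileName("_0", suffix, "dvd")); // _0_Lucene54_0.dvd
        System.out.println(segmentFileName("_0", "", "nvd"));     // _0.nvd
    }
}
```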
DefaultIndexingChain's writePoints function
DefaultIndexingChain::flush->writePoints
private void writePoints(SegmentWriteState state) throws IOException {
PointsWriter pointsWriter = null;
boolean success = false;
try {
for (int i=0;i<fieldHash.length;i++) {
PerField perField = fieldHash[i];
while (perField != null) {
if (perField.pointValuesWriter != null) {
if (pointsWriter == null) {
PointsFormat fmt = state.segmentInfo.getCodec().pointsFormat();
pointsWriter = fmt.fieldsWriter(state);
}
perField.pointValuesWriter.flush(state, pointsWriter);
perField.pointValuesWriter = null;
}
perField = perField.next;
}
}
if (pointsWriter != null) {
pointsWriter.finish();
}
success = true;
} finally {
// resource cleanup omitted in this excerpt
}
}
As before, pointsFormat in writePoints ultimately returns a Lucene60PointsFormat, whose fieldsWriter function provides a Lucene60PointsWriter:
DefaultIndexingChain::writePoints->Lucene60PointsFormat::fieldsWriter
public PointsWriter fieldsWriter(SegmentWriteState state) throws IOException {
return new Lucene60PointsWriter(state);
}
PerField's pointValuesWriter member is a PointValuesWriter; its flush function is as follows:
DefaultIndexingChain::writePoints->PointValuesWriter::flush
public void flush(SegmentWriteState state, PointsWriter writer) throws IOException {
writer.writeField(fieldInfo,
new PointsReader() {
@Override
public void intersect(String fieldName, IntersectVisitor visitor) throws IOException {
if (fieldName.equals(fieldInfo.name) == false) {
throw new IllegalArgumentException();
}
for(int i=0;i<numPoints;i++) {
bytes.readBytes(packedValue.length * i, packedValue, 0, packedValue.length);
visitor.visit(docIDs[i], packedValue);
}
}
@Override
public void checkIntegrity() {
throw new UnsupportedOperationException();
}
@Override
public long ramBytesUsed() {
return 0L;
}
@Override
public void close() {
}
@Override
public byte[] getMinPackedValue(String fieldName) {
throw new UnsupportedOperationException();
}
@Override
public byte[] getMaxPackedValue(String fieldName) {
throw new UnsupportedOperationException();
}
@Override
public int getNumDimensions(String fieldName) {
throw new UnsupportedOperationException();
}
@Override
public int getBytesPerDimension(String fieldName) {
throw new UnsupportedOperationException();
}
@Override
public long size(String fieldName) {
return numPoints;
}
@Override
public int getDocCount(String fieldName) {
return numDocs;
}
});
}
PointValuesWriter's flush then calls Lucene60PointsWriter's writeField, shown below:
DefaultIndexingChain::writePoints->PointValuesWriter::flush->Lucene60PointsWriter::writeField
public void writeField(FieldInfo fieldInfo, PointsReader values) throws IOException {
boolean singleValuePerDoc = values.size(fieldInfo.name) == values.getDocCount(fieldInfo.name);
try (BKDWriter writer = new BKDWriter(writeState.segmentInfo.maxDoc(),
writeState.directory,
writeState.segmentInfo.name,
fieldInfo.getPointDimensionCount(),
fieldInfo.getPointNumBytes(),
maxPointsInLeafNode,
maxMBSortInHeap,
values.size(fieldInfo.name),
singleValuePerDoc)) {
values.intersect(fieldInfo.name, new IntersectVisitor() {
@Override
public void visit(int docID) {
throw new IllegalStateException();
}
public void visit(int docID, byte[] packedValue) throws IOException {
writer.add(packedValue, docID);
}
@Override
public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
return Relation.CELL_CROSSES_QUERY;
}
});
if (writer.getPointCount() > 0) {
indexFPs.put(fieldInfo.name, writer.finish(dataOut));
}
}
}
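Putting together the PointsReader defined in PointValuesWriter's flush and the IntersectVisitor defined in writeField: writeField calls values.intersect, which reads every buffered point and passes it to visitor.visit(docID, packedValue), and that visit calls BKDWriter's add. compare simply returns CELL_CROSSES_QUERY, and this intersect never calls it, so every buffered point is visited. BKD (a block k-d tree) is the data structure used to index points. add is defined as follows: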
public void add(byte[] packedValue, int docID) throws IOException {
if (pointCount >= maxPointsSortInHeap) {
if (offlinePointWriter == null) {
spillToOffline();
}
offlinePointWriter.append(packedValue, pointCount, docID);
} else {
heapPointWriter.append(packedValue, pointCount, docID);
}
if (pointCount == 0) {
System.arraycopy(packedValue, 0, minPackedValue, 0, packedBytesLength);
System.arraycopy(packedValue, 0, maxPackedValue, 0, packedBytesLength);
} else {
for(int dim=0;dim<numDims;dim++) {
int offset = dim*bytesPerDim;
if (StringHelper.compare(bytesPerDim, packedValue, offset, minPackedValue, offset) < 0) {
System.arraycopy(packedValue, offset, minPackedValue, offset, bytesPerDim);
}
if (StringHelper.compare(bytesPerDim, packedValue, offset, maxPackedValue, offset) > 0) {
System.arraycopy(packedValue, offset, maxPackedValue, offset, bytesPerDim);
}
}
}
pointCount++;
docsSeen.set(docID);
}
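The second half of add keeps a per-dimension bounding box over the packed values, and the detail worth noticing is that bytes are compared as unsigned values. As far as I know, Lucene's point encodings are designed so that this unsigned byte order matches the numeric order of the values. A sketch of that logic with hypothetical names, where compareUnsigned is written out by hand to play the role of StringHelper.compare:

```java
import java.util.Arrays;

/** Hypothetical sketch of per-dimension min/max tracking over packed points. */
final class PackedBoundsSketch {
    final int numDims;
    final int bytesPerDim;
    final byte[] minPackedValue;
    final byte[] maxPackedValue;
    int pointCount;

    PackedBoundsSketch(int numDims, int bytesPerDim) {
        this.numDims = numDims;
        this.bytesPerDim = bytesPerDim;
        this.minPackedValue = new byte[numDims * bytesPerDim];
        this.maxPackedValue = new byte[numDims * bytesPerDim];
    }

    /** Compares len bytes as unsigned values. */
    static int compareUnsigned(int len, byte[] a, int aOffset, byte[] b, int bOffset) {
        for (int i = 0; i < len; i++) {
            int cmp = (a[aOffset + i] & 0xff) - (b[bOffset + i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    void add(byte[] packedValue) {
        if (pointCount == 0) {
            // the first point is both the min and the max in every dimension
            System.arraycopy(packedValue, 0, minPackedValue, 0, minPackedValue.length);
            System.arraycopy(packedValue, 0, maxPackedValue, 0, maxPackedValue.length);
        } else {
            for (int dim = 0; dim < numDims; dim++) {
                int offset = dim * bytesPerDim;
                if (compareUnsigned(bytesPerDim, packedValue, offset, minPackedValue, offset) < 0) {
                    System.arraycopy(packedValue, offset, minPackedValue, offset, bytesPerDim);
                }
                if (compareUnsigned(bytesPerDim, packedValue, offset, maxPackedValue, offset) > 0) {
                    System.arraycopy(packedValue, offset, maxPackedValue, offset, bytesPerDim);
                }
            }
        }
        pointCount++;
    }

    public static void main(String[] args) {
        PackedBoundsSketch b = new PackedBoundsSketch(2, 1);
        b.add(new byte[] {10, (byte) 200});
        b.add(new byte[] {5, (byte) 250});
        b.add(new byte[] {(byte) 128, 100});
        System.out.println(Arrays.toString(b.minPackedValue)); // [5, 100]
        System.out.println(Arrays.toString(b.maxPackedValue)); // [-128, -6] (unsigned 128, 250)
    }
}
```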
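The member heapPointWriter is a HeapPointWriter, which buffers points in memory; offlinePointWriter is an OfflinePointWriter, which writes them to disk. Points first go into memory through the HeapPointWriter; once pointCount reaches maxPointsSortInHeap, add calls spillToOffline to switch over: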
private void spillToOffline() throws IOException {
offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill", 0, singleValuePerDoc);
tempInput = offlinePointWriter.out;
PointReader reader = heapPointWriter.getReader(0, pointCount);
for(int i=0;i<pointCount;i++) {
boolean hasNext = reader.next();
offlinePointWriter.append(reader.packedValue(), i, heapPointWriter.docIDs[i]);
}
heapPointWriter = null;
}
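OfflinePointWriter's constructor opens an output stream on a temporary file whose name is built from the segment name, bkd_spill and a temp-file counter, with the .tmp extension. append then copies the points out of the HeapPointWriter, which is dropped afterwards (heapPointWriter = null).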
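The heap-to-disk switch in add and spillToOffline can be sketched as follows. This is a simplification under stated assumptions: the class name, temp-file prefix and record layout (packed bytes followed by a 4-byte docID) are hypothetical, and the real append also receives each point's ordinal (pointCount), which this sketch drops.

```java
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch: buffer points on the heap, then spill everything to a temp file. */
final class SpillingPointBuffer implements Closeable {
    private final int maxPointsInHeap;
    private byte[][] heapValues;        // role of HeapPointWriter
    private int[] heapDocIDs;
    private Path tempFile;              // created on first spill (role of OfflinePointWriter)
    private DataOutputStream offline;
    private int pointCount;

    SpillingPointBuffer(int maxPointsInHeap) {
        this.maxPointsInHeap = maxPointsInHeap;
        this.heapValues = new byte[maxPointsInHeap][];
        this.heapDocIDs = new int[maxPointsInHeap];
    }

    void add(byte[] packedValue, int docID) {
        if (pointCount >= maxPointsInHeap) {
            if (offline == null) {
                spillToOffline();
            }
            writeOffline(packedValue, docID);
        } else {
            heapValues[pointCount] = packedValue.clone();
            heapDocIDs[pointCount] = docID;
        }
        pointCount++;
    }

    /** Copies every buffered point to the temp file and drops the heap buffer. */
    private void spillToOffline() {
        try {
            tempFile = Files.createTempFile("bkd_spill_", ".tmp"); // hypothetical naming
            offline = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(tempFile)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (int i = 0; i < pointCount; i++) {
            writeOffline(heapValues[i], heapDocIDs[i]);
        }
        heapValues = null; // like heapPointWriter = null
        heapDocIDs = null;
    }

    private void writeOffline(byte[] packedValue, int docID) {
        try {
            offline.write(packedValue);
            offline.writeInt(docID);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isSpilled() {
        return offline != null;
    }

    /** Bytes written to the temp file so far (0 before spilling). */
    long offlineBytes() {
        if (offline == null) {
            return 0;
        }
        try {
            offline.flush();
            return Files.size(tempFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            if (offline != null) {
                offline.close();
                Files.deleteIfExists(tempFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a limit of 2 points and 4-byte values, the third add triggers the spill, and the temp file then holds 3 records of 8 bytes each.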
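Back in DefaultIndexingChain's writePoints, pointsWriter.finish() completes the files. Each field's BKD tree was already written to the .dim data file by writer.finish(dataOut) in writeField, which returned a file pointer that was stored in indexFPs. finish writes the footer of the .dim file and then creates the index file named by INDEX_EXTENSION (.dii), which records the number of fields and, for each one, its field number and that file pointer: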
public void finish() throws IOException {
finished = true;
CodecUtil.writeFooter(dataOut);
String indexFileName = IndexFileNames.segmentFileName(writeState.segmentInfo.name,
writeState.segmentSuffix,
Lucene60PointsFormat.INDEX_EXTENSION);
try (IndexOutput indexOut = writeState.directory.createOutput(indexFileName, writeState.context)) {
CodecUtil.writeIndexHeader(indexOut,
Lucene60PointsFormat.META_CODEC_NAME,
Lucene60PointsFormat.INDEX_VERSION_CURRENT,
writeState.segmentInfo.getId(),
writeState.segmentSuffix);
int count = indexFPs.size();
indexOut.writeVInt(count);
for(Map.Entry<String,Long> ent : indexFPs.entrySet()) {
FieldInfo fieldInfo = writeState.fieldInfos.fieldInfo(ent.getKey());
indexOut.writeVInt(fieldInfo.number);
indexOut.writeVLong(ent.getValue());
}
CodecUtil.writeFooter(indexOut);
}
}
Writing data to the .fdt and .fdx files
Continuing with DefaultIndexingChain's flush, initStoredFieldsWriter initializes a StoredFieldsWriter:
DefaultIndexingChain::flush->initStoredFieldsWriter
private void initStoredFieldsWriter() throws IOException {
if (storedFieldsWriter == null) {
storedFieldsWriter = docWriter.codec.storedFieldsFormat().fieldsWriter(docWriter.directory, docWriter.getSegmentInfo(), IOContext.DEFAULT);
}
}
storedFieldsFormat returns a Lucene50StoredFieldsFormat, whose fieldsWriter calls CompressingStoredFieldsFormat's fieldsWriter, which finally returns a CompressingStoredFieldsWriter:
DefaultIndexingChain::flush->initStoredFieldsWriter->Lucene60Codec::storedFieldsFormat->Lucene50StoredFieldsFormat::fieldsWriter->CompressingStoredFieldsFormat::fieldsWriter
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si,
IOContext context) throws IOException {
return new CompressingStoredFieldsWriter(directory, si, segmentSuffix, context,
formatName, compressionMode, chunkSize, maxDocsPerChunk, blockSize);
}
The CompressingStoredFieldsWriter constructor is shown below:
DefaultIndexingChain::flush->initStoredFieldsWriter->Lucene60Codec::storedFieldsFormat->Lucene50StoredFieldsFormat::fieldsWriter->CompressingStoredFieldsFormat::fieldsWriter->CompressingStoredFieldsWriter::CompressingStoredFieldsWriter
public CompressingStoredFieldsWriter(Directory directory, SegmentInfo si, String segmentSuffix, IOContext context, String formatName, CompressionMode compressionMode, int chunkSize, int maxDocsPerChunk, int blockSize) throws IOException {
assert directory != null;
this.segment = si.name;
this.compressionMode = compressionMode;
this.compressor = compressionMode.newCompressor();
this.chunkSize = chunkSize;
this.maxDocsPerChunk = maxDocsPerChunk;
this.docBase = 0;
this.bufferedDocs = new GrowableByteArrayDataOutput(chunkSize);
this.numStoredFields = new int[16];
this.endOffsets = new int[16];
this.numBufferedDocs = 0;
boolean success = false;
IndexOutput indexStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_INDEX_EXTENSION), context);
try {
fieldsStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_EXTENSION),context);
final String codecNameIdx = formatName + CODEC_SFX_IDX;
final String codecNameDat = formatName + CODEC_SFX_DAT;
CodecUtil.writeIndexHeader(indexStream, codecNameIdx, VERSION_CURRENT, si.getId(), segmentSuffix);
CodecUtil.writeIndexHeader(fieldsStream, codecNameDat, VERSION_CURRENT, si.getId(), segmentSuffix);
indexWriter = new CompressingStoredFieldsIndexWriter(indexStream, blockSize);
indexStream = null;
fieldsStream.writeVInt(chunkSize);
fieldsStream.writeVInt(PackedInts.VERSION_CURRENT);
success = true;
} finally {
// resource cleanup omitted in this excerpt
}
}
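The CompressingStoredFieldsWriter constructor creates the .fdx (FIELDS_INDEX_EXTENSION) and .fdt (FIELDS_EXTENSION) files, opens an output stream on each and writes their index headers. The .fdx stream is wrapped in a CompressingStoredFieldsIndexWriter, and chunkSize and PackedInts.VERSION_CURRENT are written to the .fdt stream.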
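The constructor's chunkSize, maxDocsPerChunk, bufferedDocs and numBufferedDocs fields point to how stored fields are written: serialized documents accumulate in memory and are compressed one chunk at a time. A minimal sketch of that idea, assuming a chunk is flushed once the buffer holds at least chunkSize bytes or maxDocsPerChunk documents. java.util.zip.Deflater stands in for Lucene's own CompressionMode, and addDocument and compressedChunks are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;

/** Hypothetical sketch of chunked, compressed stored-field buffering. */
final class ChunkedStoredFieldsSketch {
    private final int chunkSize;
    private final int maxDocsPerChunk;
    private final ByteArrayOutputStream bufferedDocs = new ByteArrayOutputStream();
    private int numBufferedDocs;
    /** Stands in for the .fdt output: one compressed byte[] per chunk. */
    final List<byte[]> compressedChunks = new ArrayList<>();

    ChunkedStoredFieldsSketch(int chunkSize, int maxDocsPerChunk) {
        this.chunkSize = chunkSize;
        this.maxDocsPerChunk = maxDocsPerChunk;
    }

    /** Buffers one document's serialized stored fields, flushing a chunk when full. */
    void addDocument(byte[] serializedFields) {
        bufferedDocs.write(serializedFields, 0, serializedFields.length);
        numBufferedDocs++;
        if (bufferedDocs.size() >= chunkSize || numBufferedDocs >= maxDocsPerChunk) {
            flushChunk();
        }
    }

    /** Compresses everything buffered so far as one chunk and resets the buffer. */
    private void flushChunk() {
        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        deflater.setInput(bufferedDocs.toByteArray());
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        compressedChunks.add(out.toByteArray());
        bufferedDocs.reset();
        numBufferedDocs = 0;
    }

    /** Writes out a final partial chunk, if any. */
    void finish() {
        if (numBufferedDocs > 0) {
            flushChunk();
        }
    }

    public static void main(String[] args) {
        ChunkedStoredFieldsSketch w = new ChunkedStoredFieldsSketch(16 * 1024, 3);
        for (int i = 0; i < 7; i++) {
            w.addDocument(("doc-" + i).getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(w.compressedChunks.size()); // 2 (docs 0-2 and 3-5)
        w.finish();
        System.out.println(w.compressedChunks.size()); // 3 (doc 6 flushed by finish)
    }
}
```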
Back in DefaultIndexingChain's flush, fillStoredFields is called next; it calls startStoredFields and finishStoredFields. startStoredFields calls the startDocument function of the CompressingStoredFieldsWriter constructed above, which is empty; finishStoredFields calls CompressingStoredFieldsWriter's finishDocument, whose code is:
DefaultIndexingChain::flush->fillStoredFields->finishStoredFields->CompressingStoredFieldsWriter::finishDocument