
Lucene source code analysis 5

Lucene source code analysis: flush

The flush operation has come up repeatedly in the previous chapters: once the index data has been accumulated in various in-memory structures, it is written out to files at an appropriate time through the flush functions. This chapter analyzes that flush path, starting from the doFlush function of DocumentsWriter.
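
Before stepping into the source, it helps to recall when a flush is actually triggered from user code: either automatically, once the buffered documents exceed the configured RAM budget, or explicitly through a commit. The sketch below is only illustrative (class name and index path are made up), but it is the entry point whose internals this chapter follows.

  import java.nio.file.Paths;

  import org.apache.lucene.analysis.standard.StandardAnalyzer;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field;
  import org.apache.lucene.document.TextField;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.IndexWriterConfig;
  import org.apache.lucene.store.FSDirectory;

  public class FlushDemo {
    public static void main(String[] args) throws Exception {
      IndexWriterConfig cfg = new IndexWriterConfig(new StandardAnalyzer());
      // flush automatically once the buffered documents use roughly 16 MB of RAM
      cfg.setRAMBufferSizeMB(16.0);
      try (FSDirectory dir = FSDirectory.open(Paths.get("/tmp/demo-index"));
           IndexWriter writer = new IndexWriter(dir, cfg)) {
        Document doc = new Document();
        doc.add(new TextField("body", "hello lucene flush", Field.Store.YES));
        writer.addDocument(doc);
        // commit() forces the buffered documents to be flushed into segment files
        // (the path analyzed below) and then makes them durable
        writer.commit();
      }
    }
  }
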
DocumentsWriter::doFlush

  private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
    boolean hasEvents = false;
    while (flushingDWPT != null) {
      hasEvents = true;
      boolean success = false;
      SegmentFlushTicket ticket = null;
      try {
        try {
          ticket = ticketQueue.addFlushTicket(flushingDWPT);
          final int flushingDocsInRam = flushingDWPT.getNumDocsInRAM();
          boolean dwptSuccess = false;
          try {
            final FlushedSegment newSegment = flushingDWPT.flush();
            ticketQueue.addSegment(ticket, newSegment);
            dwptSuccess = true;
          } finally {
            subtractFlushedNumDocs(flushingDocsInRam);
            if (!flushingDWPT.pendingFilesToDelete().isEmpty()) {
              putEvent(new DeleteNewFilesEvent(flushingDWPT.pendingFilesToDelete()));
              hasEvents = true;
            }
            if (!dwptSuccess) {
              putEvent(new FlushFailedEvent(flushingDWPT.getSegmentInfo()));
              hasEvents = true;
            }
          }
          success = true;
        } finally {
          if (!success && ticket != null) {
            ticketQueue.markTicketFailed(ticket);
          }
        }
        if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()) {
          putEvent(ForcedPurgeEvent.INSTANCE);
          break;
        }
      } finally {
        flushControl.doAfterFlush(flushingDWPT);
      }
      flushingDWPT = flushControl.nextPendingFlush();
    }
    if (hasEvents) {
      putEvent(MergePendingEvent.INSTANCE);
    }
    final double ramBufferSizeMB = config.getRAMBufferSizeMB();
    if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
        flushControl.getDeleteBytesUsed() > (1024*1024*ramBufferSizeMB/2)) {
      hasEvents = true;
      if (!this.applyAllDeletes(deleteQueue)) {
        putEvent(ApplyDeletesEvent.INSTANCE);
      }
    }
    return hasEvents;
  }

The parameter flushingDWPT is of type DocumentsWriterPerThread and represents a thread state that indexes documents.
ticketQueue is a DocumentsWriterFlushQueue, used to coordinate multiple concurrent flushes; its addFlushTicket function is defined as follows.
DocumentsWriter::doFlush->DocumentsWriterFlushQueue::addFlushTicket

  synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
    incTickets();
    boolean success = false;
    try {
      final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
      queue.add(ticket);
      success = true;
      return ticket;
    } finally {
      if (!success) {
        decTickets();
      }
    }
  }

The addFlushTicket function first increments the ticket count via incTickets. The prepareFlush call applies the deletes buffered for documents that were marked before the flush actually starts. The function then creates a SegmentFlushTicket and appends it to the internal queue.
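
The purpose of the ticket queue is ordering: flushes may finish out of order, but their segments must be published in the order the flushes started. The following simplified sketch is not Lucene's DocumentsWriterFlushQueue; it only illustrates that idea with a plain ArrayDeque.

  import java.util.ArrayDeque;
  import java.util.Deque;
  import java.util.function.Consumer;

  // Simplified illustration of the ticket idea: tickets are handed out in flush-start
  // order; a ticket can only be published once it reaches the head of the queue and
  // its segment is ready, so segments become visible in flush order.
  class SimpleFlushQueue {
    static final class Ticket {
      volatile Object segment;   // null until the flush finishes
      volatile boolean failed;
    }

    private final Deque<Ticket> queue = new ArrayDeque<>();

    synchronized Ticket addTicket() {                        // called when a flush starts
      Ticket t = new Ticket();
      queue.addLast(t);
      return t;
    }

    synchronized void setSegment(Ticket t, Object segment) { // the flush finished
      t.segment = segment;
    }

    synchronized void markFailed(Ticket t) {
      t.failed = true;
    }

    // Publish all completed tickets from the head; a still-running flush at the head
    // blocks later (already finished) flushes from being published too early.
    synchronized void purge(Consumer<Object> publish) {
      while (!queue.isEmpty()) {
        Ticket head = queue.peekFirst();
        if (head.segment == null && !head.failed) {
          break;                                             // head flush not done yet
        }
        queue.pollFirst();
        if (!head.failed) {
          publish.accept(head.segment);
        }
      }
    }
  }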

Back in DocumentsWriter's doFlush, the function next obtains the number of documents currently held in memory via getNumDocsInRAM and then calls the flush function of DocumentsWriterPerThread to carry on.
DocumentsWriter::doFlush->DocumentsWriterPerThread::flush

  FlushedSegment flush() throws IOException, AbortingException {
    segmentInfo.setMaxDoc(numDocsInRAM);
    final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(), pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
    final double startMBUsed = bytesUsed() / 1024. / 1024.;

    if (pendingUpdates.docIDs.size() > 0) {
      flushState.liveDocs = codec.liveDocsFormat().newLiveDocs(numDocsInRAM);
      for(int delDocID : pendingUpdates.docIDs) {
        flushState.liveDocs.clear(delDocID);
      }
      flushState.delCountOnFlush = pendingUpdates.docIDs.size();
      pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.docIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
      pendingUpdates.docIDs.clear();
    }

    if (aborted) {
      return null;
    }

    long t0 = System.nanoTime();

    try {
      consumer.flush(flushState);
      pendingUpdates.terms.clear();
      segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));

      final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, -1L, -1L, -1L);

      final BufferedUpdates segmentDeletes;
      if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
        pendingUpdates.clear();
        segmentDeletes = null;
      } else {
        segmentDeletes = pendingUpdates;
      }

      FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos, segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
      sealFlushedSegment(fs);

      return fs;
    } catch (Throwable th) {
      // exception handling is elided in this excerpt; the original code aborts
      // the DWPT and rethrows the failure wrapped in an AbortingException
      abort();
      throw AbortingException.wrap(th);
    }
  }

In flush, pendingUpdates holds the IDs of documents that are pending deletion or update. If there are any such documents, they have to be marked: codec here is a Lucene60Codec, and following it down, liveDocsFormat returns a Lucene50LiveDocsFormat whose newLiveDocs function creates a FixedBitSet used to mark the deleted or updated document IDs. Further down, consumer was set to a DefaultIndexingChain in the constructor, so the next focus is the flush function of DefaultIndexingChain.
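
The liveDocs structure is just a bit set with one bit per document, where a cleared bit means the document is deleted. A minimal sketch using Lucene's FixedBitSet directly (the same class newLiveDocs creates here) shows the marking step:

  import org.apache.lucene.util.FixedBitSet;

  public class LiveDocsDemo {
    public static void main(String[] args) {
      int numDocsInRAM = 8;
      // start with every document "live" (all bits set) ...
      FixedBitSet liveDocs = new FixedBitSet(numDocsInRAM);
      liveDocs.set(0, numDocsInRAM);
      // ... then clear the bits of the docIDs buffered in pendingUpdates.docIDs
      int[] deletedDocIDs = {2, 5};
      for (int delDocID : deletedDocIDs) {
        liveDocs.clear(delDocID);
      }
      System.out.println("live docs: " + liveDocs.cardinality()); // prints 6
      System.out.println("doc 5 live? " + liveDocs.get(5));       // prints false
    }
  }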

The flush function of DefaultIndexingChain

The code of DefaultIndexingChain's flush function is shown below.
DocumentsWriter::doFlush->DocumentsWriterPerThread::flush->DefaultIndexingChain::flush

  public void flush(SegmentWriteState state) throws IOException, AbortingException {

    int maxDoc = state.segmentInfo.maxDoc();
    long t0 = System.nanoTime();
    writeNorms(state);

    t0 = System.nanoTime();
    writeDocValues(state);

    t0 = System.nanoTime();
    writePoints(state);

    t0 = System.nanoTime();
    initStoredFieldsWriter();
    fillStoredFields(maxDoc);
    storedFieldsWriter.finish(state.fieldInfos, maxDoc);
    storedFieldsWriter.close();

    t0 = System.nanoTime();
    Map<String,TermsHashPerField> fieldsToFlush = new HashMap<>();
    for (int i=0;i<fieldHash.length;i++) {
      PerField perField = fieldHash[i];
      while (perField != null) {
        if (perField.invertState != null) {
          fieldsToFlush.put(perField.fieldInfo.name, perField.termsHashPerField);
        }
        perField = perField.next;
      }
    }

    termsHash.flush(fieldsToFlush, state);

    t0 = System.nanoTime();
    docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
  }

segmentInfo inside the state parameter is the SegmentInfo created in the DocumentsWriterPerThread constructor and holds the segment's metadata; maxDoc returns the number of documents currently in memory. DefaultIndexingChain's flush then uses writeNorms to write the norm data into the .nvm and .nvd files.
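
Whether writeNorms has anything to do depends on the fields being indexed: norms are kept only for indexed fields that do not omit them. A small illustrative sketch (field name and options are made up) of opting a field out of norms:

  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field;
  import org.apache.lucene.document.FieldType;
  import org.apache.lucene.index.IndexOptions;

  public class NormsDemo {
    public static void main(String[] args) {
      // a field type that is indexed but opts out of norms; such a field is
      // skipped by writeNorms because fi.omitsNorms() is true
      FieldType noNorms = new FieldType();
      noNorms.setIndexOptions(IndexOptions.DOCS_AND_FREQS);
      noNorms.setTokenized(true);
      noNorms.setOmitNorms(true);
      noNorms.freeze();

      Document doc = new Document();
      doc.add(new Field("tags", "lucene flush norms", noNorms));
      // fields built from TextField, by contrast, keep norms and end up
      // in the .nvd/.nvm files written below
    }
  }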

The writeNorms function of DefaultIndexingChain

DefaultIndexingChain::flush->DefaultIndexingChain::writeNorms

  private void writeNorms(SegmentWriteState state) throws IOException {
    boolean success = false;
    NormsConsumer normsConsumer = null;
    try {
      if (state.fieldInfos.hasNorms()) {
        NormsFormat normsFormat = state.segmentInfo.getCodec().normsFormat();
        normsConsumer = normsFormat.normsConsumer(state);

        for (FieldInfo fi : state.fieldInfos) {
          PerField perField = getPerField(fi.name);
          assert perField != null;

          if (fi.omitsNorms() == false && fi.getIndexOptions() != IndexOptions.NONE) {
            perField.norms.finish(state.segmentInfo.maxDoc());
            perField.norms.flush(state, normsConsumer);
          }
        }
      }
      success = true;
    } finally {

    }
  }

Lucene60Codec's normsFormat function ultimately returns a Lucene53NormsFormat, whose normsConsumer function returns a Lucene53NormsConsumer.
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer

  public NormsConsumer normsConsumer(SegmentWriteState state) throws IOException {
    return new Lucene53NormsConsumer(state, DATA_CODEC, DATA_EXTENSION, METADATA_CODEC, METADATA_EXTENSION);
  }

  Lucene53NormsConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
    boolean success = false;
    try {
      String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
      data = state.directory.createOutput(dataName, state.context);
      CodecUtil.writeIndexHeader(data, dataCodec, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
      meta = state.directory.createOutput(metaName, state.context);
      CodecUtil.writeIndexHeader(meta, metaCodec, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      maxDoc = state.segmentInfo.maxDoc();
      success = true;
    } finally {

    }
  }

IndexFileNames' segmentFileName function builds a file name such as _0.nvd from the segment name passed in (for example _0) and the extension (for example nvd). The output stream is then created through FSDirectory's createOutput; the code is shown below.
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer->TrackingDirectoryWrapper::createOutput

  // TrackingDirectoryWrapper::createOutput records the new file name, then delegates
  public IndexOutput createOutput(String name, IOContext context) throws IOException {
    IndexOutput output = in.createOutput(name, context);
    createdFileNames.add(name);
    return output;
  }

  // FSDirectory::createOutput
  public IndexOutput createOutput(String name, IOContext context) throws IOException {
    ensureOpen();

    pendingDeletes.remove(name);
    maybeDeletePendingFiles();
    return new FSIndexOutput(name);
  }

  // FSDirectory.FSIndexOutput constructor
  public FSIndexOutput(String name) throws IOException {
    this(name, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
  }

Internally, createOutput creates and returns an FSIndexOutput for the given file name.
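
To make the two steps concrete, the short sketch below (using a hypothetical temp directory and the nvd extension as an example) shows what segmentFileName produces and how a codec component obtains its IndexOutput from the Directory, just as Lucene53NormsConsumer does:

  import java.nio.file.Files;

  import org.apache.lucene.index.IndexFileNames;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;
  import org.apache.lucene.store.IOContext;
  import org.apache.lucene.store.IndexOutput;

  public class CreateOutputDemo {
    public static void main(String[] args) throws Exception {
      // segmentFileName("_0", "", "nvd") -> "_0.nvd"
      String dataName = IndexFileNames.segmentFileName("_0", "", "nvd");
      System.out.println(dataName);

      try (Directory dir = FSDirectory.open(Files.createTempDirectory("demo"))) {
        // the codec writes through an IndexOutput obtained from the Directory,
        // just like state.directory.createOutput above
        try (IndexOutput out = dir.createOutput(dataName, IOContext.DEFAULT)) {
          out.writeInt(0x3fd76c17); // CodecUtil.CODEC_MAGIC, written by writeHeader
        }
      }
    }
  }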

Back in the Lucene53NormsConsumer constructor reached from Lucene53NormsFormat's normsConsumer, writeIndexHeader is then used to write the header information into each file.
DefaultIndexingChain::flush->writeNorms->Lucene53NormsFormat::normsConsumer->CodecUtil::writeIndexHeader

  public static void writeIndexHeader(DataOutput out, String codec, int version, byte[] id, String suffix) throws IOException {
    writeHeader(out, codec, version);
    out.writeBytes(id, 0, id.length);
    BytesRef suffixBytes = new BytesRef(suffix);
    out.writeByte((byte) suffixBytes.length);
    out.writeBytes(suffixBytes.bytes, suffixBytes.offset, suffixBytes.length);
  }

  public static void writeHeader(DataOutput out, String codec, int version) throws IOException {
    BytesRef bytes = new BytesRef(codec);
    out.writeInt(CODEC_MAGIC);
    out.writeString(codec);
    out.writeInt(version);
  }
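
For completeness, the read side uses the symmetric CodecUtil.checkIndexHeader to validate the magic value, codec name, version range, segment id and suffix written here. The following self-contained sketch (codec name and file name are made up) writes a header and reads it back:

  import java.nio.file.Files;

  import org.apache.lucene.codecs.CodecUtil;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;
  import org.apache.lucene.store.IOContext;
  import org.apache.lucene.store.IndexInput;
  import org.apache.lucene.store.IndexOutput;
  import org.apache.lucene.util.StringHelper;

  public class IndexHeaderDemo {
    public static void main(String[] args) throws Exception {
      byte[] segmentId = StringHelper.randomId();   // a 16-byte id, like SegmentInfo ids
      try (Directory dir = FSDirectory.open(Files.createTempDirectory("hdr"))) {
        try (IndexOutput out = dir.createOutput("_0.demo", IOContext.DEFAULT)) {
          CodecUtil.writeIndexHeader(out, "DemoCodec", 1, segmentId, "");
          CodecUtil.writeFooter(out);               // checksum footer used by real codec files
        }
        try (IndexInput in = dir.openInput("_0.demo", IOContext.DEFAULT)) {
          // validates magic, codec name, version range, segment id and suffix
          int version = CodecUtil.checkIndexHeader(in, "DemoCodec", 1, 1, segmentId, "");
          System.out.println("header version = " + version);
        }
      }
    }
  }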

Back in DefaultIndexingChain's writeNorms, getPerField is called next to obtain the PerField whose FieldInfo holds the field's metadata; the code is shown below.
DefaultIndexingChain::flush->writeNorms->getPerField

  private PerField getPerField(String name) {
    final int hashPos = name.hashCode() & hashMask;
    PerField fp = fieldHash[hashPos];
    while (fp != null && !fp.fieldInfo.name.equals(name)) {
      fp = fp.next;
    }
    return fp;
  }

Continuing on, the member variable norms of PerField is set to a NormValuesWriter in the PerField constructor; its finish is an empty function, and its flush function is shown below.
DefaultIndexingChain::flush->writeNorms->NormValuesWriter::flush

  public void flush(SegmentWriteState state, NormsConsumer normsConsumer) throws IOException {

    final int maxDoc = state.segmentInfo.maxDoc();
    final PackedLongValues values = pending.build();

    normsConsumer.addNormsField(fieldInfo,
                               new Iterable<Number>() {
                                 @Override
                                 public Iterator<Number> iterator() {
                                   return new NumericIterator(maxDoc, values);
                                 }
                               });
  }

The normsConsumer here is the Lucene53NormsConsumer, and its addNormsField function is shown below.
DefaultIndexingChain::flush->writeNorms->NormValuesWriter::flush->Lucene53NormsConsumer::addNormsField

  public void addNormsField(FieldInfo field, Iterable<Number> values) throws IOException {
    meta.writeVInt(field.number);
    long minValue = Long.MAX_VALUE;
    long maxValue = Long.MIN_VALUE;
    int count = 0;

    for (Number nv : values) {
      final long v = nv.longValue();
      minValue = Math.min(minValue, v);
      maxValue = Math.max(maxValue, v);
      count++;
    }

    if (minValue == maxValue) {
      addConstant(minValue);
    } else if (minValue >= Byte.MIN_VALUE && maxValue <= Byte.MAX_VALUE) {
      addByte1(values);
    } else if (minValue >= Short.MIN_VALUE && maxValue <= Short.MAX_VALUE) {
      addByte2(values);
    } else if (minValue >= Integer.MIN_VALUE && maxValue <= Integer.MAX_VALUE) {
      addByte4(values);
    } else {
      addByte8(values);
    }
  }

Further down, this function writes the norm values for the field through the FSIndexOutput streams created earlier into the .nvd and .nvm files.

The writeDocValues function of DefaultIndexingChain

Having covered writeNorms, the next function to look at is writeDocValues.
DefaultIndexingChain::flush->writeDocValues

  private void writeDocValues(SegmentWriteState state) throws IOException {
    int maxDoc = state.segmentInfo.maxDoc();
    DocValuesConsumer dvConsumer = null;
    boolean success = false;
    try {
      for (int i=0;i<fieldHash.length;i++) {
        PerField perField = fieldHash[i];
        while (perField != null) {
          if (perField.docValuesWriter != null) {
            if (dvConsumer == null) {
              DocValuesFormat fmt = state.segmentInfo.getCodec().docValuesFormat();
              dvConsumer = fmt.fieldsConsumer(state);
            }
            perField.docValuesWriter.finish(maxDoc);
            perField.docValuesWriter.flush(state, dvConsumer);
            perField.docValuesWriter = null;
          } 
          perField = perField.next;
        }
      }
      success = true;
    } finally {

    }
  }

Similar to the analysis of writeNorms above, writeDocValues iterates over every PerField. Depending on the doc-values type of the field, the docValuesWriter in PerField is a NumericDocValuesWriter, BinaryDocValuesWriter, SortedDocValuesWriter, SortedNumericDocValuesWriter or SortedSetDocValuesWriter, as the following code shows.
DefaultIndexingChain::flush->indexDocValue

  private void indexDocValue(PerField fp, DocValuesType dvType, IndexableField field) throws IOException {

    if (fp.fieldInfo.getDocValuesType() == DocValuesType.NONE) {
      fieldInfos.globalFieldNumbers.setDocValuesType(fp.fieldInfo.number, fp.fieldInfo.name, dvType);
    }
    fp.fieldInfo.setDocValuesType(dvType);
    int docID = docState.docID;

    switch(dvType) {

      case NUMERIC:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new NumericDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((NumericDocValuesWriter) fp.docValuesWriter).addValue(docID, field.numericValue().longValue());
        break;

      case BINARY:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new BinaryDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((BinaryDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
        break;

      case SORTED:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new SortedDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((SortedDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
        break;

      case SORTED_NUMERIC:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new SortedNumericDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((SortedNumericDocValuesWriter) fp.docValuesWriter).addValue(docID, field.numericValue().longValue());
        break;

      case SORTED_SET:
        if (fp.docValuesWriter == null) {
          fp.docValuesWriter = new SortedSetDocValuesWriter(fp.fieldInfo, bytesUsed);
        }
        ((SortedSetDocValuesWriter) fp.docValuesWriter).addValue(docID, field.binaryValue());
        break;

      default:
        throw new AssertionError();
    }
  }

To keep the analysis concrete, assume from here on that the docValuesWriter in PerField is a BinaryDocValuesWriter.
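
Which of these writers a field ends up with is decided entirely by the doc-values field type used at indexing time. A small illustrative sketch (field names are made up):

  import org.apache.lucene.document.BinaryDocValuesField;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.NumericDocValuesField;
  import org.apache.lucene.document.SortedDocValuesField;
  import org.apache.lucene.document.SortedNumericDocValuesField;
  import org.apache.lucene.document.SortedSetDocValuesField;
  import org.apache.lucene.util.BytesRef;

  public class DocValuesFieldsDemo {
    public static void main(String[] args) {
      Document doc = new Document();
      // NUMERIC        -> NumericDocValuesWriter
      doc.add(new NumericDocValuesField("price", 42L));
      // BINARY         -> BinaryDocValuesWriter (the case assumed in this article)
      doc.add(new BinaryDocValuesField("payload", new BytesRef("raw bytes")));
      // SORTED         -> SortedDocValuesWriter
      doc.add(new SortedDocValuesField("category", new BytesRef("books")));
      // SORTED_NUMERIC -> SortedNumericDocValuesWriter
      doc.add(new SortedNumericDocValuesField("scores", 7L));
      // SORTED_SET     -> SortedSetDocValuesWriter
      doc.add(new SortedSetDocValuesField("tags", new BytesRef("lucene")));
    }
  }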

Back in writeDocValues, docValuesFormat returns a PerFieldDocValuesFormat, and its fieldsConsumer function is used to obtain a DocValuesConsumer.
DefaultIndexingChain::flush->writeDocValues->PerFieldDocValuesFormat::fieldsConsumer

  public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
    return new FieldsWriter(state);
  }

What fieldsConsumer actually returns is a FieldsWriter.

Back in DefaultIndexingChain's writeDocValues, the next call is the flush function of the docValuesWriter, i.e. the BinaryDocValuesWriter assumed above.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush

  public void flush(SegmentWriteState state, DocValuesConsumer dvConsumer) throws IOException {
    final int maxDoc = state.segmentInfo.maxDoc();
    bytes.freeze(false);
    final PackedLongValues lengths = this.lengths.build();
    dvConsumer.addBinaryField(fieldInfo,
                              new Iterable<BytesRef>() {
                                @Override
                                public Iterator<BytesRef> iterator() {
                                   return new BytesIterator(maxDoc, lengths);
                                }
                              });
  }

BinaryDocValuesWriter's flush mainly calls FieldsWriter's addBinaryField function to add the field's binary values.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField

    public void addBinaryField(FieldInfo field, Iterable<BytesRef> values) throws IOException {
      getInstance(field).addBinaryField(field, values);
    }

addBinaryField first obtains a Lucene54DocValuesConsumer through the getInstance function.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField->getInstance

    private DocValuesConsumer getInstance(FieldInfo field) throws IOException {
      DocValuesFormat format = null;
      if (field.getDocValuesGen() != -1) {
        final String formatName = field.getAttribute(PER_FIELD_FORMAT_KEY);
        if (formatName != null) {
          format = DocValuesFormat.forName(formatName);
        }
      }
      if (format == null) {
        format = getDocValuesFormatForField(field.name);
      }

      final String formatName = format.getName();

      String previousValue = field.putAttribute(PER_FIELD_FORMAT_KEY, formatName);

      Integer suffix = null;

      ConsumerAndSuffix consumer = formats.get(format);
      if (consumer == null) {
        if (field.getDocValuesGen() != -1) {
          final String suffixAtt = field.getAttribute(PER_FIELD_SUFFIX_KEY);
          if (suffixAtt != null) {
            suffix = Integer.valueOf(suffixAtt);
          }
        }

        if (suffix == null) {
          suffix = suffixes.get(formatName);
          if (suffix == null) {
            suffix = 0;
          } else {
            suffix = suffix + 1;
          }
        }
        suffixes.put(formatName, suffix);

        final String segmentSuffix = getFullSegmentSuffix(segmentWriteState.segmentSuffix,
                                                          getSuffix(formatName, Integer.toString(suffix)));
        consumer = new ConsumerAndSuffix();
        consumer.consumer = format.fieldsConsumer(new SegmentWriteState(segmentWriteState, segmentSuffix));
        consumer.suffix = suffix;
        formats.put(format, consumer);
      } else {
        suffix = consumer.suffix;
      }

      previousValue = field.putAttribute(PER_FIELD_SUFFIX_KEY, Integer.toString(suffix));

      return consumer.consumer;
    }

Assuming this is the first time the function is entered, format is resolved to a Lucene54DocValuesFormat via getDocValuesFormatForField, and Lucene54DocValuesFormat's fieldsConsumer function then constructs and returns a Lucene54DocValuesConsumer.
DefaultIndexingChain::flush->writeDocValues->BinaryDocValuesWriter::flush->FieldsWriter::addBinaryField->getInstance->Lucene54DocValuesFormat::fieldsConsumer

  public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
    return new Lucene54DocValuesConsumer(state, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION);
  }

  public Lucene54DocValuesConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
    boolean success = false;
    try {
      String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
      data = state.directory.createOutput(dataName, state.context);
      CodecUtil.writeIndexHeader(data, dataCodec, Lucene54DocValuesFormat.VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      String metaName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, metaExtension);
      meta = state.directory.createOutput(metaName, state.context);
      CodecUtil.writeIndexHeader(meta, metaCodec, Lucene54DocValuesFormat.VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
      maxDoc = state.segmentInfo.maxDoc();
      success = true;
    } finally {

    }
  }

Similar to the earlier analysis, this creates the output streams for the .dvd and .dvm files and writes the corresponding header information.
Lucene54DocValuesConsumer's addBinaryField function is not followed any further here; it simply writes the corresponding data through these file output streams.
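
The whole point of the per-field indirection in getInstance is that individual fields may use different doc-values formats. As a hedged sketch, the codec's getDocValuesFormatForField can be overridden; this assumes the Lucene 6.0 Lucene60Codec used in this article and that the "Direct" format from the lucene-codecs module is on the classpath.

  import org.apache.lucene.codecs.DocValuesFormat;
  import org.apache.lucene.codecs.lucene60.Lucene60Codec;

  // Hypothetical codec: route one field to a different doc-values format while
  // all other fields keep the default Lucene54 format.
  public class PerFieldDocValuesDemoCodec extends Lucene60Codec {
    private final DocValuesFormat direct = DocValuesFormat.forName("Direct");

    @Override
    public DocValuesFormat getDocValuesFormatForField(String field) {
      if ("popularity".equals(field)) {
        return direct;   // assumption: the "Direct" format is available via SPI
      }
      return super.getDocValuesFormatForField(field);
    }
  }

  // usage (sketch): indexWriterConfig.setCodec(new PerFieldDocValuesDemoCodec());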

The writePoints function of DefaultIndexingChain

DefaultIndexingChain::flush->writePoints

  private void writePoints(SegmentWriteState state) throws IOException {
    PointsWriter pointsWriter = null;
    boolean success = false;
    try {
      for (int i=0;i<fieldHash.length;i++) {
        PerField perField = fieldHash[i];
        while (perField != null) {
          if (perField.pointValuesWriter != null) {
            if (pointsWriter == null) {
              PointsFormat fmt = state.segmentInfo.getCodec().pointsFormat();
              pointsWriter = fmt.fieldsWriter(state);
            }

            perField.pointValuesWriter.flush(state, pointsWriter);
            perField.pointValuesWriter = null;
          } 
          perField = perField.next;
        }
      }
      if (pointsWriter != null) {
        pointsWriter.finish();
      }
      success = true;
    } finally {

    }
  }

As in the earlier analysis, pointsFormat in writePoints ultimately returns a Lucene60PointsFormat, and its fieldsWriter function is used to obtain a Lucene60PointsWriter.

DefaultIndexingChain::writePoints->Lucene60PointsFormat::fieldsWriter

  public PointsWriter fieldsWriter(SegmentWriteState state) throws IOException {
    return new Lucene60PointsWriter(state);
  }
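
A pointValuesWriter only exists for fields that were indexed as points in the first place. A small illustrative sketch (field names are made up) of the kind of fields that flow into this path:

  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.DoublePoint;
  import org.apache.lucene.document.IntPoint;
  import org.apache.lucene.document.LongPoint;

  public class PointFieldsDemo {
    public static void main(String[] args) {
      Document doc = new Document();
      // one-dimensional points: numDims = 1, bytesPerDim = 4 or 8
      doc.add(new IntPoint("year", 2016));
      doc.add(new LongPoint("timestamp", 1461234567890L));
      // two-dimensional point: numDims = 2, e.g. latitude/longitude
      doc.add(new DoublePoint("location", 40.7128, -74.0060));
      // at flush time each such field gets a PointValuesWriter, and
      // Lucene60PointsWriter/BKDWriter turn the values into the points files
    }
  }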

The member variable pointValuesWriter in PerField is set to a PointValuesWriter, whose flush function is shown below.
DefaultIndexingChain::writePoints->PointValuesWriter::flush

  public void flush(SegmentWriteState state, PointsWriter writer) throws IOException {

    writer.writeField(fieldInfo,
                      new PointsReader() {
                        @Override
                        public void intersect(String fieldName, IntersectVisitor visitor) throws IOException {
                          if (fieldName.equals(fieldInfo.name) == false) {
                            throw new IllegalArgumentException();
                          }
                          for(int i=0;i<numPoints;i++) {
                            bytes.readBytes(packedValue.length * i, packedValue, 0, packedValue.length);
                            visitor.visit(docIDs[i], packedValue);
                          }
                        }

                        @Override
                        public void checkIntegrity() {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public long ramBytesUsed() {
                          return 0L;
                        }

                        @Override
                        public void close() {
                        }

                        @Override
                        public byte[] getMinPackedValue(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public byte[] getMaxPackedValue(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public int getNumDimensions(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public int getBytesPerDimension(String fieldName) {
                          throw new UnsupportedOperationException();
                        }

                        @Override
                        public long size(String fieldName) {
                          return numPoints;
                        }

                        @Override
                        public int getDocCount(String fieldName) {
                          return numDocs;
                        }
                      });
  }

PointValuesWriter's flush in turn calls Lucene60PointsWriter's writeField function, shown below.
DefaultIndexingChain::writePoints->PointValuesWriter::flush->Lucene60PointsWriter::writeField

  public void writeField(FieldInfo fieldInfo, PointsReader values) throws IOException {

    boolean singleValuePerDoc = values.size(fieldInfo.name) == values.getDocCount(fieldInfo.name);

    try (BKDWriter writer = new BKDWriter(writeState.segmentInfo.maxDoc(),
                                          writeState.directory,
                                          writeState.segmentInfo.name,
                                          fieldInfo.getPointDimensionCount(),
                                          fieldInfo.getPointNumBytes(),
                                          maxPointsInLeafNode,
                                          maxMBSortInHeap,
                                          values.size(fieldInfo.name),
                                          singleValuePerDoc)) {

      values.intersect(fieldInfo.name, new IntersectVisitor() {
          @Override
          public void visit(int docID) {
            throw new IllegalStateException();
          }

          public void visit(int docID, byte[] packedValue) throws IOException {
            writer.add(packedValue, docID);
          }

          @Override
          public Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
            return Relation.CELL_CROSSES_QUERY;
          }
        });

      if (writer.getPointCount() > 0) {
        indexFPs.put(fieldInfo.name, writer.finish(dataOut));
      }
    }
  }

Combining the PointsReader defined in PointValuesWriter's flush with the visit callback defined in Lucene60PointsWriter's writeField, writeField ends up calling BKDWriter's add function. A BKD tree is the data structure Lucene uses to index point values; the add function is defined as follows.

  public void add(byte[] packedValue, int docID) throws IOException {

    if (pointCount >= maxPointsSortInHeap) {
      if (offlinePointWriter == null) {
        spillToOffline();
      }
      offlinePointWriter.append(packedValue, pointCount, docID);
    } else {
      heapPointWriter.append(packedValue, pointCount, docID);
    }

    if (pointCount == 0) {
      System.arraycopy(packedValue, 0, minPackedValue, 0, packedBytesLength);
      System.arraycopy(packedValue, 0, maxPackedValue, 0, packedBytesLength);
    } else {
      for(int dim=0;dim<numDims;dim++) {
        int offset = dim*bytesPerDim;
        if (StringHelper.compare(bytesPerDim, packedValue, offset, minPackedValue, offset) < 0) {
          System.arraycopy(packedValue, offset, minPackedValue, offset, bytesPerDim);
        }
        if (StringHelper.compare(bytesPerDim, packedValue, offset, maxPackedValue, offset) > 0) {
          System.arraycopy(packedValue, offset, maxPackedValue, offset, bytesPerDim);
        }
      }
    }

    pointCount++;
    docsSeen.set(docID);
  }

The member variable heapPointWriter is a HeapPointWriter, which buffers the points in memory; offlinePointWriter is an OfflinePointWriter, which writes them to disk. Initially the points are buffered in memory through the HeapPointWriter; once the number of buffered points exceeds maxPointsSortInHeap, spillToOffline is called to switch over.

  private void spillToOffline() throws IOException {

    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill", 0, singleValuePerDoc);
    tempInput = offlinePointWriter.out;
    PointReader reader = heapPointWriter.getReader(0, pointCount);
    for(int i=0;i<pointCount;i++) {
      boolean hasNext = reader.next();
      offlinePointWriter.append(reader.packedValue(), i, heapPointWriter.docIDs[i]);
    }

    heapPointWriter = null;
  }

The OfflinePointWriter constructor creates an output stream for a temporary file whose name looks like "<segment name>bkd_spill<temp file count>.tmp", and the append calls then copy the data over from the HeapPointWriter.
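
The following is a deliberately simplified, hypothetical sketch of that buffering strategy (it is not HeapPointWriter or OfflinePointWriter): points stay on the heap until a threshold is crossed, then everything is spilled to a temporary file and later points are appended there directly.

  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.util.ArrayList;
  import java.util.List;

  // Simplified illustration of the spill pattern used by BKDWriter.add.
  class SpillingPointBuffer {
    private final int maxPointsInHeap;
    private final List<byte[]> heapValues = new ArrayList<>();
    private final List<Integer> heapDocIDs = new ArrayList<>();
    private DataOutputStream spill;      // null until the heap threshold is crossed
    private Path spillFile;

    SpillingPointBuffer(int maxPointsInHeap) {
      this.maxPointsInHeap = maxPointsInHeap;
    }

    void add(byte[] packedValue, int docID) throws IOException {
      if (spill == null && heapValues.size() < maxPointsInHeap) {
        heapValues.add(packedValue.clone());   // cheap: stays in memory
        heapDocIDs.add(docID);
        return;
      }
      if (spill == null) {                     // first overflow: copy heap content to a temp file
        spillFile = Files.createTempFile("demo_bkd_spill", ".tmp");
        spill = new DataOutputStream(Files.newOutputStream(spillFile));
        for (int i = 0; i < heapValues.size(); i++) {
          writeRecord(heapValues.get(i), heapDocIDs.get(i));
        }
        heapValues.clear();
        heapDocIDs.clear();
      }
      writeRecord(packedValue, docID);         // everything after the spill goes to disk
    }

    private void writeRecord(byte[] packedValue, int docID) throws IOException {
      spill.writeInt(packedValue.length);
      spill.write(packedValue);
      spill.writeInt(docID);
    }

    void close() throws IOException {
      if (spill != null) {
        spill.close();
      }
    }
  }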

Back in DefaultIndexingChain's writePoints, Lucene60PointsWriter's finish function is then called, which finalizes the point data file and writes the accompanying points index file; the code is shown below.

  public void finish() throws IOException {
    finished = true;
    CodecUtil.writeFooter(dataOut);

    String indexFileName = IndexFileNames.segmentFileName(writeState.segmentInfo.name,
                                                          writeState.segmentSuffix,
                                                          Lucene60PointsFormat.INDEX_EXTENSION);
    try (IndexOutput indexOut = writeState.directory.createOutput(indexFileName, writeState.context)) {
      CodecUtil.writeIndexHeader(indexOut,
                                 Lucene60PointsFormat.META_CODEC_NAME,
                                 Lucene60PointsFormat.INDEX_VERSION_CURRENT,
                                 writeState.segmentInfo.getId(),
                                 writeState.segmentSuffix);
      int count = indexFPs.size();
      indexOut.writeVInt(count);
      for(Map.Entry<String,Long> ent : indexFPs.entrySet()) {
        FieldInfo fieldInfo = writeState.fieldInfos.fieldInfo(ent.getKey());
        indexOut.writeVInt(fieldInfo.number);
        indexOut.writeVLong(ent.getValue());
      }
      CodecUtil.writeFooter(indexOut);
    }
  }

Writing data into the .fdt and .fdx files

Continuing with DefaultIndexingChain's flush function, initStoredFieldsWriter is called next to initialize a StoredFieldsWriter; the code is shown below.
DefaultIndexingChain::flush->initStoredFieldsWriter

  private void initStoredFieldsWriter() throws IOException {
    if (storedFieldsWriter == null) {
      storedFieldsWriter = docWriter.codec.storedFieldsFormat().fieldsWriter(docWriter.directory, docWriter.getSegmentInfo(), IOContext.DEFAULT);
    }
  }

The storedFieldsFormat function returns a Lucene50StoredFieldsFormat; its fieldsWriter function in turn calls CompressingStoredFieldsFormat's fieldsWriter, which finally returns a CompressingStoredFieldsWriter.
DefaultIndexingChain::flush->initStoredFieldsWriter->Lucene60Codec::storedFieldsFormat->Lucene50StoredFieldsFormat::fieldsWriter->CompressingStoredFieldsFormat::fieldsWriter

  public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si,
      IOContext context) throws IOException {
    return new CompressingStoredFieldsWriter(directory, si, segmentSuffix, context,
        formatName, compressionMode, chunkSize, maxDocsPerChunk, blockSize);
  }

The CompressingStoredFieldsWriter constructor is shown below.
DefaultIndexingChain::flush->initStoredFieldsWriter->Lucene60Codec::storedFieldsFormat->Lucene50StoredFieldsFormat::fieldsWriter->CompressingStoredFieldsFormat::fieldsWriter->CompressingStoredFieldsWriter::CompressingStoredFieldsWriter

  public CompressingStoredFieldsWriter(Directory directory, SegmentInfo si, String segmentSuffix, IOContext context, String formatName, CompressionMode compressionMode, int chunkSize, int maxDocsPerChunk, int blockSize) throws IOException {
    assert directory != null;
    this.segment = si.name;
    this.compressionMode = compressionMode;
    this.compressor = compressionMode.newCompressor();
    this.chunkSize = chunkSize;
    this.maxDocsPerChunk = maxDocsPerChunk;
    this.docBase = 0;
    this.bufferedDocs = new GrowableByteArrayDataOutput(chunkSize);
    this.numStoredFields = new int[16];
    this.endOffsets = new int[16];
    this.numBufferedDocs = 0;

    boolean success = false;
    IndexOutput indexStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_INDEX_EXTENSION), context);
    try {
      fieldsStream = directory.createOutput(IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_EXTENSION),context);

      final String codecNameIdx = formatName + CODEC_SFX_IDX;
      final String codecNameDat = formatName + CODEC_SFX_DAT;
      CodecUtil.writeIndexHeader(indexStream, codecNameIdx, VERSION_CURRENT, si.getId(), segmentSuffix);
      CodecUtil.writeIndexHeader(fieldsStream, codecNameDat, VERSION_CURRENT, si.getId(), segmentSuffix);

      indexWriter = new CompressingStoredFieldsIndexWriter(indexStream, blockSize);
      indexStream = null;

      fieldsStream.writeVInt(chunkSize);
      fieldsStream.writeVInt(PackedInts.VERSION_CURRENT);

      success = true;
    } finally {

    }
  }

The CompressingStoredFieldsWriter constructor creates the .fdt and .fdx files and opens output streams for them.
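
To see what actually lands in the .fdt/.fdx pair, the sketch below (index path and field names are made up) stores a couple of field values and reads them back through the stored-fields files after the flush performed by commit:

  import java.nio.file.Files;

  import org.apache.lucene.analysis.standard.StandardAnalyzer;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.document.Field;
  import org.apache.lucene.document.StoredField;
  import org.apache.lucene.document.StringField;
  import org.apache.lucene.index.DirectoryReader;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.IndexWriterConfig;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.store.FSDirectory;

  public class StoredFieldsDemo {
    public static void main(String[] args) throws Exception {
      try (Directory dir = FSDirectory.open(Files.createTempDirectory("stored"))) {
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
          Document doc = new Document();
          doc.add(new StringField("id", "doc-1", Field.Store.YES));
          doc.add(new StoredField("price", 9.99f));   // stored only, not indexed
          writer.addDocument(doc);
          writer.commit();   // the flush writes these values into _N.fdt/_N.fdx
        }
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
          Document stored = reader.document(0);       // read back from the stored fields files
          System.out.println(stored.get("id") + " " + stored.getField("price").numericValue());
        }
      }
    }
  }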

Back in DefaultIndexingChain's flush function, fillStoredFields is called next, which in turn calls startStoredFields and finishStoredFields. startStoredFields invokes the startDocument function of the CompressingStoredFieldsWriter constructed above, which is empty; finishStoredFields invokes CompressingStoredFieldsWriter's finishDocument function, shown below.
DefaultIndexingChain::flush->fillStoredFields->finishStoredFields->CompressingStoredFieldsWriter::finishDocument