1. 程式人生 > >Lucene學習總結之四:Lucene索引過程分析

Lucene學習總結之四:Lucene索引過程分析

3、將文件加入IndexWriter

程式碼:

writer.addDocument(doc); 
-->IndexWriter.addDocument(Document doc, Analyzer analyzer) 
     -->doFlush = docWriter.addDocument(doc, analyzer); 
          --> DocumentsWriter.updateDocument(Document, Analyzer, Term) 
注:--> 代表一級函式呼叫

IndexWriter繼而呼叫DocumentsWriter.addDocument,其又呼叫DocumentsWriter.updateDocument。

4、將文件加入DocumentsWriter

程式碼:

DocumentsWriter.updateDocument(Document doc, Analyzer analyzer, Term delTerm) 
-->(1) DocumentsWriterThreadState state = getThreadState(doc, delTerm); 
-->(2) DocWriter perDoc = state.consumer.processDocument(); 
-->(3) finishDocument(state, perDoc);

DocumentsWriter物件主要包含以下幾部分:

  • 用於寫索引檔案
    • IndexWriter writer;
    • Directory directory;
    • Similarity similarity:分詞器
    • String segment:當前的段名,每當flush的時候,將索引寫入以此為名稱的段。
IndexWriter.doFlushInternal() 
--> String segment = docWriter.getSegment();//return segment 
--> newSegment = new SegmentInfo(segment,……); 
--> docWriter.createCompoundFile(segment);//根據segment建立cfs檔案。
  •  
    • String docStoreSegment:儲存域所要寫入的目標段。(在索引檔案格式一文中已經詳細描述)
    • int docStoreOffset:儲存域在目標段中的偏移量。
    • int nextDocID:下一篇新增到此索引的文件ID號,對於同一個索引資料夾,此變數唯一,且同步訪問。
    • DocConsumer consumer; 這是整個索引過程的核心,是IndexChain整個索引鏈的源頭。

基本索引鏈:

對於一篇文件的索引過程,不是由一個物件來完成的,而是用物件組合的方式形成的一個處理鏈,鏈上的每個物件僅僅處理索引過程的一部分,稱為索引鏈,由於後面還有其他的索引鏈,所以此處的索引鏈我稱為基本索引鏈。

DocConsumer consumer 型別為DocFieldProcessor,是整個索引鏈的源頭,包含如下部分:

  • 對索引域的處理
    • DocFieldConsumer consumer 型別為DocInverter,包含如下部分
      • InvertedDocConsumer consumer型別為TermsHash,包含如下部分
        • TermsHashConsumer consumer型別為FreqProxTermsWriter,負責寫freq, prox資訊
        • TermsHash nextTermsHash
          • TermsHashConsumer consumer型別為TermVectorsTermsWriter,負責寫tvx, tvd, tvf資訊
      • InvertedDocEndConsumer endConsumer 型別為NormsWriter,負責寫nrm資訊
  • 對儲存域的處理
    • FieldInfos fieldInfos = new FieldInfos();
    • StoredFieldsWriter fieldsWriter負責寫fnm, fdt, fdx資訊
  • 刪除文件
    • BufferedDeletes deletesInRAM = new BufferedDeletes();
    • BufferedDeletes deletesFlushed = new BufferedDeletes();

類BufferedDeletes包含了一下的成員變數:

  • HashMap terms = new HashMap();刪除的詞(Term)
  • HashMap queries = new HashMap();刪除的查詢(Query)
  • List docIDs = new ArrayList();刪除的文件ID
  • long bytesUsed:用於判斷是否應該對刪除的文件寫入索引檔案。

由此可見,文件的刪除主要有三種方式:

  • IndexWriter.deleteDocuments(Term term):所有包含此詞的文件都會被刪除。
  • IndexWriter.deleteDocuments(Query query):所有能滿足此查詢的文件都會被刪除。
  • IndexReader.deleteDocument(int docNum):刪除此文件ID

刪除文件既可以用reader進行刪除,也可以用writer進行刪除,不同的是,reader進行刪除後,此reader馬上能夠生效,而用writer刪除後,會被快取在deletesInRAM及deletesFlushed中,只有寫入到索引檔案中,當reader再次開啟的時候,才能夠看到。

那deletesInRAM和deletesFlushed各有什麼用處呢?

此版本的Lucene對文件的刪除是支援多執行緒的,當用IndexWriter刪除文件的時候,都是快取在deletesInRAM中的,直到flush,才將刪除的文件寫入到索引檔案中去,我們知道flush是需要一段時間的,那麼在flush的過程中,另一個執行緒又有文件刪除怎麼辦呢?

一般過程是這個樣子的,當flush的時候,首先在同步(synchornized)的方法pushDeletes中,將deletesInRAM全部加到deletesFlushed中,然後將deletesInRAM清空,退出同步方法,於是flush的執行緒程就向索引檔案寫deletesFlushed中的刪除文件的過程,而與此同時其他執行緒新刪除的文件則新增到新的deletesInRAM中去,直到下次flush才寫入索引檔案。

  • 快取管理
    • 為了提高索引的速度,Lucene對很多的資料進行了快取,使一起寫入磁碟,然而快取需要進行管理,何時分配,何時回收,何時寫入磁碟都需要考慮。
    • ArrayList freeCharBlocks = new ArrayList();將用於快取詞(Term)資訊的空閒塊
    • ArrayList freeByteBlocks = new ArrayList();將用於快取文件號(doc id)及詞頻(freq),位置(prox)資訊的空閒塊。
    • ArrayList freeIntBlocks = new ArrayList();將儲存某詞的詞頻(freq)和位置(prox)分別在byteBlocks中的偏移量
    • boolean bufferIsFull;用來判斷快取是否滿了,如果滿了,則應該寫入磁碟
    • long numBytesAlloc;分配的記憶體數量
    • long numBytesUsed;使用的記憶體數量
    • long freeTrigger;應該開始回收記憶體時的記憶體用量。
    • long freeLevel;回收記憶體應該回收到的記憶體用量。
    • long ramBufferSize;使用者設定的記憶體用量。
快取用量之間的關係如下: 
DocumentsWriter.setRAMBufferSizeMB(double mb){ 

    ramBufferSize = (long) (mb*1024*1024);//使用者設定的記憶體用量,當使用記憶體大於此時,開始寫入磁碟 
    waitQueuePauseBytes = (long) (ramBufferSize*0.1); 
    waitQueueResumeBytes = (long) (ramBufferSize*0.05); 
    freeTrigger = (long) (1.05 * ramBufferSize);//當分配的記憶體到達105%的時候開始釋放freeBlocks中的記憶體 
    freeLevel = (long) (0.95 * ramBufferSize);//一直釋放到95%



DocumentsWriter.balanceRAM(){ 
    if (numBytesAlloc+deletesRAMUsed > freeTrigger) { 
    //當分配的記憶體加刪除文件所佔用的記憶體大於105%的時候,開始釋放記憶體 
        while(numBytesAlloc+deletesRAMUsed > freeLevel) { 
        //一直進行釋放,直到95% 

            //釋放free blocks

            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1); 
            numBytesAlloc -= BYTE_BLOCK_SIZE;

            freeCharBlocks.remove(freeCharBlocks.size()-1); 
            numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;

            freeIntBlocks.remove(freeIntBlocks.size()-1); 
            numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE; 
        } 
    } else {

        if (numBytesUsed+deletesRAMUsed > ramBufferSize){

        //當使用的記憶體加刪除文件佔有的記憶體大於使用者指定的記憶體時,可以寫入磁碟

              bufferIsFull = true;

        }

    } 
}

當判斷是否應該寫入磁碟時:

  • 如果使用的記憶體大於使用者指定記憶體時,bufferIsFull = true
  • 當使用的記憶體加刪除文件所佔的記憶體加正在寫入的刪除文件所佔的記憶體大於使用者指定記憶體時 deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize
  • 當刪除的文件數目大於maxBufferedDeleteTerms時

DocumentsWriter.timeToFlushDeletes(){

    return (bufferIsFull || deletesFull()) && setFlushPending();

}

DocumentsWriter.deletesFull(){

    return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && 
        (deletesInRAM.bytesUsed + deletesFlushed.bytesUsed + numBytesUsed) >= ramBufferSize) || 
        (maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && 
        ((deletesInRAM.size() + deletesFlushed.size()) >= maxBufferedDeleteTerms));

}

  • 多執行緒併發索引
    • 為了支援多執行緒併發索引,對每一個執行緒都有一個DocumentsWriterThreadState,其為每一個執行緒根據DocConsumer consumer的索引鏈來建立每個執行緒的索引鏈(XXXPerThread),來進行對文件的併發處理。
    • DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
    • HashMap threadBindings = new HashMap();
    • 雖然對文件的處理過程可以並行,但是將文件寫入索引檔案卻必須序列進行,序列寫入的程式碼在DocumentsWriter.finishDocument中
    • WaitQueue waitQueue = new WaitQueue()
    • long waitQueuePauseBytes
    • long waitQueueResumeBytes

在Lucene中,文件是按新增的順序編號的,DocumentsWriter中的nextDocID就是記錄下一個新增的文件id。 當Lucene支援多執行緒的時候,就必須要有一個synchornized方法來付給文件id並且將nextDocID加一,這些是在DocumentsWriter.getThreadState這個函式裡面做的。

雖然給文件付ID沒有問題了。但是由Lucene索引檔案格式我們知道,文件是要按照ID的順序從小到大寫到索引檔案中去的,然而不同的文件處理速度不同,當一個先來的執行緒一處理一篇需要很長時間的大文件時,另一個後來的執行緒二可能已經處理了很多小的文件了,但是這些後來小文件的ID號都大於第一個執行緒所處理的大文件,因而不能馬上寫到索引檔案中去,而是放到waitQueue中,僅僅當大文件處理完了之後才寫入索引檔案。

waitQueue中有一個變數nextWriteDocID表示下一個可以寫入檔案的ID,當付給大文件ID=4時,則nextWriteDocID也設為4,雖然後來的小文件5,6,7,8等都已處理結束,但是如下程式碼,

WaitQueue.add(){

    if (doc.docID == nextWriteDocID){ 
       ………… 
    } else { 
        waiting[loc] = doc; 
        waitingBytes += doc.sizeInBytes(); 
   }

   doPause()

}

則把5, 6, 7, 8放入waiting佇列,並且記錄當前等待的文件所佔用的記憶體大小waitingBytes。

當大文件4處理完畢後,不但寫入文件4,把原來等待的文件5, 6, 7, 8也一起寫入。

WaitQueue.add(){

    if (doc.docID == nextWriteDocID) {

       writeDocument(doc);

       while(true) {

           doc = waiting[nextWriteLoc];

           writeDocument(doc);

       }

   } else {

      …………

   }

   doPause()

}

但是這存在一個問題:當大文件很大很大,處理的很慢很慢的時候,後來的執行緒二可能已經處理了很多的小文件了,這些文件都是在waitQueue中,則佔有了越來越多的記憶體,長此以往,有記憶體不夠的危險。

因而在finishDocuments裡面,在WaitQueue.add最後呼叫了doPause()函式

DocumentsWriter.finishDocument(){

    doPause = waitQueue.add(docWriter);

    if (doPause) 
        waitForWaitQueue();

    notifyAll();

}

WaitQueue.doPause() { 
    return waitingBytes > waitQueuePauseBytes; 
}

當waitingBytes足夠大的時候(為使用者指定的記憶體使用量的10%),doPause返回true,於是後來的執行緒二會進入wait狀態,不再處理另外的文件,而是等待執行緒一處理大文件結束。

當執行緒一處理大文件結束的時候,呼叫notifyAll喚醒等待他的執行緒。

DocumentsWriter.waitForWaitQueue() { 
  do { 
    try { 
      wait(); 
    } catch (InterruptedException ie) { 
      throw new ThreadInterruptedException(ie); 
    } 
  } while (!waitQueue.doResume()); 
}

WaitQueue.doResume() { 
     return waitingBytes <= waitQueueResumeBytes; 
}

當waitingBytes足夠小的時候,doResume返回true, 則執行緒二不用再wait了,可以繼續處理另外的文件。

  • 一些標誌位
    • int maxFieldLength:一篇文件中,一個域內可索引的最大的詞(Term)數。
    • int maxBufferedDeleteTerms:可快取的最大的刪除詞(Term)數。當大於這個數的時候,就要寫到檔案中了。

此過程又包含如下三個子過程:

4.1、得到當前執行緒對應的文件集處理物件(DocumentsWriterThreadState)

程式碼為:

DocumentsWriterThreadState state = getThreadState(doc, delTerm);

在Lucene中,對於同一個索引資料夾,只能夠有一個IndexWriter開啟它,在開啟後,在資料夾中,生成檔案write.lock,當其他IndexWriter再試圖開啟此索引資料夾的時候,則會報org.apache.lucene.store.LockObtainFailedException錯誤。

這樣就出現了這樣一個問題,在同一個程序中,對同一個索引資料夾,只能有一個IndexWriter開啟它,因而如果想多執行緒向此索引資料夾中新增文件,則必須共享一個IndexWriter,而且在以往的實現中,addDocument函式是同步的(synchronized),也即多執行緒的索引並不能起到提高效能的效果。

於是為了支援多執行緒索引,不使IndexWriter成為瓶頸,對於每一個執行緒都有一個相應的文件集處理物件(DocumentsWriterThreadState),這樣對文件的索引過程可以多執行緒並行進行,從而增加索引的速度。

getThreadState函式是同步的(synchronized),DocumentsWriter有一個成員變數threadBindings,它是一個HashMap,鍵為執行緒物件(Thread.currentThread()),值為此執行緒對應的DocumentsWriterThreadState物件。

DocumentsWriterThreadState DocumentsWriter.getThreadState(Document doc, Term delTerm)包含如下幾個過程:

  • 根據當前執行緒物件,從HashMap中查詢相應的DocumentsWriterThreadState物件,如果沒找到,則生成一個新物件,並新增到HashMap中
DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread()); 
if (state == null) { 
    …… 
    state = new DocumentsWriterThreadState(this); 
    …… 
    threadBindings.put(Thread.currentThread(), state); 
  • 如果此執行緒物件正在用於處理上一篇文件,則等待,直到此執行緒的上一篇文件處理完。
DocumentsWriter.getThreadState() { 
    waitReady(state); 
    state.isIdle = false; 


waitReady(state) { 
    while (!state.isIdle) {wait();} 
}  

顯然如果state.isIdle為false,則此執行緒等待。 
在一篇文件處理之前,state.isIdle = false會被設定,而在一篇文件處理完畢之後,DocumentsWriter.finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)中,會首先設定perThread.isIdle = true; 然後notifyAll()來喚醒等待此文件完成的執行緒,從而處理下一篇文件。
  • 如果IndexWriter剛剛commit過,則新新增的文件要加入到新的段中(segment),則首先要生成新的段名。
initSegmentName(false); 
--> if (segment == null) segment = writer.newSegmentName();
  • 將此執行緒的文件處理物件設為忙碌:state.isIdle = false;

4.2、用得到的文件集處理物件(DocumentsWriterThreadState)處理文件

程式碼為:

DocWriter perDoc = state.consumer.processDocument();

每一個文件集處理物件DocumentsWriterThreadState都有一個文件及域處理物件DocFieldProcessorPerThread,它的成員函式processDocument()被呼叫來對文件及域進行處理。

執行緒索引鏈(XXXPerThread):

由於要多執行緒進行索引,因而每個執行緒都要有自己的索引鏈,稱為執行緒索引鏈。

執行緒索引鏈同基本索引鏈有相似的樹形結構,由基本索引鏈中每個層次的物件呼叫addThreads進行建立的,負責每個執行緒的對文件的處理。

DocFieldProcessorPerThread是執行緒索引鏈的源頭,由DocFieldProcessor.addThreads(…)建立

DocFieldProcessorPerThread物件結構如下:

  • 對索引域進行處理
    • DocFieldConsumerPerThread consumer 型別為 DocInverterPerThread,由DocInverter.addThreads建立
      • InvertedDocConsumerPerThread consumer 型別為TermsHashPerThread,由TermsHash.addThreads建立
        • TermsHashConsumerPerThread consumer型別為FreqProxTermsWriterPerThread,由FreqProxTermsWriter.addThreads建立,負責每個執行緒的freq,prox資訊處理
        • TermsHashPerThread nextPerThread
          • TermsHashConsumerPerThread consumer型別TermVectorsTermsWriterPerThread,由TermVectorsTermsWriter建立,負責每個執行緒的tvx,tvd,tvf資訊處理
      • InvertedDocEndConsumerPerThread endConsumer 型別為NormsWriterPerThread,由NormsWriter.addThreads建立,負責nrm資訊的處理
  • 對儲存域進行處理
    • StoredFieldsWriterPerThread fieldsWriter由StoredFieldsWriter.addThreads建立,負責fnm,fdx,fdt的處理。
    • FieldInfos fieldInfos;

DocumentsWriter.DocWriter DocFieldProcessorPerThread.processDocument()包含以下幾個過程:

4.2.1、開始處理當前文件

consumer(DocInverterPerThread).startDocument(); 
fieldsWriter(StoredFieldsWriterPerThread).startDocument();

在此版的Lucene中,幾乎所有的XXXPerThread的類,都有startDocument和finishDocument兩個函式,因為對同一個執行緒,這些物件都是複用的,而非對每一篇新來的文件都建立一套,這樣也提高了效率,也牽扯到資料的清理問題。一般在startDocument函式中,清理處理上篇文件遺留的資料,在finishDocument中,收集本次處理的結果資料,並返回,一直返回到DocumentsWriter.updateDocument(Document, Analyzer, Term) 然後根據條件判斷是否將資料重新整理到硬碟上。

4.2.2、逐個處理文件的每一個域

由於一個執行緒可以連續處理多個文件,而在普通的應用中,幾乎每篇文件的域都是大致相同的,為每篇文件的每個域都建立一個處理物件非常低效,因而考慮到複用域處理物件DocFieldProcessorPerField,對於每一個域都有一個此物件。

那當來到一個新的域的時候,如何更快的找到此域的處理物件呢?Lucene建立了一個DocFieldProcessorPerField[] fieldHash雜湊表來方便更快查詢域對應的處理物件。

當處理各個域的時候,按什麼順序呢?其實是按照域名的字典順序。因而Lucene建立了DocFieldProcessorPerField[] fields的陣列來方便按順序處理域。

因而一個域的處理物件被放在了兩個地方。

對於域的處理過程如下:

4.2.2.1、首先:對於每一個域,按照域名,在fieldHash中查詢域處理物件DocFieldProcessorPerField,程式碼如下:

final int hashPos = fieldName.hashCode() & hashMask;//計算雜湊值 
DocFieldProcessorPerField fp = fieldHash[hashPos];//找到雜湊表中對應的位置 
while(fp != null && !fp.fieldInfo.name.equals(fieldName)) fp = fp.next;//鏈式雜湊表

如果能夠找到,則更新DocFieldProcessorPerField中的域資訊fp.fieldInfo.update(field.isIndexed()…)

如果沒有找到,則新增域到DocFieldProcessorPerThread.fieldInfos中,並建立新的DocFieldProcessorPerField,且將其加入雜湊表。程式碼如下:

fp = new DocFieldProcessorPerField(this, fi); 
fp.next = fieldHash[hashPos]; 
fieldHash[hashPos] = fp;

如果是一個新的field,則將其加入fields陣列fields[fieldCount++] = fp;

並且如果是儲存域的話,用StoredFieldsWriterPerThread將其寫到索引中:

if (field.isStored()) { 
  fieldsWriter.addField(field, fp.fieldInfo); 
}

4.2.2.1.1、處理儲存域的過程如下:

StoredFieldsWriterPerThread.addField(Fieldable field, FieldInfo fieldInfo) 
--> localFieldsWriter.writeField(fieldInfo, field);

FieldsWriter.writeField(FieldInfo fi, Fieldable field)程式碼如下:

請參照fdt檔案的格式,則一目瞭然:

fieldsStream.writeVInt(fi.number);//文件號 
byte bits = 0; 
if (field.isTokenized()) 
    bits |= FieldsWriter.FIELD_IS_TOKENIZED; 
if (field.isBinary()) 
    bits |= FieldsWriter.FIELD_IS_BINARY; 
if (field.isCompressed()) 
    bits |= FieldsWriter.FIELD_IS_COMPRESSED;

fieldsStream.writeByte(bits); //域的屬性位

if (field.isCompressed()) {//對於壓縮域 
    // compression is enabled for the current field 
    final byte[] data; 
    final int len; 
    final int offset; 
    // check if it is a binary field 
    if (field.isBinary()) { 
        data = CompressionTools.compress(field.getBinaryValue(), field.getBinaryOffset(), field.getBinaryLength()); 
    } else { 
        byte x[] = field.stringValue().getBytes("UTF-8"); 
        data = CompressionTools.compress(x, 0, x.length); 
    } 
    len = data.length; 
    offset = 0; 
    fieldsStream.writeVInt(len);//寫長度 
    fieldsStream.writeBytes(data, offset, len);//寫二進位制內容 
} else {//對於非壓縮域 
    // compression is disabled for the current field 
    if (field.isBinary()) {//如果是二進位制域 
        final byte[] data; 
        final int len; 
        final int offset; 
        data = field.getBinaryValue(); 
        len = field.getBinaryLength(); 
        offset = field.getBinaryOffset();

        fieldsStream.writeVInt(len);//寫長度 
        fieldsStream.writeBytes(data, offset, len);//寫二進位制內容 
    } else { 
        fieldsStream.writeString(field.stringValue());//寫字元內容 
    } 
}

4.2.2.2、然後:對fields陣列進行排序,是域按照名稱排序。quickSort(fields, 0, fieldCount-1);

4.2.2.3、最後:按照排序號的順序,對域逐個處理,此處處理的僅僅是索引域,程式碼如下:

for(int i=0;i      fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);

域處理物件(DocFieldProcessorPerField)結構如下:

域索引鏈:

每個域也有自己的索引鏈,稱為域索引鏈,每個域的索引鏈也有同執行緒索引鏈有相似的樹形結構,由執行緒索引鏈中每個層次的每個層次的物件呼叫addField進行建立,負責對此域的處理。

和基本索引鏈及執行緒索引鏈不同的是,域索引鏈僅僅負責處理索引域,而不負責儲存域的處理。

DocFieldProcessorPerField是域索引鏈的源頭,物件結構如下:

  • DocFieldConsumerPerField consumer型別為DocInverterPerField,由DocInverterPerThread.addField建立
    • InvertedDocConsumerPerField consumer 型別為TermsHashPerField,由TermsHashPerThread.addField建立
      • TermsHashConsumerPerField consumer 型別為FreqProxTermsWriterPerField,由FreqProxTermsWriterPerThread.addField建立,負責freq, prox資訊的處理
      • TermsHashPerField nextPerField
        • TermsHashConsumerPerField consumer 型別為TermVectorsTermsWriterPerField,由TermVectorsTermsWriterPerThread.addField建立,負責tvx, tvd, tvf資訊的處理
    • InvertedDocEndConsumerPerField endConsumer 型別為NormsWriterPerField,由NormsWriterPerThread.addField建立,負責nrm資訊的處理。

4.2.2.3.1、處理索引域的過程如下:

DocInverterPerField.processFields(Fieldable[], int) 過程如下:

  • 判斷是否要形成倒排表,程式碼如下:
boolean doInvert = consumer.start(fields, count); 
--> TermsHashPerField.start(Fieldable[], int)  
      --> for(int i=0;i             if (fields[i].isIndexed()) 
                 return true; 
            return false;

讀到這裡,大家可能會發生困惑,既然XXXPerField是對於每一個域有一個處理物件的,那為什麼引數傳進來的是Fieldable[]陣列, 並且還有域的數目count呢?

其實這不經常用到,但必須得提一下,由上面的fieldHash的實現我們可以看到,是根據域名進行雜湊的,所以準確的講,XXXPerField並非對於每一個域有一個處理物件,而是對每一組相同名字的域有相同的處理物件。

對於同一篇文件,相同名稱的域可以新增多個,程式碼如下:

doc.add(new Field("contents", "the content of the file.", Field.Store.NO, Field.Index.NOT_ANALYZED)); 
doc.add(new Field("contents", new FileReader(f)));

則傳進來的名為"contents"的域如下:

fields    Fieldable[2]  (id=52)    
    [0]    Field  (id=56)    
        binaryLength    0    
        binaryOffset    0    
        boost    1.0    
        fieldsData    "the content of the file."    
        isBinary    false    
        isCompressed    false    
        isIndexed    true    
        isStored    false    
        isTokenized    false    
        lazy    false    
        name    "contents"    
        omitNorms    false    
        omitTermFreqAndPositions    false    
        storeOffsetWithTermVector    false    
        storePositionWithTermVector    false    
        storeTermVector    false    
        tokenStream    null    
    [1]    Field  (id=58)    
        binaryLength    0    
        binaryOffset    0    
        boost    1.0    
        fieldsData    FileReader  (id=131)    
        isBinary    false    
        isCompressed    false    
        isIndexed    true    
        isStored    false    
        isTokenized    true    
        lazy    false    
        name    "contents"    
        omitNorms    false    
        omitTermFreqAndPositions    false    
        storeOffsetWithTermVector    false    
        storePositionWithTermVector    false    
        storeTermVector    false    
        tokenStream    null   

  • 對傳進來的同名域逐一處理,程式碼如下

for(int i=0;i

    final Fieldable field = fields[i];

    if (field.isIndexed() && doInvert) {

        //僅僅對索引域進行處理

        if (!field.isTokenized()) {

            //如果此域不分詞,見(1)對不分詞的域的處理

        } else {

            //如果此域分詞,見(2)對分詞的域的處理

        }

    }

}

(1) 對不分詞的域的處理

(1-1) 得到域的內容,並構建單個Token形成的SingleTokenAttributeSource。因為不進行分詞,因而整個域的內容算做一個Token.

String stringValue = field.stringValue(); //stringValue    "200910240957"  
final int valueLength = stringValue.length(); 
perThread.singleToken.reinit(stringValue, 0, valueLength);

對於此域唯一的一個Token有以下的屬性:

  • Term:文字資訊。在處理過程中,此值將儲存在TermAttribute的實現類例項化的物件TermAttributeImp裡面。
  • Offset:偏移量資訊,是按字或字母的起始偏移量和終止偏移量,表明此Token在文章中的位置,多用於加亮。在處理過程中,此值將儲存在OffsetAttribute的實現類例項化的物件OffsetAttributeImp裡面。

在SingleTokenAttributeSource裡面,有一個HashMap來儲存可能用於儲存屬性的類名(Key,準確的講是介面)以及儲存屬性資訊的物件(Value):

singleToken    DocInverterPerThread$SingleTokenAttributeSource  (id=150)    
    attributeImpls    LinkedHashMap  (id=945)    
    attributes    LinkedHashMap  (id=946)     
        size    2    
        table    HashMap$Entry[16]  (id=988)    
            [0]    LinkedHashMap$Entry  (id=991)     
                key    Class (org.apache.lucene.analysis.tokenattributes.TermAttribute) (id=755)     
                value    TermAttributeImpl  (id=949)    
                    termBuffer    char[19]  (id=954)    //[2, 0, 0, 9, 1, 0, 2, 4, 0, 9, 5, 7] 
                    termLength    12
     
            [7]    LinkedHashMap$Entry  (id=993)     
                key    Class (org.apache.lucene.analysis.tokenattributes.OffsetAttribute) (id=274)     
                value    OffsetAttributeImpl  (id=948)    
                    endOffset    12    
                    startOffset    0
     
    factory    AttributeSource$AttributeFactory$DefaultAttributeFactory  (id=947)    
    offsetAttribute    OffsetAttributeImpl  (id=948)    
    termAttribute    TermAttributeImpl  (id=949)   

(1-2) 得到Token的各種屬性資訊,為索引做準備。

consumer.start(field)做的主要事情就是根據各種屬性的型別來構造儲存屬性的物件(HashMap中有則取出,無則構造),為索引做準備。

consumer(TermsHashPerField).start(…)

--> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的就是上述HashMap中的TermAttributeImpl   

--> consumer(FreqProxTermsWriterPerField).start(f);

      --> if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) {

                payloadAttribute = fieldState.attributeSource.getAttribute(PayloadAttribute.class); 
                儲存payload資訊則得到payload的屬}

--> nextPerField(TermsHashPerField).start(f);

      --> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的還是上述HashMap中的TermAttributeImpl

      --> consumer(TermVectorsTermsWriterPerField).start(f);

            --> if (doVectorOffsets) {

                      offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class); 
                      如果儲存詞向量則得到的是上述HashMap中的OffsetAttributeImp }

(1-3) 將Token加入倒排表

consumer(TermsHashPerField).add();

加入倒排表的過程,無論對於分詞的域和不分詞的域,過程是一樣的,因而放到對分詞的域的解析中一起說明。

(2) 對分詞的域的處理

(2-1) 構建域的TokenStream

final TokenStream streamValue = field.tokenStreamValue();

//使用者可以在新增域的時候,應用建構函式public Field(String name, TokenStream tokenStream) 直接傳進一個TokenStream過來,這樣就不用另外構建一個TokenStream了。

if (streamValue != null) 
  stream = streamValue; 
else {

  ……

  stream = docState.analyzer.reusableTokenStream(fieldInfo.name, reader);

}

此時TokenStream的各項屬性值還都是空的,等待一個一個被分詞後得到,此時的TokenStream物件如下:

stream    StopFilter  (id=112)    
    attributeImpls    LinkedHashMap  (id=121)    
    attributes    LinkedHashMap  (id=122)     
        size    4    
        table    HashMap$Entry[16]  (id=146)     
            [2]    LinkedHashMap$Entry  (id=148)     
                key    Class (org.apache.lucene.analysis.tokenattributes.TypeAttribute) (id=154)     
                value    TypeAttributeImpl  (id=157)    
                    type    "word"     
            [8]    LinkedHashMap$Entry  (id=150)    
                after    LinkedHashMap$Entry  (id=156)     
                    key    Class (org.apache.lucene.analysis.tokenattributes.OffsetAttribute) (id=163)     
                    value    OffsetAttributeImpl  (id=164)    
                        endOffset    0    
                        startOffset    0
     
                key    Class (org.apache.lucene.analysis.tokenattributes.TermAttribute) (id=142)     
                value    TermAttributeImpl  (id=133)    
                    termBuffer    char[17]  (id=173)    
                    termLength    0
     
            [10]    LinkedHashMap$Entry  (id=151)     
                key    Class (org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute) (id=136)     
                value    PositionIncrementAttributeImpl  (id=129)    
                    positionIncrement    1     
    currentState    AttributeSource$State  (id=123)    
    enablePositionIncrements    true    
    factory    AttributeSource$AttributeFactory$DefaultAttributeFactory  (id=125)    
    input    LowerCaseFilter  (id=127)     
        input    StandardFilter  (id=213)     
            input    StandardTokenizer  (id=218)     
                input    FileReader  (id=93)    //從檔案中讀出來的文字,將經過分詞器分詞,並一層層的Filter的處理,得到一個個Token 
    stopWords    CharArraySet$UnmodifiableCharArraySet  (id=131)    
    termAtt    TermAttributeImpl  (id=133)   

(2-2) 得到第一個Token,並初始化此Token的各項屬性資訊,併為索引做準備(start)。

boolean hasMoreTokens = stream.incrementToken();//得到第一個Token

OffsetAttribute offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class);//得到偏移量屬性

offsetAttribute    OffsetAttributeImpl  (id=164)    
    endOffset    8    
    startOffset    0   

PositionIncrementAttribute posIncrAttribute = fieldState.attributeSource.addAttribute(PositionIncrementAttribute.class);//得到位置屬性

posIncrAttribute    PositionIncrementAttributeImpl  (id=129)    
    positionIncrement    1   

consumer.start(field);//其中得到了TermAttribute屬性,如果儲存payload則得到PayloadAttribute屬性,如果儲存詞向量則得到OffsetAttribute屬性。

(2-3) 進行迴圈,不斷的取下一個Token,並新增到倒排表

for(;;) {

    if (!hasMoreTokens) break;

    …… 
    consumer.add();

    …… 
    hasMoreTokens = stream.incrementToken(); 
}

(2-4) 新增Token到倒排表的過程consumer(TermsHashPerField).add()

TermsHashPerField物件主要包括以下部分:

  • CharBlockPool charPool; 用於儲存Token的文字資訊,如果不足時,從DocumentsWriter中的freeCharBlocks分配
  • ByteBlockPool bytePool;用於儲存freq, prox資訊,如果不足時,從DocumentsWriter中的freeByteBlocks分配
  • IntBlockPool intPool; 用於儲存分別指向每個Token在bytePool中freq和prox資訊的偏移量。如果不足時,從DocumentsWriter的freeIntBlocks分配
  • TermsHashConsumerPerField consumer型別為FreqProxTermsWriterPerField,用於寫freq, prox資訊到快取中。
  • RawPostingList[] postingsHash = new RawPostingList[postingsHashSize];儲存倒排表,每一個Term都有一個RawPostingList (PostingList),其中包含了int textStart,也即文字在charPool中的偏移量,int byteStart,即此Term的freq和prox資訊在bytePool中的起始偏移量,int intStart,即此term的在intPool中的起始偏移量。

形成倒排表的過程如下:

//得到token的文字及文字長度

final char[] tokenText = termAtt.termBuffer();//[s, t, u, d, e, n, t, s]

final int tokenTextLen = termAtt.termLength();//tokenTextLen 8

//按照token的文字計算雜湊值,以便在postingsHash中找到此token對應的倒排表

int downto = tokenTextLen; 
int code = 0; 
while (downto > 0) { 
  char ch = tokenText[—downto]; 
  code = (code*31) + ch; 
}

int hashPos = code & postingsHashMask;

//在倒排表雜湊表中查詢此Token,如果找到相應的位置,但是不是此Token,說明此位置存在雜湊衝突,採取重新雜湊rehash的方法。

p = postingsHash[hashPos];

if (p != null && !postingEquals(tokenText, tokenTextLen)) {  
  final int inc = ((code>>8)+code)|1; 
  do { 
    code += inc; 
    hashPos = code & postingsHashMask; 
    p = postingsHash[hashPos]; 
  } while (p != null && !postingEquals(tokenText, tokenTextLen)); 
}

//如果此Token之前從未出現過

if (p == null) {

    if (textLen1 + charPool.charUpto > DocumentsWriter.CHAR_BLOCK_SIZE) {

        //當charPool不足的時候,在freeCharBlocks中分配新的buffer

        charPool.nextBuffer();

    }

    //從空閒的倒排表中分配新的倒排表

    p = perThread.freePostings[--perThread.freePostingsCount];

    //將文字複製到charPool中

    final char[] text = charPool.buffer; 
    final int textUpto = charPool.charUpto; 
    p.textStart = textUpto + charPool.charOffset; 
    charPool.charUpto += textLen1; 
    System.arraycopy(tokenText, 0, text, textUpto, tokenTextLen); 
    text[textUpto+tokenTextLen] = 0xffff;

    //將倒排表放入雜湊表中

    postingsHash[hashPos] = p; 
    numPostings++;

    if (numPostingInt + intPool.intUpto > DocumentsWriter.INT_BLOCK_SIZE) intPool.nextBuffer();

    //當intPool不足的時候,在freeIntBlocks中分配新的buffer。

    if (DocumentsWriter.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE)

        bytePool.nextBuffer();

    //當bytePool不足的時候,在freeByteBlocks中分配新的buffer。

    //此處streamCount為2,表明在intPool中,每兩項表示一個詞,一個是指向bytePool中freq資訊偏移量的,一個是指向bytePool中prox資訊偏移量的。

    intUptos = intPool.buffer; 
    intUptoStart = intPool.intUpto; 
    intPool.intUpto += streamCount;

    p.intStart = intUptoStart + intPool.intOffset;

    //在bytePool中分配兩個空間,一個放freq資訊,一個放prox資訊的。  
    for(int i=0;i

        final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE); 
        intUptos[intUptoStart+i] = upto + bytePool.byteOffset; 
    } 
    p.byteStart = intUptos[intUptoStart];

    //當Term原來沒有出現過的時候,呼叫newTerm

    consumer(FreqProxTermsWriterPerField).newTerm(p);

}

//如果此Token之前曾經出現過,則呼叫addTerm。

else {

    intUptos = intPool.buffers[p.intStart >> DocumentsWriter.INT_BLOCK_SHIFT]; 
    intUptoStart = p.intStart & DocumentsWriter.INT_BLOCK_MASK; 
    consumer(FreqProxTermsWriterPerField).addTerm(p);

}

(2-5) 新增新Term的過程,consumer(FreqProxTermsWriterPerField).newTerm

final void newTerm(RawPostingList p0) { 
  FreqProxTermsWriter.PostingList p = (FreqProxTermsWriter.PostingList) p0; 
  p.lastDocID = docState.docID; //當一個新的term出現的時候,包含此Term的就只有本篇文件,記錄其ID 
  p.lastDocCode = docState.docID << 1; //docCode是文件ID左移一位,為什麼左移,請參照索引檔案格式(1)中的或然跟隨規則。 
  p.docFreq = 1; //docFreq這裡用詞可能容易引起誤會,docFreq這裡指的是此文件所包含的此Term的次數,並非包含此Term的文件的個數。 
  writeProx(p, fieldState.position); //寫入prox資訊到bytePool中,此時freq資訊還不能寫入,因為當前的文件還沒有處理完,尚不知道此文件包含此Term的總數。 
}

writeProx(FreqProxTermsWriter.PostingList p, int proxCode) {

  termsHashPerField.writeVInt(1, proxCode<<1);//第一個引數所謂1,也就是寫入此文件在intPool中的第1項——prox資訊。為什麼左移一位呢?是因為後面可能跟著payload資訊,參照索引檔案格式(1)中或然跟隨規則。 
  p.lastPosition = fieldState.position;//總是要記錄lastDocID, lastPostion,是因為要計算差值,參照索引檔案格式(1)中的差值規則。

}

(2-6) 新增已有Term的過程

final void addTerm(RawPostingList p0) {

  FreqProxTermsWriter.PostingList p = (FreqProxTermsWriter.PostingList) p0;

  if (docState.docID != p.lastDocID) {

      //當文件ID變了的時候,說明上一篇文件已經處理完畢,可以寫入freq資訊了。

      //第一個引數所謂0,也就是寫入上一篇文件在intPool中的第0項——freq資訊。至於資訊為何這樣寫,參照索引檔案格式(1)中的或然跟隨規則,及tis檔案格式。

      if (1 == p.docFreq) 
        termsHashPerField.writeVInt(0, p.lastDocCode|1); 
      else { 
        termsHashPerField.writeVInt(0, p.lastDocCode); 
        termsHashPerField.writeVInt(0, p.docFreq); 
      } 
      p.docFreq = 1;//對於新的文件,freq還是為1. 
      p.lastDocCode = (docState.docID - p.lastDocID) << 1;//文件號儲存差值 
      p.lastDocID = docState.docID; 
      writeProx(p, fieldState.position);  
    } else {

      //當文件ID不變的時候,說明此文件中這個詞又出現了一次,從而freq加一,寫入再次出現的位置資訊,用差值。 
      p.docFreq++; 
      writeProx(p, fieldState.position-p.lastPosition); 
  } 
}

(2-7) 結束處理當前域

consumer(TermsHashPerField).finish();

--> FreqProxTermsWriterPerField.finish()

--> TermVectorsTermsWriterPerField.finish()

endConsumer(NormsWriterPerField).finish();

--> norms[upto] = Similarity.encodeNorm(norm);//計算標準化因子的值。

--> docIDs[upto] = docState.docID;

4.2.3、結束處理當前文件

final DocumentsWriter.DocWriter one = fieldsWriter(StoredFieldsWriterPerThread).finishDocument();

儲存域返回結果:一個寫成了二進位制的儲存域快取。

one    StoredFieldsWriter$PerDoc  (id=322)    
    docID    0    
    fdt    RAMOutputStream  (id=325)    
        bufferLength    1024    
        bufferPosition    40    
        bufferStart    0    
        copyBuffer    null    
        currentBuffer    byte[1024]  (id=332)    
        currentBufferIndex    0    
        file    RAMFile  (id=333)    
        utf8Result    UnicodeUtil$UTF8Result  (id=335)    
    next    null    
    numStoredFields    2    
    this$0    StoredFieldsWriter  (id=327)   

final DocumentsWriter.DocWriter two = consumer(DocInverterPerThread).finishDocument();

--> NormsWriterPerThread.finishDocument()

--> TermsHashPerThread.finishDocument()

索引域的返回結果為null

4.3、用DocumentsWriter.finishDocument結束本次文件新增

程式碼:

DocumentsWriter.updateDocument(Document, Analyzer, Term)

--> DocumentsWriter.finishDocument(DocumentsWriterThreadState, DocumentsWriter$DocWriter)

      --> doPause = waitQueue.add(docWriter);//有關waitQueue,在DocumentsWriter的快取管理中已作解釋

            --> DocumentsWriter$WaitQueue.writeDocument(DocumentsWriter$DocWriter)

                  --> StoredFieldsWriter$PerDoc.finish()

                        --> fieldsWriter.flushDocument(perDoc.numStoredFields, perDoc.fdt);將儲存域資訊真正寫入檔案。