
Diving into the Elasticsearch source code: the indexing process

Indexing a document into an ES cluster through the Java API of ES 2.2.1

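The client side, roughly:

  1. Build the document's JSON with XContentBuilder (a raw JSON string also works)
  2. Connect to the ES cluster with a TransportClient
  3. Send the index request to the cluster and get back an IndexResponse

Test code: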

package index;

import java.io.IOException;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;

import es.MyTransportClient; // the author's own helper that creates and holds the TransportClient

public class MyIndex {
    public static void main(String[] args) {
        TransportClient client = MyTransportClient.getInstance().getTransportClient();

        /*
         * Option 1: pass a JSON string directly
         */
        // IndexResponse response = client.prepareIndex("library", "book", "1")
        //         .setSource("{\"title\":\"mastering elasticsearch\"}")
        //         .execute().actionGet();

        /*
         * Option 2: build the JSON in code with XContentBuilder
         */
        try {
            XContentBuilder builder = JsonXContent.contentBuilder()
                    .startObject()
                    .field("user", "qiaqia")
                    .field("title", "this is title")
                    .field("subtitle", new String[] { "title1", "title2", "title3" })
                    .endObject();
            IndexResponse response = client.prepareIndex("library", "book", "4")
                    .setSource(builder)
                    .get();
            System.out.println(response.toString());
            System.out.println(response.isCreated());
            System.out.println(response.getVersion());
            /*
             * The document as stored in ES:
             * {
             *   "_index": "library",
             *   "_type": "book",
             *   "_id": "4",
             *   "_score": 1,
             *   "_source": {
             *     "user": "qiaqia",
             *     "title": "this is title",
             *     "subtitle": ["title1", "title2", "title3"]
             *   }
             * }
             */
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

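Once the XContent is built, an IndexRequest is created from it. IndexRequest wraps everything about the indexing operation: the operation type, the document source, routing, the index type, the id, timestamp, version, timeout, TTL, and so on.
The IndexRequest is then sent to the cluster over TCP through TransportService, whose network layer in ES 2.x is built on Netty, an asynchronous, event-driven, high-performance network application framework.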
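To make "wrap the request and send it over TCP" a little more concrete, here is a minimal sketch of request serialization. In ES 2.x, request classes such as IndexRequest serialize themselves field by field through writeTo(StreamOutput) and readFrom(StreamInput) before the transport layer puts the bytes on the wire. The sketch is not ES code: MiniIndexRequest is a hypothetical class that keeps only a few of the fields listed above, its field order is made up, and plain java.io DataOutputStream/DataInputStream stand in for StreamOutput/StreamInput.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, heavily simplified stand-in for IndexRequest (NOT the real ES class).
// DataOutputStream/DataInputStream play the role of ES's StreamOutput/StreamInput.
final class MiniIndexRequest {
    final String index;
    final String type;
    final String id;
    final String routing; // optional: may be null
    final long version;
    final byte[] source;  // the JSON document as bytes

    MiniIndexRequest(String index, String type, String id, String routing, long version, byte[] source) {
        this.index = index;
        this.type = type;
        this.id = id;
        this.routing = routing;
        this.version = version;
        this.source = source;
    }

    // Write every field in a fixed order; readFrom must use exactly the same order.
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(index);
        out.writeUTF(type);
        out.writeUTF(id);
        out.writeBoolean(routing != null); // presence flag for the optional field
        if (routing != null) {
            out.writeUTF(routing);
        }
        out.writeLong(version);
        out.writeInt(source.length);       // length prefix, then the raw bytes
        out.write(source);
    }

    static MiniIndexRequest readFrom(DataInputStream in) throws IOException {
        String index = in.readUTF();
        String type = in.readUTF();
        String id = in.readUTF();
        String routing = in.readBoolean() ? in.readUTF() : null;
        long version = in.readLong();
        byte[] source = new byte[in.readInt()];
        in.readFully(source);
        return new MiniIndexRequest(index, type, id, routing, version, source);
    }

    byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeTo(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // the stream is closed (and flushed) at this point
    }

    static MiniIndexRequest fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return readFrom(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] json = "{\"user\":\"qiaqia\"}".getBytes(StandardCharsets.UTF_8);
        MiniIndexRequest sent = new MiniIndexRequest("library", "book", "4", null, 1L, json);
        MiniIndexRequest received = fromBytes(sent.toBytes()); // what the receiving node would decode
        System.out.println(received.index + "/" + received.type + "/" + received.id); // library/book/4
    }
}
```

The presence flag before the optional routing field is the usual trick for nullable values in this kind of hand-written wire format: the reader cannot otherwise tell "no routing" from "empty routing".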

Server side

Once the request reaches its TransportAction, the node reads the cluster state and works out which shard the document belongs to.
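The shard choice comes down to a hash of the routing value (which defaults to the document id), taken modulo the number of primary shards. Below is a simplified sketch of that formula, not ES's implementation: ES 2.x uses a Murmur3 hash of the routing string, whereas this sketch swaps in String.hashCode() only to stay dependency-free, and ShardSelectionSketch is a hypothetical name.

```java
// Simplified sketch of shard selection (NOT the exact ES implementation).
// ES hashes the routing string with Murmur3; String.hashCode() is only a stand-in here.
final class ShardSelectionSketch {
    static int shardId(String id, String routing, int numberOfPrimaryShards) {
        // Without an explicit routing value, the document id is used as the routing key.
        String effectiveRouting = (routing != null) ? routing : id;
        int hash = effectiveRouting.hashCode();
        // floorMod keeps the result in [0, numberOfPrimaryShards) even for negative hashes.
        return Math.floorMod(hash, numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        System.out.println(shardId("4", null, 5));     // routes by the id "4": "4".hashCode() is 52, 52 mod 5 = 2
        System.out.println(shardId("4", "user-7", 5)); // routes by "user-7" instead of the id
    }
}
```

Because the result depends on the number of primary shards, changing that number would send existing ids to different shards, which is why it is fixed when the index is created.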

The request is then forwarded to the primary shard for processing; see shardOperationOnPrimary in TransportIndexAction:

  protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable {

        // validate, if routing is required, that we got routing
        IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
        if (mappingMd != null && mappingMd.routing().required()) {
            if (request.routing() == null) {
                throw new RoutingMissingException(request.shardId().getIndex(), request.type(), request.id());
            }
        }

        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
        IndexShard indexShard = indexService.shardSafe(request.shardId().id());

        final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard, mappingUpdatedAction);
        final IndexResponse response = result.response;
        final Translog.Location location = result.location;
        processAfterWrite(request.refresh(), indexShard, location);
        return new Tuple<>(response, request); // return the IndexResponse produced by the operation
    }
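The method above does three things in order: reject the request if the mapping requires routing and none was given, execute the write on the primary shard (which yields a translog location), and then, in processAfterWrite, refresh if the request asked for it. The toy model below mirrors only that ordering. ToyPrimaryShard and its methods are hypothetical, the translog is an in-memory list, and the "searchable" map stands in for what a refresh makes visible to search.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the primary-shard write path (NOT ES code).
final class ToyPrimaryShard {
    private final boolean routingRequired;                           // stands in for mappingMd.routing().required()
    private final List<String> translog = new ArrayList<>();         // append-only log of operations
    private final Map<String, String> unrefreshed = new HashMap<>(); // written, not yet visible to search
    private final Map<String, String> searchable = new HashMap<>();  // visible to search after a refresh

    ToyPrimaryShard(boolean routingRequired) {
        this.routingRequired = routingRequired;
    }

    // Returns the position of the operation in the translog, loosely like Translog.Location.
    int index(String id, String routing, String source, boolean refresh) {
        // 1. Validate: a mapping that requires routing rejects requests without one.
        if (routingRequired && routing == null) {
            throw new IllegalArgumentException("routing is required for document [" + id + "]");
        }
        // 2. Write: record the operation, then apply it to the not-yet-searchable index.
        translog.add(id + " -> " + source);
        unrefreshed.put(id, source);
        int location = translog.size() - 1;
        // 3. After the write: refresh only if the request asked for it.
        if (refresh) {
            refresh();
        }
        return location;
    }

    void refresh() {
        searchable.putAll(unrefreshed);
        unrefreshed.clear();
    }

    boolean isSearchable(String id) {
        return searchable.containsKey(id);
    }

    public static void main(String[] args) {
        ToyPrimaryShard shard = new ToyPrimaryShard(false);
        shard.index("4", null, "{\"user\":\"qiaqia\"}", false);
        System.out.println(shard.isSearchable("4")); // false: written but not refreshed
        shard.refresh();
        System.out.println(shard.isSearchable("4")); // true
    }
}
```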

Before the write itself, TransportIndexAction prepares the index operation:

    public static Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
        // Extract the data carried by the IndexRequest into a SourceToParse
        SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
            .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
        boolean canHaveDuplicates = request.canHaveDuplicates();
        if (shardRequest != null) {
            canHaveDuplicates |= shardRequest.canHaveDuplicates();
        }
        // Index or create? With OpType.INDEX, a document whose id already exists is updated,
        // otherwise a new document is created. With OpType.CREATE, an existing id results in
        // a "document already exists" error.
        if (request.opType() == IndexRequest.OpType.INDEX) {
            return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), canHaveDuplicates);
        } else {
            assert request.opType() == IndexRequest.OpType.CREATE : request.opType();

            // Hand over to IndexShard, which prepares the Lucene-level operation
            return indexShard.prepareCreateOnPrimary(sourceToParse, request.version(), request.versionType(), canHaveDuplicates, canHaveDuplicates);
        }
    }
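The INDEX/CREATE distinction in prepareIndexOperationOnPrimary can be modelled with a map. This is a hypothetical in-memory sketch (OpTypeSketch is not an ES class). It assumes ES-style internal versioning, where a new document starts at version 1 and each overwrite increments it, and it throws a plain IllegalStateException where ES reports that the document already exists.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of IndexRequest.OpType semantics (NOT ES code).
final class OpTypeSketch {
    enum OpType { INDEX, CREATE }

    private final Map<String, String> docs = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    // Applies the write and returns the document's new version.
    long write(OpType opType, String id, String source) {
        boolean exists = docs.containsKey(id);
        if (opType == OpType.CREATE && exists) {
            // CREATE never overwrites: an existing id is an error.
            throw new IllegalStateException("document [" + id + "] already exists");
        }
        // INDEX (or CREATE of a new id): new documents start at version 1, overwrites bump it.
        long newVersion = exists ? versions.get(id) + 1 : 1L;
        docs.put(id, source);
        versions.put(id, newVersion);
        return newVersion;
    }

    public static void main(String[] args) {
        OpTypeSketch shard = new OpTypeSketch();
        System.out.println(shard.write(OpType.INDEX, "4", "{\"v\":1}")); // 1: created
        System.out.println(shard.write(OpType.INDEX, "4", "{\"v\":2}")); // 2: overwritten
        try {
            shard.write(OpType.CREATE, "4", "{\"v\":3}");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // document [4] already exists
        }
    }
}
```

This is also what the test program's isCreated() and getVersion() report: true and 1 the first time an id is indexed, then false and an incremented version when the same id is indexed again.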

In IndexShard:

public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, boolean canHaveDuplicates) {
        try {
            if (shardRouting.primary() == false) {
                throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
            }
            return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY, state !=
                    IndexShardState.STARTED || canHaveDuplicates);
        } catch (Throwable t) {
            verifyNotClosed(t);
            throw t;
        }
    }

    static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType,
            Engine.Operation.Origin origin, boolean canHaveDuplicates) {
        long startTime = System.nanoTime();
        // Parse the JSON source into a ParsedDocument
        ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
        if (docMapper.getMapping() != null) {
            doc.addDynamicMappingsUpdate(docMapper.getMapping());
        }
        // Build the Engine.Index operation keyed by the document's _uid term;
        // the write into Lucene itself happens later, when the engine executes this operation
        return new Engine.Index(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType,
                origin, startTime, canHaveDuplicates);
    }
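prepareIndex keys the resulting Engine.Index on the document's _uid term. In ES 2.x the _uid value is, as far as I can tell, the type and the id joined with '#' (for example book#4), so the same id under a different type is a different document. The toy sketch below (UidSketch is hypothetical, not ES or Lucene code) shows why such a key is useful: indexing replaces any document with the same uid, in the spirit of Lucene's IndexWriter.updateDocument(Term, ...), and reports whether the document was newly created.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy engine keyed by a uid term (NOT ES or Lucene code).
final class UidSketch {
    // Assumption: ES 2.x builds _uid as type + "#" + id.
    static String uid(String type, String id) {
        return type + "#" + id;
    }

    private final Map<String, String> docsByUid = new HashMap<>();

    // Replace-or-add keyed by uid; returns true if no document with this uid existed before.
    boolean index(String type, String id, String source) {
        return docsByUid.put(uid(type, id), source) == null;
    }

    String get(String type, String id) {
        return docsByUid.get(uid(type, id));
    }

    int size() {
        return docsByUid.size();
    }

    public static void main(String[] args) {
        UidSketch engine = new UidSketch();
        System.out.println(uid("book", "4"));                    // book#4
        System.out.println(engine.index("book", "4", "v1"));     // true: created
        System.out.println(engine.index("book", "4", "v2"));     // false: replaced
        System.out.println(engine.index("magazine", "4", "v1")); // true: different uid
        System.out.println(engine.size());                       // 2
    }
}
```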

As for how ES actually exchanges data with Lucene underneath: I'm new to ES and haven't read the Lucene source yet, so I'll fill that part in when I have time.