深入elasticsearch原始碼之索引過程
阿新 • • 發佈:2019-02-12
呼叫 ES 2.2.1 的 Java API 在 ES 叢集中索引一個文件
客戶端大致流程:
- 使用 XContentBuilder 構建索引的 json 串,也可直接用 json 字串
- 使用 TransportClient 連線 ES 叢集
- 傳送索引到叢集並獲取 IndexResponse
測試程式碼如下:
package index;
import java.io.IOException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient ;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.json.JsonXContentGenerator;
import com .fasterxml.jackson.core.filter.JsonPointerBasedFilter;
import es.MyTransportClient;
/**
 * Demo client: indexes one document into the "library" index (type "book")
 * through the transport client and prints details of the response.
 */
public class MyIndex {

    /**
     * Builds a JSON document with an {@link XContentBuilder}, sends it as
     * library/book/4 and prints the response, its created flag and its version.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        TransportClient client = MyTransportClient.getInstance().getTransportClient();

        // Alternative: supply the JSON source directly as a string.
        // IndexResponse response = client.prepareIndex("library", "book", "1")
        //         .setSource("{\"title\":\"mastering elasticsearch\"}")
        //         .execute().actionGet();

        try {
            // Build the JSON source programmatically.
            XContentBuilder document = JsonXContent.contentBuilder()
                    .startObject()
                    .field("user", "qiaqia")
                    .field("title", "this is title")
                    .field("subtitle", new String[] { "title1", "title2", "title3" })
                    .endObject();

            IndexResponse response = client
                    .prepareIndex("library", "book", "4")
                    .setSource(document)
                    .get();

            System.out.println(response.toString());
            System.out.println(response.isCreated());
            System.out.println(response.getVersion());

            /*
             * According to the original author's note, the document then shows up
             * in Elasticsearch as follows (shown with the id used above):
             * {
             *   "_index": "library",
             *   "_type": "book",
             *   "_id": "4",
             *   "_score": 1,
             *   "_source": {
             *     "user": "qiaqia",
             *     "title": "this is title",
             *     "subtitle": [ "title1", "title2", "title3" ]
             *   }
             * }
             */
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
構建好 XContent 後,生成 IndexRequest。IndexRequest 封裝了索引的操作、索引內容、路由、索引型別、id、時間戳、版本號、超時、ttl 等等資訊。
然後將 IndexRequest 由 TransportService 通過 tcp 傳送到叢集,TransportService 封裝了非同步、事件驅動的高效能網路應用程式框架 Netty。
服務端流程
獲取到 TransportAction 後,讀取叢集狀態,確定資料分配到哪個分片上。
把請求提交到主分片處理,可檢視 TransportIndexAction:
/**
 * Runs an index request against the primary shard.
 *
 * <p>First checks the mapping of the request's type: when that mapping marks
 * routing as required and the request carries no routing value, the request is
 * rejected. Otherwise the shard named by the request's shard id is resolved,
 * the request is executed via {@code executeIndexRequestOnPrimary}, and the
 * refresh flag, shard and resulting translog location are passed on to
 * {@code processAfterWrite}.
 *
 * @param metaData cluster metadata used to look up the index and its mappings
 * @param request  the index request to execute
 * @return a tuple holding the index response and the request itself
 * @throws RoutingMissingException if routing is required by the mapping but absent from the request
 * @throws Throwable any failure raised while executing the request
 */
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable {
    // Validate that a routing value is present whenever the mapping requires one.
    IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex());
    MappingMetaData typeMapping = indexMetaData.mappingOrDefault(request.type());
    boolean routingRequired = typeMapping != null && typeMapping.routing().required();
    if (routingRequired && request.routing() == null) {
        throw new RoutingMissingException(request.shardId().getIndex(), request.type(), request.id());
    }

    // Resolve the target shard from the request's shard id.
    IndexShard primaryShard = indicesService
            .indexServiceSafe(request.shardId().getIndex())
            .shardSafe(request.shardId().id());

    final WriteResult<IndexResponse> outcome =
            executeIndexRequestOnPrimary(null, request, primaryShard, mappingUpdatedAction);
    processAfterWrite(request.refresh(), primaryShard, outcome.location);

    // Hand back the IndexResponse produced by the operation together with the request.
    return new Tuple<>(outcome.response, request);
}
執行索引寫入前,TransportIndexAction 會先呼叫 prepareIndexOperationOnPrimary 準備索引操作:
/**
 * Creates the engine operation for an index request on the primary, choosing
 * between an index and a create operation based on the request's op type.
 *
 * @param shardRequest the enclosing bulk shard request, or {@code null} when there is none;
 *                     if present, its can-have-duplicates flag is OR-ed into the request's
 * @param request      the index request supplying source, index, type, id, routing, parent,
 *                     timestamp, ttl, version and version type
 * @param indexShard   the shard on which the operation is prepared
 * @return the prepared index or create operation
 */
public static Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
    // Extract the data carried by the IndexRequest into a SourceToParse.
    SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source())
            .index(request.index())
            .type(request.type())
            .id(request.id())
            .routing(request.routing())
            .parent(request.parent())
            .timestamp(request.timestamp())
            .ttl(request.ttl());

    boolean duplicatesPossible = request.canHaveDuplicates();
    if (shardRequest != null && shardRequest.canHaveDuplicates()) {
        duplicatesPossible = true;
    }

    // INDEX requests are prepared as index operations.
    if (request.opType() == IndexRequest.OpType.INDEX) {
        return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), duplicatesPossible);
    }

    // The only other op type expected here is CREATE.
    assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
    // The duplicates flag is intentionally passed for both trailing boolean arguments,
    // exactly as in the original code.
    return indexShard.prepareCreateOnPrimary(sourceToParse, request.version(), request.versionType(),
            duplicatesPossible, duplicatesPossible);
}
在 IndexShard 中,由 prepareIndexOnPrimary 和 prepareIndex 解析文件並構造 Engine.Index 操作:
/**
 * Prepares an index operation with origin PRIMARY for this shard.
 *
 * <p>Fails when this shard's routing does not mark it as a primary. The operation
 * is flagged as possibly having duplicates when the caller says so or when the
 * shard state is anything other than {@code STARTED}.
 *
 * @param source            the document source to parse
 * @param version           the version supplied with the request
 * @param versionType       the version type supplied with the request
 * @param canHaveDuplicates whether the caller considers duplicates possible
 * @return the prepared index operation
 * @throws IllegalIndexShardStateException if this shard is not a primary
 */
public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, boolean canHaveDuplicates) {
    try {
        if (!shardRouting.primary()) {
            throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
        }
        DocumentMapperForType mapperForType = docMapper(source.type());
        boolean duplicatesPossible = state != IndexShardState.STARTED || canHaveDuplicates;
        return prepareIndex(mapperForType, source, version, versionType,
                Engine.Operation.Origin.PRIMARY, duplicatesPossible);
    } catch (Throwable failure) {
        // Give verifyNotClosed a chance to react to the failure before rethrowing it unchanged.
        verifyNotClosed(failure);
        throw failure;
    }
}
/**
 * Parses the source into a document and wraps it in an {@link Engine.Index} operation.
 *
 * <p>The start time is captured with {@link System#nanoTime()} before parsing. When the
 * supplied mapper holder carries a non-null mapping, that mapping is attached to the
 * parsed document as a dynamic mappings update.
 *
 * @param docMapper         holder giving access to the document mapper and an optional mapping
 * @param source            the document source to parse
 * @param version           the version to record on the operation
 * @param versionType       the version type to record on the operation
 * @param origin            the origin of the operation
 * @param canHaveDuplicates whether the operation may have duplicates
 * @return the index operation built from the parsed document
 */
static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version,
        VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) {
    long startedAtNanos = System.nanoTime();

    // Parse the JSON source into a ParsedDocument.
    ParsedDocument parsedDoc = docMapper.getDocumentMapper().parse(source);
    if (docMapper.getMapping() != null) {
        parsedDoc.addDynamicMappingsUpdate(docMapper.getMapping());
    }

    // Build the engine-level index operation, keyed by the term derived from the document's uid.
    return new Engine.Index(
            docMapper.getDocumentMapper().uidMapper().term(parsedDoc.uid().stringValue()),
            parsedDoc,
            version,
            versionType,
            origin,
            startedAtNanos,
            canHaveDuplicates);
}
有關如何跟Lucene底層進行資料互動的問題,由於本人剛入門ES,也沒讀過Lucene原始碼,所以等以後有時間再補充好了。