1. 程式人生 > >elasticsearch原始碼分析---索引資料

elasticsearch原始碼分析---索引資料

跟正常的網路通訊相似,es的client跟server是通過netty進行通訊的,client封裝各種request,通過netty傳送給es的server。server解析收到的各類request,dispatch到對應的handler中進行處理。

下面我們看一下索引一條資料設計到的程式碼呼叫。

客戶端當然就是發起請求的了:client.prepareIndex("index","type","id").execute(); 我們假設使用的是TransportClient。過程如下:

prepareIndex會生成一個IndexRequestBuilder,其execute為基類ActionRequestBuilder的呼叫:

  public void execute(ActionListener<Response> listener) {
        doExecute(listener);
    }

IndexRequestBuilder覆蓋doExecute方法:
protected void doExecute(ActionListener<IndexResponse> listener) {
        client.index(request, listener);
    }

呼叫到client的index方法裡邊去了,其中client我們使用的TransportClient(裡邊使用的是InternalTransportClient例項):
  public ActionFuture<IndexResponse> index(IndexRequest request) {
        return internalClient.index(request);
    }

上述呼叫會進入TranportClient的基類AbstractClient中:
public ActionFuture<IndexResponse> index(final IndexRequest request) {
        return execute(IndexAction.INSTANCE, request);
    }

注意這裡的IndexAction已經在Client初始化的時候繫結到了TransportIndexAction

modules.add(new ActionModule(true));

registerAction(IndexAction.INSTANCE, TransportIndexAction.class);

這個基類的方法在InternalTransportClient的實現如下:

  public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request) {
        final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
        return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Response>>() {
            @Override
            public ActionFuture<Response> doWithNode(DiscoveryNode node) throws ElasticsearchException {
                return proxy.execute(node, request);
            }
        });
    }
在InternalTransportClient內部維護這樣一個結構:
private final ImmutableMap<Action, TransportActionNodeProxy> actions;
註冊了action跟TransportActionNodeProxy的對應關係。因此,在execute的過程中會根據action取對應的TransportActionNodeProxy例項(註冊了),最終呼叫到execute方法,在該方法中借用transportServive通訊模組把請求發給伺服器:
   transportService.sendRequest(node, action.name(), request, transportOptions, new BaseTransportResponseHandler<Response>()

至此,進入伺服器端的處理了:

nettyTranspot中dispatch如下:

pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
進入MessageChannelHandler的messageReceived,一通解析之後,調入handleRequest中:
TransportRequestHandler handler = transportServiceAdapter.handler(action);

TransportIndexAction對應的handler是OperationTransportHandler?在TransportShardReplicationOperationAction中註冊了:

transportService.registerHandler(actionName, new OperationTransportHandler());

至此,已經找到訊息的處理函式,進入OperationTransportHandler中,執行messageReceived方法:

 public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
            // no need to have a threaded listener since we just send back a response
            request.listenerThreaded(false);
            // if we have a local operation, execute it on a thread since we don't spawn
            request.operationThreaded(true);
            execute(request, new ActionListener<Response>() {
                @Override
                public void onResponse(Response result) {
                    try {
                        channel.sendResponse(result);
                    } catch (Throwable e) {
                        onFailure(e);
                    }
                }

                @Override
                public void onFailure(Throwable e) {
                    try {
                        channel.sendResponse(e);
                    } catch (Throwable e1) {
                        logger.warn("Failed to send response for " + actionName, e1);
                    }
                }
            });
        }

具體執行過程到execute方法中,該方法是基類TransportAction的方法,進一步檢視基類execute方法,可以發現最終又回到了TransportIndexAction的doExecute方法,然後執行InnerExecute進一步執行基類TransportShardReplicationOperationAction的doExecute方法,最終歸宿為AsyncShardOperationAction:
 protected void doExecute(Request request, ActionListener<Response> listener) {
        new AsyncShardOperationAction(request, listener).start();
    }

over。經過一番周折才找到最終服務端的執行函式AsyncShardOperationAction。。。。

接下來的操作就是真正的索引操作了,函式都集中在TransportIndexAction和基類TransportShardReplicationOperationAction中,來回穿梭。。。

AsyncShardOperationAction的doStart方法為入口,讀取叢集狀態,獲取索引請求需要在哪些shard上執行(根據indexname,type,id,routing值),找到primary shard和一致性保障,然後分別在primary shards和replicas上執行操作。其中TransportAction中的shardOperationOnPrimary函式為在primary上執行的操作。

shardOperationOnPrimary中會判斷操作的型別:

 if (request.opType() == IndexRequest.OpType.INDEX) {
            Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
            if (index.parsedDoc().mappingsModified()) {
                mappingUpdatedAction.updateMappingOnMaster(request.index(), index.docMapper(), indexService.indexUUID());
            }
            indexShard.index(index);
            version = index.version();
            op = index;
            created = index.created();
        } else {
            Engine.Create create = indexShard.prepareCreate(sourceToParse,
                    request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
            if (create.parsedDoc().mappingsModified()) {
                mappingUpdatedAction.updateMappingOnMaster(request.index(), create.docMapper(), indexService.indexUUID());
            }
            indexShard.create(create);
            version = create.version();
            op = create;
            created = true;
        }

可見分為兩種,一種是index操作(id已經存在,就更新doc的值),一種是create操作(建立新的doc).

InternalIndexShard執行index操作:其中會根據source內容來判定索引資料是否對mapping進行了更改,如果有更改,會updateMappingOnMaster,然後呼叫InternalEngine的index方法執行lucene的Index操作:執行一些version的操作,是否衝突,確定最終的version,然後寫入lucene索引,寫入translog。

主分片操作完成之後,在副本上執行操作。

至此,一條索引資料完成了。。。。。


相關推薦

elasticsearch原始碼分析---索引資料

跟正常的網路通訊相似,es的client跟server是通過netty進行通訊的,client封裝各種request,通過netty傳送給es的server。server解析收到的各類request,dispatch到對應的handler中進行處理。 下面我們看一下索引一條

elasticsearch原始碼分析索引操作(九)

上節介紹了es的node啟動如何建立叢集服務的過程,這節在其基礎之上介紹es索引的基本操作功能(create、exist、delete),用來進一步細化es叢集是如果工作的。 客戶端部分的操作就不予介紹了,詳細可以參照elasticsearch原始碼分析之客戶

兄弟連區塊鏈教程Fabric1.0原始碼分析ledgerID資料

1、idStore概述 Fabric支援建立多個Ledger,不同Ledger以ledgerID區分。 多個ledgerID及其創世區塊儲存在idStore資料庫中,idStore資料庫基於leveldb實現。 idStore預設使用路徑:/var/hyperledger/production

python使用Elasticsearch庫下載索引資料

from elasticsearch import Elasticsearch es= Elasticsearch(hosts=[{'host': 'localhost', 'port': 9210}]) dealnum=0 if __name__=='__main__': #查詢條件

區塊鏈入門教程以太坊原始碼分析交易資料分析eth

交易的資料結構 交易的資料結構定義在core.types.transaction.go中,結構如下: type Transaction struct { data txdata // caches hash atomic.Value size atomic.Value from atomic.Value

Shrio原始碼分析(4) - 資料域(Realm)

本文在於分析Shiro原始碼,對於新學習的朋友可以參考 [開濤部落格](http://jinnianshilongnian.iteye.com/blog/2018398)進行學習。 本篇主要分析Shiro中的Realm介面。Shiro使用Realm介面作為外部資料來源,主要處

ES5.6.4原始碼分析----索引的建立過程

入口 TransportCreateIndexAction#masterOperation 解析請求中的索引名稱 final String indexName = indexNameExpressionResolver.resolveDateMathExpre

elasticsearch原始碼分析——原始碼編譯

分析原始碼,首先要從搭建編譯執行環境開始 之前用solr比較多,限於solr不能做動態擴容,現在轉戰elasticsearch 專案 版本 執行環境 windows IDE intellij 2018.1.

elasticsearch原始碼分析之分片分配(十)

分片 什麼是分片 分片是把索引資料切分成多個小的索引塊,這些小的索引塊能夠分發到同一個叢集中的不同節點。在檢索時,檢索結果是該索引每個分片上檢索結果的合併。類似於資料庫的分庫分表。 為什麼分片 1、這樣可以提高讀寫效能,實現負載均衡。 2、副本容易

elasticsearch原始碼分析——叢集狀態

現在的工程就是在原始碼的層面進行改動,之前因為一個問題出現了叢集假死的狀態。所以才深入的去分析了,原始碼的叢集同步的狀態。 簡述   首先需要明白,類似於solr使用的是zookeeper來進行叢集狀態的同步。等於是使用了三方件實現叢集狀態的維護。但

elasticsearch原始碼分析之服務端(四)

上篇部落格說明了客戶端的情況,現在繼續分析服務端都幹了些啥,es是怎麼把資料插進去的,此處以transport的bulk為入口來探究,對於單個document的傳送就忽略了。 一、服務端接收 1.1接收訊息 在客戶端分析中已經提到,netty中通訊的處理類是Mes

angr原始碼分析——DFG 資料流圖

這篇文章主要講述,angr中資料流圖(Data Flow Gragh)的構建。 DFG恢復的是CFG中每個基本塊的資料流!DFG為CFG的每個基本塊構建一個數據流圖(DFG)DFG可以通過字典self.dfgs獲得,其中key的值為基本塊的地址,或DFG中的值。param CFG:用於獲得所有基本

elasticsearch原始碼分析之啟動過程(二)

最近開始廣泛的使用elasticsearch,也開始寫一些java程式碼了,為了提高java程式碼能力,也為了更加深入一點了解elasticsearch的內部運作機制,所以開始看一些elasticsearch的原始碼了。對於這種廣受追捧的開源專案,細細品讀一定會受益匪淺,

elasticsearch原始碼分析之叢集管理

一、背景 Elasticsearch是一個實時分散式搜尋和分析引擎。它讓你以前所未有的速度處理大資料成為可能。本文主要介紹實現分散式搜尋和分析的基礎–儲存,好的儲存設計在根本上決定了查詢的效能。 es的儲存本質上是採用了lucene全文索引,在其基礎上實現了分散式功

Elasticsearch原始碼分析十四--搜尋型別

簡介query_then_fetchquery_and_fetchdfs_query_and_fetchdfs_query_then_fetchcountscan 簡介 Elasticsearch允許通過指定搜尋型別來選擇查詢在內部如何處理。不同的搜尋型別適合不同的情況;

elasticsearch原始碼分析---threadpool模組

elasticsearch的執行緒池實現在org.elasticsearch.threadpool下。 初始化過程中會載入以threadpool開頭的配置項的配置資訊,然後確定各個執行緒池的大小,預設情況下,會參照處理器個數進行設定: int availab

Elasticsearch原始碼分析 | 單節點的啟動和關閉

本文主要簡要介紹Elasticsearch單節點的啟動和關閉流程。Elasticsearch版本:6.3.2 相關文章 1、Google Guice 快速入門 2、Elasticsearch 中的 Guice 3、教你編譯除錯Elasticsearch 6.3.2原始碼 4、El

elasticsearch原始碼分析之Transport(五)

一、基本介紹 1.1概念介紹 transport模組是es通訊的基礎模組,在elasticsearch中用的很廣泛,比如叢集node之間的通訊、資料的傳輸、transport client方式的資料傳送等等,只要數和通訊、資料傳輸相關的都離不開transport模組的

elasticsearch原始碼分析之discovery(七)

Discovery模組概述 elasticsearch中的Discover模組,這個模組主要是發現模組負責發現叢集中的節點,以及選取主節點。用作處理elasticsearch中的叢集問題,是elasticsearch中比較複雜的一個模組。  discovery有幾類:

【Vue】原始碼分析--雙向資料繫結的實現

總結 Vue的雙向資料繫結主要通過Object.defineProperty來實現,先為所有的屬性加上get/set的監控,這樣當屬性值改變時就會觸發對應的set方法,然後再在set方法中通過觀