elasticsearch index 之 create index(-)
從本篇開始,就進入了Index的核心代碼部分。這裏首先分析一下索引的創建過程。elasticsearch中的索引是多個分片的集合,它只是邏輯上的索引,並不具備實際的索引功能,所有對數據的操作最終還是由每個分片完成。創建索引的過程,從elasticsearch集群上來說就是寫入索引元數據的過程,這一操作只能在master節點上完成。這是一個阻塞式動作,在加上分配在集群上均衡的過程也非常耗時,因此在一次創建大量索引的過程master節點會出現單點性能瓶頸,能夠看到響應過程很慢。
在開始具體源碼分析之前,首先回顧一下Action部分的內容(參考index action分析),elasticsearch的每一個功能都對應兩個Action,*action和Transport*action。*action中定義了每個功能對應的路徑,同時Action的instance綁定對應的Transport*Action。所有功能請求都需要在集群上轉發,這大概也是每個功能都有Transport*Action的原因吧。對於create當然也不例外,它的開始點也是TransportCreateAction。另外,在action support分析中分析過,不同的action需要經過和需要操作的節點也不同。create index只能由master節點進行,而且也只在master節點上進行,保證集群數據的一致性。因此TransportCreateAction繼承了TransportMasterNodeOperationAction,並實現了materOperation方法。它的方法如下所示:
protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException { String cause = request.cause(); if (cause.length() == 0) { cause = "api"; }final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .settings(request.settings()).mappings(request.mappings()) .aliases(request.aliases()).customs(request.customs()); createIndexService.createIndex(updateRequest,new ActionListener<ClusterStateUpdateResponse>() { @Override public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new CreateIndexResponse(response.isAcknowledged())); } @Override public void onFailure(Throwable t) { if (t instanceof IndexAlreadyExistsException) { logger.trace("[{}] failed to create", t, request.index()); } else { logger.debug("[{}] failed to create", t, request.index()); } listener.onFailure(t); } }); }
這裏看上很簡單,只是調用了createIndexService(它其實是MetaDataCreateIndexService)的方法,就是修改集群matedata過程。修改前首先獲取到index名稱對應的lock,這樣保證操作數據一致性,然後生成updatetask,交給clusterservice處理。代碼如下所示:
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) { // 獲取鎖,只對該索引的操作加鎖,而不是整個cluster final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index()); // 如果能夠獲取鎖離開創建索引,否則在下面啟動新的線程進行 if (mdLock.tryAcquire()) { createIndex(request, listener, mdLock); return; } threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) { @Override public void doRun() throws InterruptedException { if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) { listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock")); return; } createIndex(request, listener, mdLock); } }); }
createIndex方法,會封裝create請求,然後向cluster發送一個updatetask。代碼如下所示:
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX); request.settings(updatedSettingsBuilder.build()); clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener)
建立索引,修改配置,增加或者修改mapping都是對集群狀態修改,它們的過程都很相似,都是通過clusterService提交一個更新操作,同時附帶有優先級。clusterservice會根據優先級和更新狀態task的類型來進行對應的操作。如下所示:
public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) { if (!lifecycle.started()) { return; } try { final UpdateTask task = new UpdateTask(source, priority, updateTask);//根據優先級新建不同的task if (updateTask instanceof TimeoutClusterStateUpdateTask) {//超時任務,這類任務需要即時返回,因此立刻執行。 final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask; updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() { @Override public void run() { threadPool.generic().execute(new Runnable() { @Override public void run() { timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source())); } }); } }); } else {//其它類型,可以延遲執行,則交給線程池來執行。 updateTasksExecutor.execute(task); } } catch (EsRejectedExecutionException e) { // ignore cases where we are shutting down..., there is really nothing interesting // to be done here... if (!lifecycle.stoppedOrClosed()) { throw e; } } }
說完它們的執行過程,再來看一下create index的具體邏輯。這個邏輯在matedataservice所提交的AckedClusterStateUpdateTask中的execute方法中。總體來說,這一過程就是將request中關於索引的配置mapping等取出來加入到當前的clustermatedata中,構造一個新的matedata的過程。這一過程還是比較復雜,限於篇幅將在下次中進行分析。
總結:創建索引的過程就是master節點更新集群matedata的過程,為了保證數據一致性,需要獲取鎖。因此存在單點瓶頸。對於外部調用來說,跟其它功能一樣,外部接口調用CreateIndexAction的相關方法,然後通過TransPortCreateIndexAction講請求發送到集群上,進行索引創建。
elasticsearch index 之 create index(-)