Elasticsearch source code analysis: cluster state
Our current project makes changes to Elasticsearch at the source level, and a while ago a problem left the cluster in a hung, seemingly dead state. That is what pushed me to dig into how the source code keeps the cluster state in sync.
Overview
First, note that Solr relies on ZooKeeper to synchronize cluster state, i.e. it delegates cluster state maintenance to a third-party component. Elasticsearch, by contrast, uses neither ZooKeeper nor etcd to manage master election and failover.
So how does Elasticsearch keep the cluster state in sync?
I recommend the article ELASTICSEARCH 機制和架構; that site contains a lot of Elasticsearch analysis and was a big inspiration for me. What follows mostly builds on the foundation laid by its author.
Node types
Without getting too elaborate, we only need to care about two node types here.
master node
First, a node can only be elected master if node.master: true is configured in its elasticsearch.yml.
If you are doing your own source analysis, it is best to run separate master and data nodes, and ideally add extra log statements of your own, or enable debug logging and trace the flow from there. On a single node many steps run asynchronously, so the phases are hard to tell apart cleanly.
Second, the master node's main job is to publish the cluster state to the rest of the cluster. The class to look at is ClusterService.
The entry point for state updates is shown below; how execution reaches this entry point will be analyzed step by step:
void runTasks(TaskInputs taskInputs) {
    ...
    TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS); // key point #1: the master computes the new metadata here, e.g. the cluster state after an index has been created.
    taskOutputs.notifyFailedTasks();
    if (taskOutputs.clusterStateUnchanged()) {
        taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
        TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
        logger.debug("processing [{}]: took [{}] no change in cluster_state", taskInputs.summary, executionTime);
        warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
    } else {
        ClusterState newClusterState = taskOutputs.newClusterState;
        if (logger.isTraceEnabled()) {
            logger.trace("cluster state updated, source [{}]\n{}", taskInputs.summary, newClusterState);
        } else if (logger.isDebugEnabled()) {
            logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), taskInputs.summary);
        }
        try {
            publishAndApplyChanges(taskInputs, taskOutputs); // the name says it all: publish the new cluster state to the other nodes.
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})", taskInputs.summary,
                executionTime, newClusterState.version(), newClusterState.stateUUID());
            warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
        } catch (Exception e) {
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            final long version = newClusterState.version();
            final String stateUUID = newClusterState.stateUUID();
            final String fullState = newClusterState.toString();
            logger.warn(
                (Supplier<?>) () -> new ParameterizedMessage(
                    "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
                    executionTime,
                    version,
                    stateUUID,
                    taskInputs.summary,
                    fullState),
                e);
            // TODO: do we want to call updateTask.onFailure here?
        }
    }
}
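For orientation, a state update usually enters this machinery when some component submits a task to ClusterService, which batches it and eventually runs it on the single cluster-state update thread. Below is a minimal sketch of such a submission, assuming the ES 5.x ClusterService / ClusterStateUpdateTask API; the class name and the no-op task are made up purely for illustration.

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.service.ClusterService;

// Sketch only (assumes the 5.x package layout): submit a task that returns the current
// state unchanged, so runTasks takes the clusterStateUnchanged() branch and publishes nothing.
class NoopStateUpdate {
    static void submit(ClusterService clusterService) {
        clusterService.submitStateUpdateTask("noop-for-debugging", new ClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                return currentState; // unchanged -> "no change in cluster_state" in the debug log
            }

            @Override
            public void onFailure(String source, Exception e) {
                // invoked if the task is rejected or execute() throws
            }
        });
    }
}

Submitting a task that actually changes the state (for example by rebuilding the metadata in execute) is what drives the publish path analyzed next.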
Next, let's look at how the state is actually published. This flow is also fairly long, so I will flesh it out over time.
Every state update carries a version; by comparing versions a node can tell which update is the most recent.
private void publishAndApplyChanges(TaskInputs taskInputs, TaskOutputs taskOutputs) {
    ClusterState previousClusterState = taskOutputs.previousClusterState;
    ClusterState newClusterState = taskOutputs.newClusterState;

    ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(taskInputs.summary, newClusterState, previousClusterState);
    // new cluster state, notify all listeners
    final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
    if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
        String summary = nodesDelta.shortSummary();
        if (summary.length() > 0) {
            logger.info("{}, reason: {}", summary, taskInputs.summary);
        }
    }

    final Discovery.AckListener ackListener = newClusterState.nodes().isLocalNodeElectedMaster() ?
        taskOutputs.createAckListener(threadPool, newClusterState) :
        null;

    nodeConnectionsService.connectToNodes(newClusterState.nodes());

    // if we are the master, publish the new state to all nodes
    // we publish here before we send a notification to all the listeners, since if it fails
    // we don't want to notify
    // This is where the master forwards the new state to the other nodes.
    if (newClusterState.nodes().isLocalNodeElectedMaster()) {
        logger.debug("publishing cluster state version [{}]", newClusterState.version());
        try { // Functional style again: tracing it through, the default publisher is ZenDiscovery's publish method; that flow is explained in detail below.
            clusterStatePublisher.accept(clusterChangedEvent, ackListener);
        } catch (Discovery.FailedToCommitClusterStateException t) {
            final long version = newClusterState.version();
            logger.warn(
                (Supplier<?>) () -> new ParameterizedMessage(
                    "failing [{}]: failed to commit cluster state version [{}]", taskInputs.summary, version),
                t);
            // ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state
            nodeConnectionsService.connectToNodes(previousClusterState.nodes());
            nodeConnectionsService.disconnectFromNodesExcept(previousClusterState.nodes());
            taskOutputs.publishingFailed(t);
            return;
        }
    }

    logger.debug("applying cluster state version {}", newClusterState.version());
    try {
        // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
        if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
            final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
            clusterSettings.applySettings(incomingSettings);
        }
    } catch (Exception ex) {
        logger.warn("failed to apply cluster settings", ex);
    }

    logger.debug("set local cluster state to version {}", newClusterState.version());
    // Note: the master first sends the state to the other nodes and waits for their responses (30s timeout by default);
    // only after that does it reach this local state update. "Local" here means the local data role, which is another
    // reason to separate master and data nodes when analyzing the source.
    // This also raises a question: suppose an index has three shards spread across three nodes and deleting each shard
    // takes 1s; since this step is effectively synchronous, the total deletion time ends up being 2s.
    callClusterStateAppliers(newClusterState, clusterChangedEvent);

    nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());

    updateState(css -> newClusterState);

    Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
        try {
            logger.trace("calling [{}] with change to version [{}]", listener, newClusterState.version());
            listener.clusterChanged(clusterChangedEvent);
        } catch (Exception ex) {
            logger.warn("failed to notify ClusterStateListener", ex);
        }
    });

    //manual ack only from the master at the end of the publish
    if (newClusterState.nodes().isLocalNodeElectedMaster()) {
        try {
            ackListener.onNodeAck(newClusterState.nodes().getLocalNode(), null);
        } catch (Exception e) {
            final DiscoveryNode localNode = newClusterState.nodes().getLocalNode();
            logger.debug(
                (Supplier<?>) () -> new ParameterizedMessage("error while processing ack for master node [{}]", localNode),
                e);
        }
    }

    taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);

    if (newClusterState.nodes().isLocalNodeElectedMaster()) {
        try {
            taskOutputs.clusterStatePublished(clusterChangedEvent);
        } catch (Exception e) {
            logger.error(
                (Supplier<?>) () -> new ParameterizedMessage(
                    "exception thrown while notifying executor of new cluster state publication [{}]",
                    taskInputs.summary),
                e);
        }
    }
}
State publication
Publishing the state actually has two phases, send and commit, whose purpose is to keep the cluster state consistent. The master first issues a send request; once enough nodes have responded, it issues a commit request, and only then do the other nodes start applying the state. A few details are involved:
1. When the send request arrives, the receiving node stores the state in a queue.
2. When the commit request arrives, the queued state is marked as committed and then processed.
3. The send request uses SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
4. The commit request uses COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit".
Following these action names leads you to both the sending and the handling logic; many parts of Elasticsearch send and handle requests in exactly this way.
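To make the two-phase idea concrete, here is a compact, self-contained toy sketch. The Node interface, class name, and parameters are all hypothetical; the real logic lives in PublishClusterStateAction/ZenDiscovery and additionally deals with diffs, per-node timeouts, and the minimum-master-nodes check before committing.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy two-phase publication, illustrative only.
class TwoPhasePublish {
    interface Node {
        void send(long stateVersion, Runnable ack);   // phase 1: park the state in a pending queue, then ack
        void commit(long stateVersion);               // phase 2: actually apply the state
    }

    static boolean publish(List<Node> nodes, long stateVersion, int minAcks, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch acks = new CountDownLatch(minAcks);

        // phase 1: send to everyone and count acknowledgements
        for (Node node : nodes) {
            node.send(stateVersion, acks::countDown);
        }
        if (!acks.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;                             // not enough nodes saw the state: do not commit
        }

        // phase 2: enough nodes hold the state as pending, tell them to apply it
        for (Node node : nodes) {
            node.commit(stateVersion);
        }
        return true;
    }
}

The receiving side of the real protocol is shown further below: the send handler stores the state (full or as a diff) in pendingStatesQueue, and the commit handler marks it committed and triggers processing.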
Handling logic
If you keep following the trail, you will see that index creation and deletion are carried out in IndicesClusterStateService. It is invoked exactly when the ClusterService above applies the state locally, i.e. from callClusterStateAppliers(newClusterState, clusterChangedEvent):
@Override
public synchronized void applyClusterState(final ClusterChangedEvent event) {
    if (!lifecycle.started()) {
        return;
    }
    final ClusterState state = event.state();
    // we need to clean the shards and indices we have on this node, since we
    // are going to recover them again once state persistence is disabled (no master / not recovered)
    // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
    if (state.blocks().disableStatePersistence()) {
        for (AllocatedIndex<? extends Shard> indexService : indicesService) {
            indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED,
                "cleaning index (disabled block persistence)"); // also cleans shards
        }
        return;
    }
    updateFailedShardsCache(state);
    deleteIndices(event); // also deletes shards of deleted indices
    removeUnallocatedIndices(event); // also removes shards of removed indices
    failMissingShards(state);
    removeShards(state); // removes any local shards that doesn't match what the master expects
    updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache
    createIndices(state);
    createOrUpdateShards(state);
}
Points to note:
- This method is synchronized, so the next state cannot be applied until the previous one has finished.
That raises a question: if creating or deleting an index takes a long time, wouldn't this block? In fact everything inside this method is metadata-level work; deletions and the far more expensive data recovery run on background threads, so logically the applier thread is not blocked here. The recovery flow has a fair amount of complexity of its own and deserves a separate article later.
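As a rough illustration of that division of labour, here is a generic sketch with made-up names (not the actual IndicesClusterStateService code): the synchronized applier only updates in-memory bookkeeping and schedules the heavy lifting onto another thread pool.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic pattern: the applier stays fast and never blocks on recovery or deletion.
class ApplierPattern {
    private final ExecutorService genericPool = Executors.newFixedThreadPool(4);

    synchronized void applyClusterState(long version) {
        // cheap, in-memory bookkeeping happens inline
        System.out.println("applying metadata for cluster state version " + version);

        // expensive work is only *scheduled* here and runs in the background
        genericPool.execute(() -> recoverShard("index-0", 0));
    }

    private void recoverShard(String index, int shardId) {
        // stands in for a long-running recovery / deletion
        System.out.println("recovering [" + index + "][" + shardId + "] in the background");
    }
}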
After this rather involved flow, the cluster state has been updated.
data node
A data node is mainly responsible for writing data; the node.data setting defaults to true.
What we care about here is how the cluster state gets applied on a data node.
The send request
As mentioned above, the send request uses the action name SEND_ACTION_NAME; searching for it leads straight to the handler.
protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
    Compressor compressor = CompressorFactory.compressor(request.bytes());
    StreamInput in = request.bytes().streamInput();
    try {
        if (compressor != null) {
            in = compressor.streamInput(in);
        }
        in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
        in.setVersion(request.version());
        synchronized (lastSeenClusterStateMutex) {
            final ClusterState incomingState;
            // If true we received full cluster state - otherwise diffs
            if (in.readBoolean()) {
                incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
                logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
                    request.bytes().length());
            } else if (lastSeenClusterState != null) {
                Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
                incomingState = diff.apply(lastSeenClusterState);
                logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                    incomingState.version(), incomingState.stateUUID(), request.bytes().length());
            } else {
                logger.debug("received diff for but don't have any local cluster state - requesting full state");
                throw new IncompatibleClusterStateVersionException("have no local cluster state");
            }
            // sanity check incoming state
            validateIncomingState(incomingState, lastSeenClusterState);

            pendingStatesQueue.addPending(incomingState); // key point: the state is only added to the pending queue here
            lastSeenClusterState = incomingState;
        }
    } finally {
        IOUtils.close(in);
    }
    channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
So the send phase only makes sure the data node has received the state; the node does not process it yet but parks it in pendingStatesQueue. The reply tells the master that this data node can receive messages, and the master will follow up with a commit request.
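To picture what the pending queue does across the two phases, here is a heavily simplified stand-in with hypothetical class and method names (not the real PendingClusterStatesQueue): send adds a state as pending, commit marks it committed by its stateUUID, and the applier later picks the newest committed entry and discards everything older.

import java.util.ArrayList;
import java.util.List;

// Simplified model of a pending-states queue, for illustration only.
class PendingStates {
    static final class Entry {
        final String stateUUID;
        final long version;
        boolean committed;
        Entry(String stateUUID, long version) { this.stateUUID = stateUUID; this.version = version; }
    }

    private final List<Entry> pending = new ArrayList<>();

    // send phase: remember the state, do not apply it yet
    synchronized void addPending(String stateUUID, long version) {
        pending.add(new Entry(stateUUID, version));
    }

    // commit phase: mark the matching state; the caller then schedules processing
    synchronized Entry markAsCommitted(String stateUUID) {
        for (Entry e : pending) {
            if (e.stateUUID.equals(stateUUID)) {
                e.committed = true;
                return e;
            }
        }
        return null; // unknown or already dropped state
    }

    // applier side: pick the newest committed state and clear everything older
    synchronized Entry getNextCommittedToProcess() {
        Entry best = null;
        for (Entry e : pending) {
            if (e.committed && (best == null || e.version > best.version)) {
                best = e;
            }
        }
        if (best != null) {
            final long newest = best.version;
            pending.removeIf(e -> e.version <= newest);
        }
        return best;
    }
}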
The commit request
COMMIT_ACTION_NAME: search for it the same way (Ctrl+H) and you will find where the action is registered along with its handler.
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
    final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID,
        new PendingClusterStatesQueue.StateProcessedListener() {
            @Override
            public void onNewClusterStateProcessed() { // a pattern you will see a lot in async code: once processing finishes, sendResponse is called
                try {
                    // send a response to the master to indicate that this cluster state has been processed post committing it.
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (Exception e) {
                    logger.debug("failed to send response on cluster state processed", e);
                    onNewClusterStateFailed(e);
                }
            }

            @Override
            public void onNewClusterStateFailed(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.debug("failed to send response on cluster state processed", inner);
                }
            }
        });
    if (state != null) {
        newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() +
            " committed version [" + state.version() + "]"); // this is where the actual processing is triggered
    }
}
From there the flow ends up back in ZenDiscovery's handling logic:
private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener {

    @Override
    public void onNewClusterState(String reason) {
        processNextPendingClusterState(reason);
    }
}
processNextPendingClusterState eventually submits a BatchedTask, so the actual handling goes back into ClusterService and connects with the flow described above. A few points deserve special attention here.
Pay special attention to the following!
1. threadExecutor: if you follow its initialization you will see that this prioritized executor has exactly one thread. That means that if the task currently being executed has not finished and been removed, every subsequent request just piles up in its work queue.
2. Remember that every incoming state update first has to pass through pendingStatesQueue. So if that single executor thread is stuck, entries keep accumulating in pendingStatesQueue, which is capped at 25 entries and drops the oldest pending state once it is full. Our project modifies Elasticsearch and adds some C++ logic; a deadlock on the C++ side left this thread stuck for good, later requests could never get in, and pendingStatesQueue kept dropping states without any of them ever being processed (the sketch right after this list illustrates the failure mode).
3. curl 127.0.0.1:9200/_cat/tasks?v shows the tasks currently running in the background.
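Here is a small, self-contained toy model of that failure mode (hypothetical names, not the actual ES classes): a single worker thread drains submitted work, and a bounded pending queue of capacity 25 drops its oldest entry when full. If the worker blocks, the pending queue keeps cycling while nothing is ever applied.

import java.util.ArrayDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of the threadExecutor / pendingStatesQueue interplay described above.
public class StuckApplierDemo {
    static final int MAX_PENDING = 25;                 // mirrors the default cap of 25 pending states
    static final ArrayDeque<Long> pending = new ArrayDeque<>();

    static synchronized void addPending(long version) {
        if (pending.size() >= MAX_PENDING) {
            pending.pollFirst();                       // drop the oldest pending state
        }
        pending.addLast(version);
    }

    public static void main(String[] args) {
        ExecutorService applier = Executors.newSingleThreadExecutor(); // one thread, like threadExecutor

        // The first task never returns (stands in for the stuck C++ call):
        applier.execute(() -> { while (true) { try { Thread.sleep(1_000); } catch (InterruptedException e) { return; } } });

        // Every new cluster state still lands in the pending queue and submits work,
        // but nothing behind the stuck task ever runs.
        for (long version = 1; version <= 100; version++) {
            addPending(version);
            final long v = version;
            applier.execute(() -> System.out.println("applied version " + v)); // queued forever
        }
        System.out.println("pending now holds " + pending.size() + " states, oldest dropped");
        applier.shutdownNow();
    }
}

For comparison, below is the real task submission code referenced above: this is where duplicate tasks for the same batching key are rejected and where the first task of a batch is handed to threadExecutor.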
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
    if (tasks.isEmpty()) {
        return;
    }
    final BatchedTask firstTask = tasks.get(0);
    assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
        "tasks submitted in a batch should share the same batching key: " + tasks;
    // convert to an identity map to check for dups based on task identity
    final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
        BatchedTask::getTask,
        Function.identity(),
        (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
        IdentityHashMap::new));

    synchronized (tasksPerBatchingKey) {
        LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
            k -> new LinkedHashSet<>(tasks.size()));
        for (BatchedTask existing : existingTasks) {
            // check that there won't be two tasks with the same identity for the same batching key
            BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
            if (duplicateTask != null) {
                throw new IllegalStateException("task [" + duplicateTask.describeTasks(
                    Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
            }
        }
        existingTasks.addAll(tasks);
    }

    if (timeout != null) {
        threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
    } else {
        threadExecutor.execute(firstTask);
    }
}