How an Elasticsearch Node Joins the Cluster
Introduction
One of the most important things an Elasticsearch node does at startup is join the cluster. In this post we walk through the source code of the join process.
Background
Elasticsearch basics
Elasticsearch is a distributed search engine built on Lucene; the basics are not repeated here, please consult other material if needed.
The Bully algorithm
When distributed election algorithms come up, most people think of Paxos, but Paxos is fairly complex. Elasticsearch instead uses the much simpler Bully algorithm, which works as follows (a minimal sketch follows the list):
- A process broadcasts to discover the IDs of all processes it can reach.
- If it finds that it has the largest ID, it declares itself master and broadcasts that message.
- If it sees another process with a larger ID, it concludes it cannot become master and waits for the master's announcement.
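As a minimal illustration of the Bully idea (this is not Elasticsearch code; the class name and the plain String ids are purely hypothetical):

import java.util.Collections;
import java.util.List;

// Minimal sketch of the Bully idea: among all reachable process ids, the largest
// one claims to be master; every other process waits for its announcement.
final class BullyElectionSketch {

    // The would-be winner is simply the maximum id among everything we can reach.
    static String electMaster(List<String> reachableIds) {
        if (reachableIds.isEmpty()) {
            throw new IllegalStateException("no processes discovered");
        }
        return Collections.max(reachableIds);
    }

    // A process claims master only if no reachable process has a larger id than its own.
    static boolean shouldClaimMaster(String selfId, List<String> reachableIds) {
        return electMaster(reachableIds).equals(selfId);
    }
}

Elasticsearch does not literally compare process ids; as we will see in findMaster(), it compares cluster state versions and node properties, but the shape of the decision is the same.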
The whole election flow is very simple. Let's dig into the Elasticsearch code to see how it is implemented.
Code Analysis
Node.start() does a lot of things; here we only care about the snippet below. The default discovery implementation is ZenDiscovery.
public Node start() throws NodeValidationException {
...
// Start discovery, the module responsible for finding other nodes.
discovery.start();
transportService.acceptIncomingRequests();
// Start joining the cluster
discovery.startInitialJoin();
...
}
ZenDiscovery is the default node discovery service in Elasticsearch; the code above calls its startInitialJoin() method:
@Override
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    clusterService.submitStateUpdateTask("initial_join", new LocalClusterUpdateTask() {

        @Override
        public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
            // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
            joinThreadControl.startNewThreadIfNotRunning();
            return unchanged();
        }

        @Override
        public void onFailure(String source, @org.elasticsearch.common.Nullable Exception e) {
            logger.warn("failed to start initial join process", e);
        }
    });
}
A task is submitted directly to clusterService; in its execute method, joinThreadControl starts a thread that performs the join. JoinThreadControl is an inner class of ZenDiscovery that manages the join thread: it guarantees that only one thread runs the join task at a time and handles the bookkeeping after a successful join. This lets startInitialJoin() return quickly without blocking Node.start().
Next, the startNewThreadIfNotRunning() method:
public void startNewThreadIfNotRunning() {
ClusterService.assertClusterStateThread();
if (joinThreadActive()) {
return;
}
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
Thread currentThread = Thread.currentThread();
if (!currentJoinThread.compareAndSet(null, currentThread)) {
return;
}
while (running.get() && joinThreadActive(currentThread)) {
try {
innerJoinCluster();
return;
} catch (Exception e) {
logger.error("unexpected error while joining cluster,trying again", e);
// Because we catch any exception here, we want to know in
// tests if an uncaught exception got to this point and the test infra uncaught exception
// leak detection can catch this. In practise no uncaught exception should leak
assert ExceptionsHelper.reThrowIfNotNull(e);
}
}
// cleaning the current thread from currentJoinThread is done by explicit calls.
}
});
}
This method is simple as well: it calls innerJoinCluster(). Note the while loop: as long as joinThreadControl is still running and the current thread is still the active join thread, any exception causes the join to be retried until it succeeds.
Now the innerJoinCluster() method:
private void innerJoinCluster() {
//1--------------------------------
DiscoveryNode masterNode = null;
final Thread currentThread = Thread.currentThread();
nodeJoinController.startElectionContext();
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
masterNode = findMaster();
}
if (!joinThreadControl.joinThreadActive(currentThread)) {
logger.trace("thread is no longer in currentJoinThread. Stopping.");
return;
}
// 2-----------------------------
if (clusterService.localNode().equals(masterNode)) {
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
joinThreadControl.markThreadAsDone(currentThread);
// we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
nodesFD.updateNodesAndPing(state); // start the nodes FD
}
@Override
public void onFailure(Throwable t) {
logger.trace("failed while waiting for nodes to join, rejoining", t);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
);
// 3------------------------------------
} else {
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopElectionContext(masterNode + " elected");
// send join request
final boolean success = joinElectedMaster(masterNode);
// finalize join through the cluster state update thread
final DiscoveryNode finalMasterNode = masterNode;
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new LocalClusterUpdateTask() {
@Override
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
if (!success) {
// failed to join. Try again...
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return unchanged();
}
if (currentState.getNodes().getMasterNode() == null) {
// Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
// a valid master.
logger.debug("no master node is set, despite of join request completing. retrying pings.");
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return unchanged();
}
if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
}
// Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
// when the first cluster state arrives.
joinThreadControl.markThreadAsDone(currentThread);
return unchanged();
}
@Override
public void onFailure(String source, @Nullable Exception e) {
logger.error("unexpected error while trying to finalize cluster join", e);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
});
}
}
This method is a bit longer, so we split it into three parts and walk through them one by one:
- The first part does what its name suggests: it loops until it obtains a master node. Note that the node returned here is not necessarily the real master, only the node most likely to become master; it still has to be verified below.
- If the chosen master is the local node itself, it must first gather enough supporting votes. If it receives enough joins within the configured timeout, it confirms itself as master and starts nodesFD to monitor the other nodes. If it does not get enough votes, it calls markThreadAsDoneAndStartNew(), which effectively runs a new election round: because the callback is asynchronous, the previous thread is first marked as done and then a new join task is submitted to the thread pool (a sketch of this helper follows the list).
- If the chosen master is some other node, the local node joins that master and syncs its cluster state. In the callback it inspects the cluster state received from the node it believes to be master; if that node turns out not to be the real master, it starts a new join round via markThreadAsDoneAndStartNew().
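Both branches above rely on JoinThreadControl.markThreadAsDoneAndStartNew() to restart a join round. Condensed from the 5.x ZenDiscovery source, it looks roughly like the following (treat it as a sketch rather than an exact copy):

// Mark the given thread as no longer being the join thread and, if that succeeded,
// kick off a fresh join attempt via startNewThreadIfNotRunning() shown earlier.
public void markThreadAsDoneAndStartNew(Thread joinThread) {
    ClusterService.assertClusterStateThread();
    if (!markThreadAsDone(joinThread)) {
        return; // another thread already took over, nothing to restart
    }
    startNewThreadIfNotRunning();
}

// Only succeeds if joinThread is still the current join thread, which is what
// guarantees that at most one join thread is active at any time.
public boolean markThreadAsDone(Thread joinThread) {
    ClusterService.assertClusterStateThread();
    return currentJoinThread.compareAndSet(joinThread, null);
}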
Next, the findMaster() method:
private DiscoveryNode findMaster() {
logger.trace("starting to ping");
List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
if (fullPingResponses == null) {
logger.trace("No full ping responses");
return null;
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
if (fullPingResponses.size() == 0) {
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
sb.append("\n\t--> ").append(pingResponse);
}
}
logger.trace("full ping responses:{}", sb);
}
final DiscoveryNode localNode = clusterService.localNode();
// add our selves
assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
.filter(n -> n.equals(localNode)).findAny().isPresent() == false;
fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));
// filter responses
final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
List<DiscoveryNode> activeMasters = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
activeMasters.add(pingResponse.master());
}
}
// nodes discovered during pinging
List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.node().isMasterNode()) {
masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
}
}
if (activeMasters.isEmpty()) {
if (electMaster.hasEnoughCandidates(masterCandidates)) {
final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
logger.trace("candidate {} won election", winner);
return winner.getNode();
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
masterCandidates, electMaster.minimumMasterNodes());
return null;
}
} else {
assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
// lets tie break between discovered nodes
return electMaster.tieBreakActiveMasters(activeMasters);
}
}
Roughly, it does the following and finally returns a master node:
- Synchronously ping all known nodes and wait for their responses.
- Add the local node to the ping responses and, depending on configuration, decide whether to filter out nodes without master eligibility (controlled by masterElectionIgnoreNonMasters, false by default, so nothing is filtered).
- Add every master reported by a ping response (excluding the local node) to the activeMasters list, and every master-eligible node to the masterCandidates list.
- If activeMasters is empty, there is no existing master, so a local election is run over masterCandidates, provided there are enough master-eligible candidates (minimum_master_nodes): the candidate with the highest clusterStateVersion wins, and ties are broken by comparing the nodes themselves (master-eligible first, then by node id), as sketched after this list.
- If activeMasters is not empty, one of the already active masters is chosen via tieBreakActiveMasters(), which uses only the node comparison (master-eligible first, then the smaller node id); the entries here are plain DiscoveryNode objects, so there is no clusterStateVersion to compare.
- Return the chosen master node.
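For reference, the ordering used by ElectMasterService in the 5.x line boils down to roughly the following (condensed and slightly simplified; the method names follow the real class, but treat the snippet as a sketch):

// "Better" candidates sort first: a higher cluster state version wins; on a tie,
// master-eligible nodes are preferred and then the smaller node id.
static int compareCandidates(ElectMasterService.MasterCandidate c1, ElectMasterService.MasterCandidate c2) {
    int ret = Long.compare(c2.getClusterStateVersion(), c1.getClusterStateVersion());
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}

// tieBreakActiveMasters() relies on this node comparison alone, since an active
// master is just a DiscoveryNode and carries no cluster state version.
static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    if (o1.isMasterNode() && o2.isMasterNode() == false) {
        return -1;
    }
    if (o1.isMasterNode() == false && o2.isMasterNode()) {
        return 1;
    }
    return o1.getId().compareTo(o2.getId());
}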
Now let's look at the handling when the node believes it is the master itself:
if (clusterService.localNode().equals(masterNode)) {
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
joinThreadControl.markThreadAsDone(currentThread);
// we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
nodesFD.updateNodesAndPing(state); // start the nodes FD
}
@Override
public void onFailure(Throwable t) {
logger.trace("failed while waiting for nodes to join, rejoining", t);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
);
}
What happens here is clear: per the configuration, the master-elect must wait for support from at least electMaster.minimumMasterNodes() nodes (counting its own vote for itself). It waits synchronously for up to masterElectionWaitForJoinsTimeout; if enough joins arrive, the callback calls updateNodesAndPing to start fault detection on all nodes. Otherwise failContextIfNeeded() is invoked, which closes the election context and stops the election, and a new join round is started.
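The internals of waitToBeElectedAsMaster() are outside the scope of this post, but the mechanism can be pictured as a countdown with a timeout. The following is a simplified illustration, not the actual NodeJoinController code:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified illustration of "wait until enough join votes arrive or time out":
// each incoming join counts down; the master-elect blocks until the latch reaches
// zero (election confirmed) or the timeout expires (start a new join round).
final class ElectionWaitSketch {
    private final CountDownLatch pendingJoins;

    ElectionWaitSketch(int requiredJoins) {
        this.pendingJoins = new CountDownLatch(requiredJoins);
    }

    // Called once per join request received while we believe we are the master-elect.
    void onJoinRequest() {
        pendingJoins.countDown();
    }

    // true = enough joins arrived in time, false = timed out and the election failed.
    boolean awaitElection(long timeoutMillis) throws InterruptedException {
        return pendingJoins.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}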
Finally, the handling when the node decides that the master is not itself:
} else {
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopElectionContext(masterNode + " elected");
// send join request
final boolean success = joinElectedMaster(masterNode);
// finalize join through the cluster state update thread
final DiscoveryNode finalMasterNode = masterNode;
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new LocalClusterUpdateTask() {
@Override
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
if (!success) {
// failed to join. Try again...
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return unchanged();
}
if (currentState.getNodes().getMasterNode() == null) {
// Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
// a valid master.
logger.debug("no master node is set, despite of join request completing. retrying pings.");
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return unchanged();
}
if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
}
// Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
// when the first cluster state arrives.
joinThreadControl.markThreadAsDone(currentThread);
return unchanged();
}
@Override
public void onFailure(String source, @Nullable Exception e) {
logger.error("unexpected error while trying to finalize cluster join", e);
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
});
}
The node first joins the master it has picked, but since that choice was made locally it may not be the real master. The cluster state returned by the master is therefore used to verify the choice: if the join fails, or the received cluster state has no master set, or the master it names is not the node we picked, the node starts another join round.
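joinElectedMaster() itself is not shown above. Condensed, it behaves roughly as follows (simplified from the 5.x source, with logging and some error handling omitted; treat it as a sketch):

// Connect to the presumed master and send a blocking join request, retrying a
// bounded number of times if the target replies that it is not (yet) the master.
private boolean joinElectedMaster(DiscoveryNode masterNode) {
    try {
        transportService.connectToNode(masterNode); // make sure we can reach it at all
    } catch (Exception e) {
        logger.warn("failed to connect to master [{}], retrying...", masterNode);
        return false;
    }
    int joinAttempt = 0;
    while (true) {
        try {
            membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
            return true; // the master acked our join
        } catch (Exception e) {
            if (ExceptionsHelper.unwrapCause(e) instanceof NotMasterException) {
                // the node we picked is not (yet) master; retry a limited number of times
                if (++joinAttempt == this.joinRetryAttempts) {
                    return false;
                }
            } else {
                return false; // any other failure: give up and let the caller restart the join round
            }
        }
        try {
            Thread.sleep(this.joinRetryDelay.millis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}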