1. 程式人生 > >elasticsearch叢集選舉原始碼解析

elasticsearch叢集選舉原始碼解析

elasticsearch的節點Node在啟動的時候(也就是在start()方法中)開始加入叢集,並準備參與選舉。

在Node的start()方法中,會呼叫ZenDiscovery的startInitialJoin()方法開始加入叢集並準備進行參與選舉。

@Override
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    clusterService.submitStateUpdateTask("initial_join", new LocalClusterUpdateTask() {

        @Override
        public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
            // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
            joinThreadControl.startNewThreadIfNotRunning();
            return unchanged();
        }

        @Override
        public void onFailure(String source, @org.elasticsearch.common.Nullable Exception e) {
            logger.warn("failed to start initial join process", e);
        }
    });
}

這裡會向clusterService提交一個任務Task準備放入執行緒池中執行,這裡的實現是一個LocalClusterUpdateTask,重寫了execute(),其中實則是呼叫了joinThreadControl的startNewThreadIfNotRunning()。joinThreadControl作為ZenDiscovery的一個內部類,主要用來保證執行加入叢集執行緒的唯一性。

private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

joinThreadControl通過一個AtomicBoolean型別的running來表示Node加入叢集與選舉的開始與結束,而currentJoinThread則通過AtomicReference來保證工作執行緒的可見性與唯一性。

在startNewThreadIfNotRunning()方法中先通過joinThreadActive()方法確保當前並沒有工作執行緒在執行。

public boolean joinThreadActive() {
    Thread currentThread = currentJoinThread.get();
    return running.get() && currentThread != null && currentThread.isAlive();
}

如果沒有,那麼就新建一個工作執行緒準備開始加入叢集。

public void startNewThreadIfNotRunning() {
    ClusterService.assertClusterStateThread();
    if (joinThreadActive()) {
        return;
    }
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
                return;
            }
            while (running.get() && joinThreadActive(currentThread)) {
                try {
                    innerJoinCluster();
                    return;
                } catch (Exception e) {
                    logger.error("unexpected error while joining cluster, trying again", e);
                    // Because we catch any exception here, we want to know in
                    // tests if an uncaught exception got to this point and the test infra uncaught exception
                    // leak detection can catch this. In practise no uncaught exception should leak
                    assert ExceptionsHelper.reThrowIfNotNull(e);
                }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
        }
    });
}

在其run()方法中,通過cas更新curentJoinThread,並在服務結束之前,並且當前執行緒是ZenDiscovery的工作執行緒之時,不斷再迴圈中執行innerJoinCluster()。

InnerJoinCluster()分為兩步,在本節點還未選擇出自己所認定master節點之前,會一直不斷迴圈呼叫findMaster()去得到自己認定的master節點。

while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
    masterNode = findMaster();
}

在findMaster()中,會根據pingAndWait()方法去獲取叢集內其他節點關於選舉的ping請求的回覆,具體的獲取在之前的文章已經詳細解釋。

final DiscoveryNode localNode = clusterService.localNode();

// add our selves
assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
    .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));

接下來會過濾掉本地節點的資料,重新加入當前本地節點的選舉資料,由於剛加入選舉的緣故,所以其還並沒有master節點的選擇。

final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

在預設情況下,masterElectionIgnoreNonMasters為false,因此data節點的選舉資料也會被考慮到選舉的過程中。

List<DiscoveryNode> activeMasters = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
    // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
    // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
    if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
        activeMasters.add(pingResponse.master());
    }
}

// nodes discovered during pinging
List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
    if (pingResponse.node().isMasterNode()) {
        masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
    }
}

之後,遍歷所有收到的ping請求的節點結果,取出所有已經選出的不為自己的master節點,加入到activeMasters中。再遍歷所有ping請求的節點結果,將所有屬性master為true的節點加入到候選人陣列masterCandidates當中。

之後選擇如果activeMasters不為空,說明該叢集中已經存在master節點,那麼就在activeMasterss中選擇id最小的節點作為自己投票選擇的master節點,並返回。

public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
    return activeMasters.stream().min(ElectMasterService::compareNodes).get();
}
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    if (o1.isMasterNode() && !o2.isMasterNode()) {
        return -1;
    }
    if (!o1.isMasterNode() && o2.isMasterNode()) {
        return 1;
    }
    return o1.getId().compareTo(o2.getId());
}

如果activeMasters為空,說明此時叢集還並沒有選舉出master節點。

if (electMaster.hasEnoughCandidates(masterCandidates)) {
    final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
    logger.trace("candidate {} won election", winner);
    return winner.getNode();
}

那麼首先判斷當前masterCandidates陣列中的候選節點個數是否已經大於最小開始選舉接節點數量(預設為-1),如果大於,則通過electMaster的electMaster()方法獲取自己所投票的master節點並返回。

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
    assert hasEnoughCandidates(candidates);
    List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
    sortedCandidates.sort(MasterCandidate::compare);
    return sortedCandidates.get(0);
}
public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
    // list, so if c2 has a higher cluster state version, it needs to come first.
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    if (o1.isMasterNode() && !o2.isMasterNode()) {
        return -1;
    }
    if (!o1.isMasterNode() && o2.isMasterNode()) {
        return 1;
    }
    return o1.getId().compareTo(o2.getId());
}

這裡所要投票的master節點的選擇則是從候選節點陣列中選擇id最小版本最新的節點。

這樣,當前節點所要在選舉中投票的master節點已經被選出。

此時存在兩種情況,如果當前節點所選擇的master節點正式自己,則會正式準備成為master節點,但是前提是他必須收到叢集中別的節點的投票中有半數以上投向自己。

那麼便會開始呼叫waitToBeElectedAsMaster()方法準備接收別的節點的投票結果等待投自己的超過半數以成為master節點。

final CountDownLatch done = new CountDownLatch(1);
final ElectionCallback wrapperCallback = new ElectionCallback() {
    @Override
    public void onElectedAsMaster(ClusterState state) {
        done.countDown();
        callback.onElectedAsMaster(state);
    }

    @Override
    public void onFailure(Throwable t) {
        done.countDown();
        callback.onFailure(t);
    }
};

首先會生成一個CountDoneLatch用來等待別的叢集的投票和等待的timeout,並生成一個結束阻塞的callback用來在完成時結束阻塞並去完成一個節點正式成為master節點要做的流程。

synchronized (this) {
    assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
    myElectionContext = electionContext;
    electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
    checkPendingJoinsAndElectIfNeeded();
}

try {
    if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
        // callback handles everything
        return;
    }
} catch (InterruptedException e) {

}

CountDownLatch進入await在timeout的時間限制內等待別的節點的投票。

在ZenDiscovery的構造方法中就已經根據路徑discovery/zen/join註冊了相應的requestHandler,其中會觸發MemberShipListener的onJoin()方法,並呼叫handleJoinRequest()方法,在這個方法裡,主要會對於傳送join請求(也就是選舉投票)的節點進行驗證,驗證通過之後將會根據nodeJoinController的handleJoinRequest()方法對成為master節點的要求的投票給自己的節點數量masterJoinsCount加一,並判斷是否可以成為master節點。

public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
    if (electionContext != null) {
        electionContext.addIncomingJoin(node, callback);
        checkPendingJoinsAndElectIfNeeded();
    } else {
        clusterService.submitStateUpdateTask("zen-disco-node-join",
            node, ClusterStateTaskConfig.build(Priority.URGENT),
            joinTaskExecutor, new JoinTaskListener(callback, logger));
    }
}
private synchronized void checkPendingJoinsAndElectIfNeeded() {
    assert electionContext != null : "election check requested but no active context";
    final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
    if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
        if (logger.isTraceEnabled()) {
            logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                electionContext.requiredMasterJoins);
        }
    } else {
        if (logger.isTraceEnabled()) {
            logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                electionContext.requiredMasterJoins);
        }
        electionContext.closeAndBecomeMaster();
        electionContext = null; // clear this out so future joins won't be accumulated
    }
}

在checkPendingJoinsAndElectIfNeed()方法中,如果已經接收到的join請求也就是投票自己的節點數量已經超過叢集中節點數量的半數,那麼呼叫closeAndBecomeMaster()方法結束本次選舉正式成為master節點。

public void markThreadAsDoneAndStartNew(Thread joinThread) {
    ClusterService.assertClusterStateThread();
    if (!markThreadAsDone(joinThread)) {
        return;
    }
    startNewThreadIfNotRunning();
}

如果在規定的timeout裡,並沒有收到足夠的投票,那麼說明本節點的選舉失敗。則會回到通過markThreadAsDoneAndStartNew()關閉當前執行緒,並重新啟動一個執行緒在startNewThreadIfNotRunning()方法中開始下一次迴圈中,繼續上述選舉的流程參與選舉。

如果通過findMaster()得到的所要選舉的節點並不是自己,則會通過joinElectedMaster()方法向所選舉成為master的節點發送自己的投票。

while (true) {
    try {
        logger.trace("joining master {}", masterNode);
        membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
        return true;
    } catch (Exception e) {
        final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
        if (unwrap instanceof NotMasterException) {
            if (++joinAttempt == this.joinRetryAttempts) {
                logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                return false;
            } else {
                logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
            }
        } else {
            if (logger.isTraceEnabled()) {
                logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
            } else {
                logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));
            }
            return false;
        }
    }

    try {
        Thread.sleep(this.joinRetryDelay.millis());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

傳送的目標節點的路徑也正是前文中所提到的discovery/zen/join。如果在有限次數內成功,就說明當前節點所投票的目標節點成功成為master節點,本次選舉也宣告完成。

而若是在有限次數內都沒有成功(選舉的節點沒有收到超過半數master選票或者因為種種原因關閉)則會返回false,會和之前試圖成為master節點失敗一樣,重新開啟一個執行緒去參與下一輪選舉。如果成功,直接退出即可,有關跟master同步的master fault detection已經在clusterService中被開啟。