
How an Elasticsearch Node Joins the Cluster

Introduction

When an Elasticsearch node starts up, the most important thing it does is join the cluster. This post walks through the source code behind a node joining the cluster.

Key Background

Elasticsearch basics

Elasticsearch is a distributed search engine built on Lucene. We won't rehash the basics here; consult other material if needed.

The Bully algorithm

When distributed election algorithms come up, everyone thinks of Paxos, but it is fairly complex. Elasticsearch instead uses the much simpler Bully algorithm, which works as follows:

  1. A process broadcasts to learn the IDs of all processes it can reach.
  2. If it finds it has the largest ID, it declares itself master and broadcasts that message.
  3. If it sees a process with a larger ID than its own, it concludes it cannot become master and waits for the master's broadcast.

The whole election flow is very simple.
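To make the algorithm concrete, here is a minimal, self-contained sketch of a Bully-style election. It is illustrative only, not Elasticsearch code; the Node record and the IDs are invented for the example.

import java.util.Comparator;
import java.util.List;

// Minimal Bully-style election sketch (illustrative, not Elasticsearch code).
// Each process learns the IDs of all reachable processes; the highest ID wins.
public class BullyElection {

    record Node(long id) {}

    // The node this process believes should be master: the reachable node
    // with the largest ID.
    static Node electMaster(List<Node> reachableNodes) {
        return reachableNodes.stream()
            .max(Comparator.comparingLong(Node::id))
            .orElseThrow(() -> new IllegalStateException("no reachable nodes"));
    }

    public static void main(String[] args) {
        List<Node> nodes = List.of(new Node(3), new Node(7), new Node(5));
        System.out.println("elected master id: " + electMaster(nodes).id()); // prints 7
    }
}

Elasticsearch's variant does not use a plain process ID: as we will see in findMaster(), it prefers the candidate with the newest cluster state and only then falls back to node ordering. With the idea in place, let's dig into the Elasticsearch code.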

Code Analysis

Node.start() does a lot of work; this time we only care about the lines below. The default implementation of discovery is ZenDiscovery.

public Node start() throws NodeValidationException {
    ...
    // Start discovery, the module responsible for finding other nodes.
    discovery.start();
    transportService.acceptIncomingRequests();
    // Start joining the cluster.
    discovery.startInitialJoin();
    ...
}

ZenDiscovery is Elasticsearch's default node discovery implementation; the code above calls its startInitialJoin() method:

@Override
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    clusterService.submitStateUpdateTask("initial_join", new LocalClusterUpdateTask() {

        @Override
        public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
            // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
            joinThreadControl.startNewThreadIfNotRunning();
            return unchanged();
        }

        @Override
        public void onFailure(String source, @org.elasticsearch.common.Nullable Exception e) {
            logger.warn("failed to start initial join process", e);
        }
    });
}

Here a task is submitted directly to clusterService. In its execute method, joinThreadControl starts a thread that performs the join. JoinThreadControl is an inner class of ZenDiscovery that manages the join thread: it guarantees that only one thread runs the join task at a time and handles the bookkeeping after a successful join. This lets startInitialJoin() return quickly without blocking the node's start() method.

Next, the startNewThreadIfNotRunning() method:

public void startNewThreadIfNotRunning() {
    ClusterService.assertClusterStateThread();
    if (joinThreadActive()) {
        return;
    }
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
                return;
            }
            while (running.get() && joinThreadActive(currentThread)) {
                try {
                    innerJoinCluster();
                    return;
                } catch (Exception e) {
                    logger.error("unexpected error while joining cluster,trying again", e);
                    // Because we catch any exception here, we want to know in
                    // tests if an uncaught exception got to this point and the test infra uncaught exception
                    // leak detection can catch this. In practise no uncaught exception should leak
                    assert ExceptionsHelper.reThrowIfNotNull(e);
                }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
        }
    });
}

This method is also straightforward: it simply calls innerJoinCluster(). Note the while loop: as long as joinThreadControl is still running and the current thread is still the active join thread, any exception triggers another attempt, until the join succeeds.

Next, the innerJoinCluster() method:

private void innerJoinCluster() {
    //1--------------------------------
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }

    if (!joinThreadControl.joinThreadActive(currentThread)) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }

    // 2-----------------------------
    if (clusterService.localNode().equals(masterNode)) {
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
            new NodeJoinController.ElectionCallback() {
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        joinThreadControl.markThreadAsDone(currentThread);
                        // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
                        nodesFD.updateNodesAndPing(state); // start the nodes FD
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        logger.trace("failed while waiting for nodes to join, rejoining", t);
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }

        );
        
    // 3------------------------------------
    } else {
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");

        // send join request
        final boolean success = joinElectedMaster(masterNode);

        // finalize join through the cluster state update thread
        final DiscoveryNode finalMasterNode = masterNode;
        clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new LocalClusterUpdateTask() {
            @Override
            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
                if (!success) {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return unchanged();
                }

                if (currentState.getNodes().getMasterNode() == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return unchanged();
                }

                if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
                    return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                }

                // Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
                // when the first cluster state arrives.
                joinThreadControl.markThreadAsDone(currentThread);
                return unchanged();
            }

            @Override
            public void onFailure(String source, @Nullable Exception e) {
                logger.error("unexpected error while trying to finalize cluster join", e);
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        });
    }
}

This method is fairly long, so we split it into three parts and walk through them one by one:

  1. As its name suggests, this step obtains the master node, looping until it finds one. Note that the node obtained here is not necessarily the real master, only the most likely candidate; it still has to be verified below.
  2. If the candidate master is the local node, it first needs the support of enough other nodes. If it collects enough votes within the configured time window, it can confirm itself as master and start nodesFD to monitor the other nodes. If it cannot collect enough votes, it calls markThreadAsDoneAndStartNew(), which effectively reruns the election: because the callback is asynchronous, the previous thread is first marked as done and then a new task is submitted to the thread pool. (See the vote-arithmetic sketch after this list.)
  3. If the candidate master is another node, the local node joins that master and syncs the master's clusterState. In the callback it inspects the clusterState of the master it acknowledged and runs a series of checks; if the master it picked turns out not to be the real master, it starts a new round of joining (markThreadAsDoneAndStartNew()).
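As a quick illustration of the vote arithmetic in step 2 (a minimal sketch; the minimum_master_nodes value of 3 is a hypothetical example):

// Sketch of the requiredJoins computation from step 2 (illustrative only).
// With discovery.zen.minimum_master_nodes = 3, the local node's vote for
// itself counts as one, so only 2 more incoming joins are required.
public class RequiredJoinsExample {
    public static void main(String[] args) {
        int minimumMasterNodes = 3; // hypothetical setting value
        int requiredJoins = Math.max(0, minimumMasterNodes - 1); // we count as one
        System.out.println("required incoming joins: " + requiredJoins); // prints 2
    }
}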

Now let's look at the findMaster() method:

private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }

    final DiscoveryNode localNode = clusterService.localNode();

    // add our selves
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));

    // filter responses
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
        // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
            activeMasters.add(pingResponse.master());
        }
    }

    // nodes discovered during pinging
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }

    if (activeMasters.isEmpty()) {
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                        masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}

Roughly, it does the following and finally returns the master node:

  1. Synchronously ping all known nodes and wait for the responses.
  2. Add the local node to the ping responses and, depending on configuration, decide whether to filter out nodes without master eligibility (disabled by default).
  3. Add every master reported by a ping response to the activeMasters list (skipping the local node), and every master-eligible node to the masterCandidates list.
  4. If activeMasters is empty, no node currently claims a master, so hold a local election (provided there are enough candidates): pick the node with the highest clusterStateVersion as master, breaking ties by node ID (see the comparator sketch after this list).
  5. If activeMasters is not empty, pick from it the node with the highest clusterStateVersion, again breaking ties by node ID.
  6. Return the master node.
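The ordering used in steps 4 and 5 can be sketched as a comparator: prefer the highest clusterStateVersion and fall back to comparing node IDs on a tie. This is a simplified illustration with an invented Candidate record; the real logic lives in ElectMasterService:

import java.util.Comparator;
import java.util.List;

// Simplified sketch of master-candidate ordering (illustrative, not the
// actual ElectMasterService code; the Candidate record is invented).
public class CandidateOrdering {

    record Candidate(String nodeId, long clusterStateVersion) {}

    // Higher clusterStateVersion wins; on a tie, the smaller node ID wins.
    static final Comparator<Candidate> BEST_FIRST =
        Comparator.comparingLong(Candidate::clusterStateVersion).reversed()
                  .thenComparing(Candidate::nodeId);

    static Candidate electMaster(List<Candidate> candidates) {
        return candidates.stream().min(BEST_FIRST).orElseThrow();
    }

    public static void main(String[] args) {
        List<Candidate> candidates = List.of(
            new Candidate("node-b", 10),
            new Candidate("node-a", 12),
            new Candidate("node-c", 12));
        // node-a wins: highest version (12), smallest id among the tied ones.
        System.out.println(electMaster(candidates).nodeId());
    }
}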

Now let's look at what happens when a node believes it is the master itself:

    if (clusterService.localNode().equals(masterNode)) {
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
            new NodeJoinController.ElectionCallback() {
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        joinThreadControl.markThreadAsDone(currentThread);
                        // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
                        nodesFD.updateNodesAndPing(state); // start the nodes FD
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        logger.trace("failed while waiting for nodes to join, rejoining", t);
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }

        );
    }

What happens here is clear: per the configuration, the would-be master needs the support of at least electMaster.minimumMasterNodes() nodes (counting its own vote for itself), so it waits synchronously for a bounded time. If enough votes arrive, the callback invokes updateNodesAndPing to start monitoring all nodes; otherwise failContextIfNeeded() is called to close the election context and stop the election, and a new round of joining begins.
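The "wait for enough joins within a timeout" pattern can be sketched with a CountDownLatch. This only illustrates the idea behind waitToBeElectedAsMaster; the actual bookkeeping in NodeJoinController is more involved:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified sketch of "wait for N join votes within a timeout"
// (illustrative only; the real logic lives in NodeJoinController).
public class ElectionWait {

    private final CountDownLatch joins;

    ElectionWait(int requiredJoins) {
        this.joins = new CountDownLatch(requiredJoins);
    }

    // Called once per incoming join request from another node.
    void onJoinReceived() {
        joins.countDown();
    }

    // True if enough joins arrived in time (we become master); false if the
    // wait timed out (mark the thread as done and start a new round).
    boolean waitToBeElected(long timeout, TimeUnit unit) throws InterruptedException {
        return joins.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ElectionWait election = new ElectionWait(2); // e.g. minimum_master_nodes = 3
        election.onJoinReceived();
        election.onJoinReceived();
        System.out.println("elected: " + election.waitToBeElected(30, TimeUnit.SECONDS)); // true
    }
}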

Finally, the case where the node concludes the master is not itself:

    } else {
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");

        // send join request
        final boolean success = joinElectedMaster(masterNode);

        // finalize join through the cluster state update thread
        final DiscoveryNode finalMasterNode = masterNode;
        clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new LocalClusterUpdateTask() {
            @Override
            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
                if (!success) {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return unchanged();
                }

                if (currentState.getNodes().getMasterNode() == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return unchanged();
                }

                if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
                    return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                }

                // Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
                // when the first cluster state arrives.
                joinThreadControl.markThreadAsDone(currentThread);
                return unchanged();
            }

            @Override
            public void onFailure(String source, @Nullable Exception e) {
                logger.error("unexpected error while trying to finalize cluster join", e);
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        });
    }

The first step is to join the node it believes to be master. But since that master was chosen locally, it may not be the real one, so the node validates its choice against the clusterState in the master's reply: if the join fails, or the reply contains no master node, or the reply's master is not the node it picked, it starts the join over again.
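The three retry conditions can be summarized as a small decision function. This is a sketch of the control flow only; DiscoveryNode here is a stand-in record and the Outcome enum is invented for clarity:

// Sketch of the finalize_join decision logic (illustrative only; the
// DiscoveryNode record stands in for the real class).
public class FinalizeJoin {

    record DiscoveryNode(String id) {}

    enum Outcome { DONE, RETRY_JOIN, REJOIN_NEW_MASTER }

    static Outcome finalizeJoin(boolean joinSucceeded,
                                DiscoveryNode currentMaster,   // from the cluster state
                                DiscoveryNode electedMaster) { // the node we joined
        if (!joinSucceeded) {
            return Outcome.RETRY_JOIN;        // join request failed: try again
        }
        if (currentMaster == null) {
            return Outcome.RETRY_JOIN;        // no master in cluster state: re-ping
        }
        if (!currentMaster.equals(electedMaster)) {
            return Outcome.REJOIN_NEW_MASTER; // master switched while finalizing
        }
        return Outcome.DONE;                  // joined the right master
    }

    public static void main(String[] args) {
        DiscoveryNode elected = new DiscoveryNode("node-1");
        // The cluster state names a different master: prints REJOIN_NEW_MASTER.
        System.out.println(finalizeJoin(true, new DiscoveryNode("node-2"), elected));
    }
}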