
The Default Leader Election Process in ZooKeeper

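Since ZooKeeper 3.4.0 the UDP-based election algorithms have been deprecated, which leaves the TCP-based FastLeaderElection as the default and only recommended leader election algorithm. It is started from QuorumPeer's start() method, which calls startLeaderElection():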

    synchronized public void startLeaderElection() {
    	try {
    		currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    	} catch(IOException e) {
    		RuntimeException re = new RuntimeException(e.getMessage());
    		re.setStackTrace(e.getStackTrace());
    		throw re;
    	}
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }
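This method first creates a vote for itself by packaging its own id (myid), its last logged zxid and its current epoch into a Vote. It then looks up its own quorum address in the peer list and fails if myid is not configured. Only for the legacy electionType 0 does it open a UDP socket. Finally it calls createElectionAlgorithm() to create the object that carries out the election: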
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
                
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = new QuorumCnxManager(this);
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

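The election type (the electionAlg setting) defaults to 3 in ZooKeeper. This branch therefore creates a QuorumCnxManager, starts its listener, and creates a FastLeaderElection to run the election. At this point FastLeaderElection has only been constructed, and the election itself has not started yet.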

    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }
        Messenger(QuorumCnxManager manager) {

            this.ws = new WorkerSender(manager);

            Thread t = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();

            this.wr = new WorkerReceiver(manager);

            t = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
        }

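The constructor creates two queues: sendqueue for outgoing messages (ToSend) and recvqueue for received notifications (Notification). The Messenger then uses the QuorumCnxManager created earlier, which handles communication with the other servers, to start two daemon threads. WorkerSender sends messages, and WorkerReceiver receives and parses them.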
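The queue-plus-worker-thread structure can be shown in isolation. The following is a minimal, hypothetical Java sketch, not ZooKeeper code. Messages are plain strings, and an in-memory queue called network stands in for QuorumCnxManager. It follows the same pattern as Messenger: there are two daemon threads, and each polls with a timeout so it can notice the stop flag.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for FastLeaderElection's Messenger.
// "network" is an in-memory queue standing in for QuorumCnxManager.
class MiniMessenger {
    final LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<String> network = new LinkedBlockingQueue<>();
    volatile boolean stop = false;

    interface Step { void run() throws InterruptedException; }

    MiniMessenger() {
        // Sender: drain sendqueue and push each message onto the "network".
        startDaemon("WorkerSender", () -> {
            String m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
            if (m != null) network.offer(m);
        });
        // Receiver: take raw messages off the "network", "parse" them,
        // and hand them to the election logic through recvqueue.
        startDaemon("WorkerReceiver", () -> {
            String raw = network.poll(100, TimeUnit.MILLISECONDS);
            if (raw != null) recvqueue.offer("parsed:" + raw);
        });
    }

    private void startDaemon(String name, Step step) {
        Thread t = new Thread(() -> {
            while (!stop) {          // the poll timeout lets us re-check stop
                try {
                    step.run();
                } catch (InterruptedException e) {
                    break;
                }
            }
        }, name);
        t.setDaemon(true);           // do not keep the JVM alive
        t.start();
    }

    // Demo helper: enqueue one message and wait for its parsed form.
    String roundTrip(String msg, long timeoutMs) {
        sendqueue.offer(msg);
        try {
            return recvqueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

For example, new MiniMessenger().roundTrip("vote", 2000) should return "parsed:vote" after both threads have passed the message along.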

            public void run() {
                while (!stop) {
                    try {
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        if(m == null) continue;

                        process(m);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                LOG.info("WorkerSender is down");
            }

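WorkerSender's run() method is simple. It keeps taking messages from sendqueue, waiting at most 3 seconds each time so it can notice the stop flag, and passes each message to process().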

            void process(ToSend m) {
                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), 
                                                        m.leader,
                                                        m.zxid, 
                                                        m.electionEpoch, 
                                                        m.peerEpoch);
                manager.toSend(m.sid, requestBuffer);
            }

process() serializes the message with buildMsg() and hands the buffer to QuorumCnxManager.toSend(), which delivers it to the target server.
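The order in which WorkerReceiver reads the buffer (shown below) suggests that buildMsg() writes an int state, then four longs, then (since 3.4.6) an int version. The sketch below reproduces that layout and assumes a 40-byte versioned message. The class name and the version parameter are hypothetical. The real code presumably writes its own version constant rather than taking one as a parameter.

```java
import java.nio.ByteBuffer;

// Sketch of the notification wire layout, in the field order that
// WorkerReceiver reads back. NotificationCodec is a hypothetical name.
class NotificationCodec {
    // Assumption: 4 (int) + 4 * 8 (longs) + 4 (int) = 40 bytes.
    static final int VERSIONED_SIZE = 40;

    static ByteBuffer buildMsg(int state, long leader, long zxid,
                               long electionEpoch, long peerEpoch, int version) {
        ByteBuffer buf = ByteBuffer.allocate(VERSIONED_SIZE);
        buf.putInt(state);          // ServerState ordinal: 0=LOOKING ... 3=OBSERVING
        buf.putLong(leader);        // proposed leader's sid
        buf.putLong(zxid);          // proposed leader's last zxid
        buf.putLong(electionEpoch); // sender's logicalclock (round)
        buf.putLong(peerEpoch);     // proposed leader's epoch
        buf.putInt(version);        // notification format version (3.4.6+)
        buf.flip();                 // make the buffer readable from position 0
        return buf;
    }
}
```

The 28-byte minimum checked by WorkerReceiver corresponds to the oldest layout: one int plus three longs, with no peerEpoch and no version.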

Next, let's look at the run() method of WorkerReceiver, which handles incoming messages.

            public void run() {

                Message response;
                while (!stop) {
                    // Sleeps on receive
                    try{
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if(response == null) continue;

                        /*
                         * If it is from an observer, respond right away.
                         * Note that the following predicate assumes that
                         * if a server is not a follower, then it must be
                         * an observer. If we ever have any other type of
                         * learner in the future, we'll have to change the
                         * way we check for observers.
                         */
                        if(!self.getVotingView().containsKey(response.sid)){
                            Vote current = self.getCurrentVote();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    logicalclock,
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch());

                            sendqueue.offer(notmsg);
                        } else {
                            // Receive new message
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Receive new notification message. My id = "
                                        + self.getId());
                            }

                            /*
                             * We check for 28 bytes for backward compatibility
                             */
                            if (response.buffer.capacity() < 28) {
                                LOG.error("Got a short response: "
                                        + response.buffer.capacity());
                                continue;
                            }
                            boolean backCompatibility = (response.buffer.capacity() == 28);
                            response.buffer.clear();

                            // Instantiate Notification and set its attributes
                            Notification n = new Notification();
                            
                            // State of peer that sent this message
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                            switch (response.buffer.getInt()) {
                            case 0:
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            case 1:
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            case 2:
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            case 3:
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            default:
                                continue;
                            }
                            
                            n.leader = response.buffer.getLong();
                            n.zxid = response.buffer.getLong();
                            n.electionEpoch = response.buffer.getLong();
                            n.state = ackstate;
                            n.sid = response.sid;
                            if(!backCompatibility){
                                n.peerEpoch = response.buffer.getLong();
                            } else {
                                if(LOG.isInfoEnabled()){
                                    LOG.info("Backward compatibility mode, server id=" + n.sid);
                                }
                                n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                            }

                            /*
                             * Version added in 3.4.6
                             */

                            n.version = (response.buffer.remaining() >= 4) ? 
                                         response.buffer.getInt() : 0x0;

                            /*
                             * Print notification info
                             */
                            if(LOG.isInfoEnabled()){
                                printNotification(n);
                            }

                            /*
                             * If this server is looking, then send proposed leader
                             */

                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                                recvqueue.offer(n);

                                /*
                                 * Send a notification back if the peer that sent this
                                 * message is also looking and its logical clock is
                                 * lagging behind.
                                 */
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
                                        && (n.electionEpoch < logicalclock)){
                                    Vote v = getVote();
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock,
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch());
                                    sendqueue.offer(notmsg);
                                }
                            } else {
                                /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                                Vote current = self.getCurrentVote();
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
                                    if(LOG.isDebugEnabled()){
                                        LOG.debug("Sending new notification. My id =  " +
                                                self.getId() + " recipient=" +
                                                response.sid + " zxid=0x" +
                                                Long.toHexString(current.getZxid()) +
                                                " leader=" + current.getId());
                                    }
                                    
                                    ToSend notmsg;
                                    if(n.version > 0x0) {
                                        notmsg = new ToSend(
                                                ToSend.mType.notification,
                                                current.getId(),
                                                current.getZxid(),
                                                current.getElectionEpoch(),
                                                self.getPeerState(),
                                                response.sid,
                                                current.getPeerEpoch());
                                        
                                    } else {
                                        Vote bcVote = self.getBCVote();
                                        notmsg = new ToSend(
                                                ToSend.mType.notification,
                                                bcVote.getId(),
                                                bcVote.getZxid(),
                                                bcVote.getElectionEpoch(),
                                                self.getPeerState(),
                                                response.sid,
                                                bcVote.getPeerEpoch());
                                    }
                                    sendqueue.offer(notmsg);
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted Exception while waiting for new message" +
                                e.toString());
                    }
                }
                LOG.info("WorkerReceiver is down");
            }
        }

This run() method is far more involved by comparison, so let's go through it step by step.

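It first takes a message from QuorumCnxManager's queue of received messages, waiting up to 3 seconds. It then checks whether the sender's id is in the voting view. If it is not (the sender is probably an observer), the message is not parsed at all. Instead, the server immediately replies with a message carrying its own current vote. For messages from voting servers, it first checks that the message is at least 28 bytes long. A message of exactly 28 bytes uses the old format without peerEpoch, and in that case peerEpoch is derived from the zxid. It then reads the sender's state (encoded as an int; unknown values are dropped), followed by the proposed leader, the zxid and the election epoch. All of this is stored in a Notification object for the later steps.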
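The parsing steps above can be written as a standalone function, roughly as follows. DecodedNotification is a hypothetical holder class, not ZooKeeper's Notification. The zxid >> 32 shift relies on the high 32 bits of a zxid holding the epoch, which is what the legacy fallback through ZxidUtils.getEpochFromZxid() depends on.

```java
import java.nio.ByteBuffer;

// Sketch of WorkerReceiver's decoding, including the 28-byte legacy format.
class DecodedNotification {
    int state;
    long leader, zxid, electionEpoch, peerEpoch;
    int version;

    // Returns null for messages shorter than the 28-byte legacy format.
    static DecodedNotification decode(ByteBuffer buf) {
        if (buf.capacity() < 28) return null;        // too short: drop it
        boolean legacy = (buf.capacity() == 28);      // no peerEpoch field
        buf.clear();                                  // read from position 0
        DecodedNotification n = new DecodedNotification();
        n.state = buf.getInt();
        n.leader = buf.getLong();
        n.zxid = buf.getLong();
        n.electionEpoch = buf.getLong();
        // Legacy senders: derive the epoch from the zxid's high 32 bits.
        n.peerEpoch = legacy ? (n.zxid >> 32) : buf.getLong();
        // Versioned messages (3.4.6+) carry one more int; older ones get 0.
        n.version = (buf.remaining() >= 4) ? buf.getInt() : 0;
        return n;
    }
}
```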

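It then checks this server's own state. If the server is LOOKING, it needs the message for its own election, so the Notification goes into FastLeaderElection's recvqueue (not into QuorumCnxManager's queue). If the sender is also LOOKING and its election epoch (round) is smaller than this server's logicalclock, the server also sends its current vote back to the sender so the sender can catch up.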

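If this server is not LOOKING but the sender is, the server replies with the leader it currently believes in, namely its current vote. For senders older than 3.4.6 (n.version == 0), it sends the backward-compatible vote from getBCVote() instead.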
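The branching described in the last few paragraphs fits in a small decision function. This is a hypothetical model, not ZooKeeper code. The enum and method names are invented, and the function leaves out the dropping of short or malformed messages.

```java
// Hypothetical model of WorkerReceiver's reply rules.
enum ReceiverAction { REPLY_WITH_CURRENT_VOTE, ENQUEUE, ENQUEUE_AND_REPLY, REPLY_WITH_LEADER, IGNORE }

class ReceiverRules {
    static ReceiverAction decide(boolean senderIsVoter, boolean selfLooking,
                                 boolean senderLooking, long senderEpoch, long logicalclock) {
        // Non-voters (e.g. observers) just get our current vote back.
        if (!senderIsVoter) return ReceiverAction.REPLY_WITH_CURRENT_VOTE;
        if (selfLooking) {
            // We need the notification; if the sender lags behind, help it catch up.
            return (senderLooking && senderEpoch < logicalclock)
                    ? ReceiverAction.ENQUEUE_AND_REPLY
                    : ReceiverAction.ENQUEUE;
        }
        // We already know a leader: tell a LOOKING sender who it is.
        return senderLooking ? ReceiverAction.REPLY_WITH_LEADER : ReceiverAction.IGNORE;
    }
}
```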

That covers the two threads, which together handle how FastLeaderElection sends and receives messages internally.

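Everything so far has been preparation. The election proper starts once the QuorumPeer thread is running. In its run() method, if the peer is in the LOOKING state, it calls FastLeaderElection.lookForLeader() (obtained via makeLEStrategy()) to begin the election. The excerpt below comes from the LOOKING branch. The first part applies when read-only mode is enabled, and the else branch is the normal case: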

                        try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;

Let's look at the logic of lookForLeader(). Its Javadoc describes it as follows:

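Starts a new round of leader election. Whenever our QuorumPeer changes its state to LOOKING, this method is invoked, and it sends notifications to all other peers.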

    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = System.currentTimeMillis();
        }
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                logicalclock++;
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                else if(self.getVotingView().containsKey(n.sid)) {
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock) {
                            logicalclock = n.electionEpoch;
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock) {
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock,
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock){
                            recvset.put(n.sid, new Vote(n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch));
                           
                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, 
                                        n.electionEpoch, 
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.version,
                                                            n.leader,
                                                            n.zxid,
                                                            n.electionEpoch,
                                                            n.peerEpoch,
                                                            n.state));
           
                        if(ooePredicate(outofelection, outofelection, n)) {
                            synchronized(this){
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
        }
    }

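The method starts with JMX registration, which we ignore here. It then records the election start time (start_fle). After the election ends and the server has taken up its new role, the difference between that moment and start_fle gives the election duration. Before the election begins, logicalclock is incremented to mark the start of a new round.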

The server first calls updateProposal() to vote for itself as leader. It then calls sendNotifications(), which sends a message to every voting server:

    private void sendNotifications() {
        for (QuorumServer server : self.getVotingView().values()) {
            long sid = server.id;

            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock,
                    QuorumPeer.ServerState.LOOKING,
                    sid,
                    proposedEpoch);
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
            sendqueue.offer(notmsg);
        }
    }

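This method iterates over all configured voting servers, including this server itself. For each one it builds a notification carrying this server's current vote. It only places the messages in sendqueue; the WorkerSender thread does the actual sending.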

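While the server remains LOOKING, the while loop keeps taking notifications from recvqueue and acting on them. If no notification arrives within notTimeout, the server handles it in one of two ways. If all previous messages were delivered, it resends its notifications. Otherwise it calls connectAll() to re-establish connections. In both cases it doubles the timeout, up to maxNotificationInterval.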
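The timeout handling is a capped exponential backoff. The sketch below assumes a 200 ms starting value (finalizeWait) and a 60000 ms cap (maxNotificationInterval). I believe these match the constants in the 3.4.x FastLeaderElection sources, but treat them as assumptions. The schedule() helper is hypothetical and exists only for illustration.

```java
// Sketch of the backoff applied when recvqueue.poll() in lookForLeader()
// returns null. Constant values are assumptions (see lead-in).
class NotificationBackoff {
    static final int FINALIZE_WAIT_MS = 200;
    static final int MAX_NOTIFICATION_INTERVAL_MS = 60000;

    // Timeout to use for the next poll after one that timed out.
    static int next(int notTimeout) {
        int doubled = notTimeout * 2;
        return doubled < MAX_NOTIFICATION_INTERVAL_MS ? doubled : MAX_NOTIFICATION_INTERVAL_MS;
    }

    // Successive timeouts starting from FINALIZE_WAIT_MS.
    static int[] schedule(int steps) {
        int[] out = new int[steps];
        int t = FINALIZE_WAIT_MS;
        for (int i = 0; i < steps; i++) {
            out[i] = t;
            t = next(t);
        }
        return out;
    }
}
```

With these values, schedule(10) yields 200, 400, 800, ..., 51200, 60000. The wait therefore reaches the one-minute cap after nine consecutive timeouts.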

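The loop checks the voting view again and ignores notifications from servers outside it. For the remaining notifications, the action depends on the sender's state. If the sender is LOOKING, the server compares the received election epoch (round) with its own logicalclock. A larger received round means this server has fallen behind in election rounds, for example because it started late or was briefly disconnected. In that case the server adopts the received round as its logicalclock and clears recvset, the set of votes it has collected so far. It then uses totalOrderPredicate() to compare the received vote with its own initial vote (its id, last logged zxid and epoch), adopts whichever wins, and broadcasts the updated proposal.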

If the received round is smaller than this server's logicalclock, the notification is ignored.

If the two rounds are equal, the server calls totalOrderPredicate() to compare the received vote with its current proposal:

    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }
        
        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        
        return ((newEpoch > curEpoch) || 
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

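The "epoch" compared here is the peerEpoch of each proposed leader, not the election round. The method compares peerEpoch first, then the zxid if the epochs are equal, and finally the server id if the zxids also match. A server whose quorum weight is 0 is never preferred. If the received vote wins, the server adopts it with updateProposal() and broadcasts it to the other voting servers with sendNotifications(). In every case that was not ignored, the received vote is then stored in a map called recvset, keyed by the sender's server id with its vote as the value.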
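You can try the ordering on its own. The following standalone sketch reproduces the comparison in totalOrderPredicate(). VoteOrder is a hypothetical class, and the newWeight parameter stands in for getQuorumVerifier().getWeight(newId).

```java
// Standalone sketch of totalOrderPredicate(): (peerEpoch, zxid, sid)
// compared lexicographically, with zero-weight candidates excluded.
class VoteOrder {
    static boolean newVoteWins(long newId, long newZxid, long newEpoch, long newWeight,
                               long curId, long curZxid, long curEpoch) {
        if (newWeight == 0) return false;                  // can never be preferred
        if (newEpoch != curEpoch) return newEpoch > curEpoch; // 1. higher epoch
        if (newZxid != curZxid) return newZxid > curZxid;     // 2. higher zxid
        return newId > curId;                                  // 3. higher server id
    }
}
```

A strictly greater comparison at each step means an identical vote never "wins", so re-receiving one's own proposal does not trigger another broadcast.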

After each notification is processed, the server calls termPredicate() to check whether the election has produced a result:

    protected boolean termPredicate(
            HashMap<Long, Vote> votes,
            Vote vote) {

        HashSet<Long> set = new HashSet<Long>();

        /*
         * First make the views consistent. Sometimes peers will have
         * different zxids for a server depending on timing.
         */
        for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())){
                set.add(entry.getKey());
            }
        }

        return self.getQuorumVerifier().containsQuorum(set);
    }

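The call is termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch)). It takes the leader this server currently supports as the reference. It then iterates over the collected votes and gathers the ids of the servers whose vote equals this server's own proposal. If that set forms a quorum, this server considers the election decided, and the winner is the server it voted for. With the default majority verifier, a quorum means strictly more than half of the voting servers.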
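With the default majority-based verifier, the check reduces to counting matching votes. The minimal sketch below assumes a plain majority rule in the style of QuorumMaj. SimpleVote and MajorityCheck are hypothetical classes. The equality check compares leader, zxid, election epoch and peer epoch, which simplifies the real Vote.equals().

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for Vote with the fields the equality check needs.
final class SimpleVote {
    final long leader, zxid, electionEpoch, peerEpoch;

    SimpleVote(long leader, long zxid, long electionEpoch, long peerEpoch) {
        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.peerEpoch = peerEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleVote)) return false;
        SimpleVote v = (SimpleVote) o;
        return leader == v.leader && zxid == v.zxid
                && electionEpoch == v.electionEpoch && peerEpoch == v.peerEpoch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(leader, zxid, electionEpoch, peerEpoch);
    }
}

class MajorityCheck {
    // votes: sender sid -> its vote (like recvset).
    static boolean termPredicate(Map<Long, SimpleVote> votes, SimpleVote proposal,
                                 int votingServers) {
        Set<Long> agreeing = new HashSet<>();
        for (Map.Entry<Long, SimpleVote> e : votes.entrySet()) {
            if (proposal.equals(e.getValue())) agreeing.add(e.getKey());
        }
        return agreeing.size() > votingServers / 2;   // strictly more than half
    }
}
```

In the real code, recvset also contains this server's own vote, because sendNotifications() sends a copy to the server itself.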

                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

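The server still does not change its state right away. It keeps polling recvqueue, waiting up to finalizeWait (200 ms) each time, to see whether a better vote arrives. Notifications that do not beat the current proposal are discarded. If a better one arrives, it is put back into the queue and the wait ends. The main election loop then processes that notification, and the election continues.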
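This wait-and-recheck step can be written as a small generic helper. The sketch is not the ZooKeeper method. FinalizeWait and canFinalize are hypothetical names, and the betterThanProposal predicate stands in for a totalOrderPredicate() call against the current proposal.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Sketch of the "finalize wait" step after a quorum has been reached.
class FinalizeWait {
    // Returns true if the current proposal can be finalized.
    static <T> boolean canFinalize(BlockingQueue<T> recvqueue, long finalizeWaitMs,
                                   Predicate<T> betterThanProposal) throws InterruptedException {
        T n;
        while ((n = recvqueue.poll(finalizeWaitMs, TimeUnit.MILLISECONDS)) != null) {
            if (betterThanProposal.test(n)) {
                recvqueue.put(n);   // re-queue it for the main election loop
                return false;       // the election must go on
            }
            // Not better: discard it and wait again.
        }
        return true;                // a full window passed with nothing new
    }
}
```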

                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock,
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }

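If no notification arrives within the wait (n == null), the election ends. If this server is the chosen leader, it becomes LEADING. Otherwise it switches to learningState(), which returns FOLLOWING or OBSERVING depending on the server's configured role.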
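A toy, synchronous simulation shows how these rules make LOOKING servers converge. It is a hypothetical model, not ZooKeeper code. It assumes no network delay and no message loss, lets every server see every vote in each round, and ignores quorum weights and the finalizeWait step. Each server starts by voting for itself and then adopts the best vote it has seen, using the same ordering as totalOrderPredicate().

```java
import java.util.ArrayList;
import java.util.List;

// Toy synchronous model of FastLeaderElection convergence (see lead-in).
class ToyElection {
    static final class Proposal {
        final long sid, zxid, epoch;   // sid = proposed leader

        Proposal(long sid, long zxid, long epoch) {
            this.sid = sid;
            this.zxid = zxid;
            this.epoch = epoch;
        }

        // Same ordering as totalOrderPredicate (weights ignored).
        boolean beats(Proposal o) {
            if (epoch != o.epoch) return epoch > o.epoch;
            if (zxid != o.zxid) return zxid > o.zxid;
            return sid > o.sid;
        }
    }

    // initial.get(i) is server i's vote for itself; returns the elected sid.
    static long elect(List<Proposal> initial) {
        List<Proposal> current = new ArrayList<>(initial);
        int n = initial.size();
        while (true) {
            // Quorum check: does one proposed leader have more than n/2 supporters?
            for (Proposal p : current) {
                int support = 0;
                for (Proposal q : current) if (q.sid == p.sid) support++;
                if (support > n / 2) return p.sid;
            }
            // Exchange round: every server adopts the best vote it has seen.
            List<Proposal> next = new ArrayList<>();
            for (Proposal mine : current) {
                Proposal best = mine;
                for (Proposal other : current) if (other.beats(best)) best = other;
                next.add(best);
            }
            current = next;
        }
    }
}
```

Take three servers with (sid, zxid) pairs (1, 0x100000005), (2, 0x100000007) and (3, 0x100000007), all in epoch 1. Servers 2 and 3 tie on the highest zxid, and the higher sid breaks the tie, so server 3 is elected.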

Finally, let's look at the code for a sender in the FOLLOWING or LEADING state (the two cases share the same code):

                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock){
                            recvset.put(n.sid, new Vote(n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch));
                           
                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, 
                                        n.electionEpoch, 
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.version,
                                                            n.leader,
                                                            n.zxid,
                                                            n.electionEpoch,
                                                            n.peerEpoch,
                                                            n.state));
           
                        if(ooePredicate(outofelection, outofelection, n)) {
                            synchronized(this){
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
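The logic here is fairly simple. This case usually arises when this server starts later than the others, after a leader has already been elected. A FOLLOWING or LEADING sender means a leader already exists. Notifications from the same election round (n.electionEpoch == logicalclock) are added to recvset, and ooePredicate() checks whether a quorum supports that leader. All notifications are also collected in outofelection. Before joining the established ensemble, the server verifies that a majority is following the same leader. Once that holds, it adopts the sender's round as its logicalclock, sets its own role from the received vote, and leaves the election.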
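As far as I can tell from the 3.4.x sources, ooePredicate() combines termPredicate() with one more check. The proposed leader must itself have reported LEADING, or, if this server is the proposed leader, the election rounds must match. The sketch below models only that idea. JoinCheck and Report are hypothetical classes. Vote equality is simplified to "names the same leader", whereas the real Vote.equals() compares more fields.

```java
import java.util.Map;

// Simplified model of the checks behind ooePredicate() (see lead-in).
class JoinCheck {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // What one server told us: the leader it follows and its own state.
    static final class Report {
        final long leader;
        final State state;

        Report(long leader, State state) {
            this.leader = leader;
            this.state = state;
        }
    }

    // reports: sid -> latest report from that server (like outofelection).
    static boolean canJoin(Map<Long, Report> reports, long leader, long electionEpoch,
                           long mySid, long logicalclock, int votingServers) {
        int agreeing = 0;
        for (Report r : reports.values()) {
            if (r.leader == leader) agreeing++;        // simplified vote equality
        }
        if (agreeing <= votingServers / 2) return false; // no majority yet
        if (leader != mySid) {
            // The leader itself must have told us it is LEADING.
            Report fromLeader = reports.get(leader);
            return fromLeader != null && fromLeader.state == State.LEADING;
        }
        // We are the proposed leader: the election rounds must match.
        return logicalclock == electionEpoch;
    }
}
```

The extra leader check keeps a late joiner from following a leader that a majority merely remembers but that is no longer actually leading.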
