1. 程式人生 > >ZK叢集的Leader選舉原始碼閱讀

ZK叢集的Leader選舉原始碼閱讀

前言

ZooKeeper對Zab協議的實現有自己的主備模型,即Leader和learner(Observer + Follower),有如下幾種情況需要進行領導者的選舉工作

  • 情形1: 叢集在啟動的過程中,需要選舉Leader
  • 情形2: 叢集正常啟動後,leader因故障掛掉了,需要選舉Leader
  • 情形3: 叢集中的Follower數量不足以通過半數檢驗,Leader會掛掉自己,選舉新leader
  • 情景4: 叢集正常執行,新增加1個Follower

本篇博文,從這四個方面進行原始碼的追蹤閱讀

程式入口

QuorumPeer.java相當於叢集中的每一個節點server,在它的start()

方法中,完成當前節點的啟動工作,原始碼如下:

    // todo 進入了 QuorumPeer(意為仲裁人數)類中,可以把這個類理解成叢集中的某一個點
    @Override
    public synchronized void start() {
        // todo 從磁碟中載入資料到記憶體中
        loadDataBase();

        // todo 啟動上下文的這個工廠,他是個執行緒類, 接受客戶端的請求
        cnxnFactory.start();

        // todo 開啟leader的選舉工作
        startLeaderElection();

        // todo 確定伺服器的角色, 啟動的就是當前類的run方法在900行
        super.start();
    }

第一個loadDataBase();目的是將資料從叢集中恢復到記憶體中

第二個cnxnFactory.start();是當前的節點可以接受來自客戶端(java程式碼,或者控制檯)傳送過來的連線請求

第三個startLeaderElection();開啟leader的選舉工作, 但是其實他是初始化了一系列的輔助類,用來輔助leader的選舉,並非真正在選舉

當前類,quorumPeer繼承了ZKThread,它本身就是一個執行緒類,super.start();就是啟動它的run方法,在他的Run方法中有一個while迴圈,一開始在程式啟動的階段,所有的節點的預設值都是Looking,於是會進入這個分支中,在這個分之中會進行真正的leader選舉工作

小結

從程式的入口介紹中,可以看出本篇文章在會著重看下startLeaderElection();做了哪些工作? 以及在looking分支中如何選舉leader

情形1:叢集在啟動的過程中,選舉新Leader

進入startLeaderElection();方法,原始碼如下, 他主要做了兩件事

  • 對本類QuorumPeer.java維護的變數(volatile private Vote currentVote;)初始化
  • createElectionAlgorithm()建立一個leader選舉的方法

    其實到現在,就剩下一個演算法沒過期了,就是fastLeaderElection

   // TODO 開啟投票選舉Leader的工作
    synchronized public void startLeaderElection() {
        try {
            // todo 建立了一個封裝了投票結果物件   包含myid 最大的zxid 第幾輪Leader
            // todo 先投票給自己
            // todo 跟進它的建構函式
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        } catch(IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        // todo  建立一個領導者選舉的演算法,這個演算法還剩下一個唯一的實現 快速選舉
        this.electionAlg = createElectionAlgorithm(electionType);
    }

繼續跟進createElectionAlgorithm(electionType), 在這個方法中做了如下三件大事

  • 建立了QuorumCnxnManager
  • 建立Listenner
  • 建立FastLeaderElection
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
            
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // todo 建立CnxnManager 上下文的管理器
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){

            // todo 在這裡將listener 開啟
            listener.start();
            // todo  例項化領導者選舉的演算法
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;

準備選舉環境

QuorumManager

上圖是QuorumCnxManager的類圖,看一下,它有6個內部類, 其中的除了Message外其他都是可以單獨執行的執行緒類

這個類有著舉足輕重的作用,它是叢集中全體節點共享輔助類, 那到底有什麼作用呢? 我不賣關子直接說,因為leader的選舉是通過投票決議出來的,既然要相互投票,那叢集中的各個點就得兩兩之間建立連線,這個QuorumCnxManager就負責維護叢集中的各個點的通訊

它維護了兩種佇列,原始碼在下面,第一個佇列被存入了ConcurrentHashMap中 key就是節點的myid(或者說是serverId),值可以理解成儲存它往其他伺服器傳送投票的佇列

第二個佇列是收到的其他伺服器傳送過來的msg

// todo key=serverId(myid)   value = 儲存著當前伺服器向其他伺服器傳送訊息的佇列
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;

// todo 接收到的所有資料都在這個佇列中
public final ArrayBlockingQueue<Message> recvQueue;

如上圖是手繪的QuorumCnxManager.java的體系圖,最直觀的可以看到它內部的三條執行緒類,那三條執行緒類的run()方法又分別做了什麼呢?

SendWorker的run(), 可以看到它根據sid取出了當前節點對應的佇列,然後將佇列中的資料往外發送


    public void run() {
            threadCnt.incrementAndGet();
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq == null || isSendQueueEmpty(bq)) {
                   ByteBuffer b = lastMessageSent.get(sid);
                   if (b != null) {
                       LOG.debug("Attempting to send lastMessage to sid=" + sid);
                       send(b);
                   }
                }
            } catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }
            
            try {
                while (running && !shutdown && sock != null) {

                    ByteBuffer b = null;
                    try {
                        // todo 取出任務所在的佇列
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);

                        if (bq != null) {
                            // todo 將bq,新增進sendQueue
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for " +
                                      "server " + sid);
                            break;
                        }

                        if(b != null){
                            lastMessageSent.put(sid, b);
                            // todo
                            send(b);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                                e);
                    }
                }

RecvWorker的run方法,接受到了msg,然後將msg存入了recvQueue佇列中

        public void run() {
            threadCnt.incrementAndGet();
            try {
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
                    int length = din.readInt();
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException(
                                "Received packet with invalid packet: "
                                        + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
                    // todo 從陣列中把資料讀取到陣列中
                    byte[] msgArray = new byte[length];
                    din.readFully(msgArray, 0, length);
                    // todo 將陣列包裝成ByteBuf
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    // todo 新增到RecvQueue中
                    addToRecvQueue(new Message(message.duplicate(), sid));
                }

]

Listenner的run(),它會使用我們在配置檔案中配置的叢集鍵通訊使用的埠(如上圖的3888)建立彼此之間的連線

還能發現,叢集中各個點之間的通訊使用的傳統socket通訊

        InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    // todo 建立serversocket
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {
                        int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort();
                        //todo 它取出來的地址就是address就是我們在配置檔案中配置叢集時新增進去的 port 3888...
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.toString());
                    // todo 繫結埠
                    ss.bind(addr);
                    while (!shutdown) {
                        // todo 阻塞接受其他的伺服器發起連線
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                       // todo  如果啟用了仲裁SASL身份驗證,則非同步接收和處理連線請求
                        // todo  這是必需的,因為sasl伺服器身份驗證過程可能需要幾秒鐘才能完成,這可能會延遲下一個對等連線請求。
                        if (quorumSaslAuthEnabled) {
                            // todo 非同步接受一個連線
                            receiveConnectionAsync(client);
                        } else {
                            // todo 跟進這個方法
                            receiveConnection(client);
                        }
                        numRetries = 0;
                    }

繼續跟進原始碼,回到QuorumPeer.javacreateElectionAlgorithm()方法中,重新擷取原始碼如下,完成了QuorumCnxManager的建立,後進行Listener的啟動, Listenner的啟動標記著叢集中的各個節點之間有了兩兩之間建立通訊能力, 同時Listenner是個執行緒類,它的Run()方法就在上面的程式碼中

FastLeaderElection

啟動Listenner之後, 開始例項化領導者選舉的演算法物件new FastLeaderElection(this, qcm)

    ...
     break;
        case 3:
            // todo 建立CnxnManager 上下文的管理器
            qcm = createCnxnManager();
   QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                // todo 在這裡將listener 開啟
                listener.start();
                // todo  例項化領導者選舉的演算法
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }

如下圖是FasterElection的類圖

直觀的看到它三個直接內部類

  • Messager(它又有兩個內部執行緒類)
    • WorkerRecriver
      • 負責將
    • WorkerSender
  • Notification
    • 一般是當新節點啟動時狀態為looking,然後發起投票決議,其他節點收到後會用Notification告訴它自己信任的leader
  • ToSend
    • 給對方傳送,或者來自其他節點的訊息。這些訊息既可以是通知,也可以是接收通知的ack

對應著QuorumCnxManager維護的兩種佇列,FasterElection同樣維護下面兩個佇列與之照應,一個是sendqueue另一個是recvqueue

LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;

具體怎麼玩呢? 如下圖

就是當節點啟動過程中對外的投票會存入FasterElectionsendqueue,然後經過QuorumCnxManagersendWorker通過NIO傳送出去, 與之相反的過程,收到的其他節點的投票會被QuorumCnxManagerrecvWorker收到,然後存入QuorumCnxManagerrecvQueue中,這個佇列中的msg會繼續被FasterElection的內部執行緒類workerRecviver取出存放到FasterElectionrecvqueue中

通過追蹤程式碼,可以發現,Message的兩個內部執行緒都被作為守護執行緒的方式開啟

Messenger(QuorumCnxManager manager) {
    // todo WorderSender 作為一條新的執行緒
    this.ws = new WorkerSender(manager);

    Thread t = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();

    //todo------------------------------------
    // todo WorkerReceiver  作為一條新的執行緒
    this.wr = new WorkerReceiver(manager);

    t = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}

小結

程式碼看到這裡,其實選舉leader的準備工作已經完成了,也就是說quorumPeer.javastart()方法中的startLeaderElection();已經準備領導選舉的環境,就是上圖


真正開始選舉

下面就去看一下quorumPeer.java的這個執行緒類的啟動,部分run()方法的擷取,我們關心它的lookForLeader()方法

while (running) {
switch (getPeerState()) {
    /**
     * todo 四種可能的狀態, 經過了leader選舉之後, 不同的伺服器就有不同的角色
     * todo 也就是說,不同的伺服器會會走動下面不同的分支中
     * LOOKING 正在進行領導者選舉
     * Observing
     * Following
     * Leading
     */
case LOOKING:
    // todo 當為Looking狀態時,會進入領導者選舉的階段
    LOG.info("LOOKING");

    if (Boolean.getBoolean("readonlymode.enabled")) {
        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

        // Create read-only server but don't start it immediately
        // todo 建立了一個 只讀的server但是不著急立即啟動它
        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                logFactory, this,
                new ZooKeeperServer.BasicDataTreeBuilder(),
                this.zkDb);

        // Instead of starting roZk immediately, wait some grace(優雅) period(期間) before we decide we're partitioned.
        // todo 為了立即啟動roZK ,在我們決定分割槽之前先等一會
        // Thread is used here because otherwise it would require changes in each of election strategy classes which is
        // unnecessary code coupling.
        //todo  這裡新開啟一條執行緒,避免每一個選舉策略類上有不同的改變 而造成的程式碼的耦合
        Thread roZkMgr = new Thread() {
            public void run() {
                try {
                    // lower-bound grace period to 2 secs
                    sleep(Math.max(2000, tickTime));
                    if (ServerState.LOOKING.equals(getPeerState())) {
                        // todo 啟動上面那個只讀的Server
                        roZk.startup();
                    }
                } catch (InterruptedException e) {
                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                } catch (Exception e) {
                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                }
            }
        };
        try {
            roZkMgr.start();
            setBCVote(null);
            // todo 上面的程式碼都不關係,直接看它的 lookForLeader()方法
            // todo 直接點進去,進入的是介面,我們看它的實現類
            setCurrentVote(makeLEStrategy().lookForLeader());
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
            setPeerState(ServerState.LOOKING);
        } finally {
            // If the thread is in the the grace period, interrupt
            // to come out of waiting.
            roZkMgr.interrupt();
            roZk.shutdown();
        }

下面是lookForLeader()的原始碼解讀
說實話這個方法還真的是挺長的,但是吧這個方法真的很重要,因為我們可以從這個方法中找到網路上大家針對Leader的選舉總結的點點滴滴

第一點: 每次的投票都會先投自己一票,說白了new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());將自己的myid,最大的zxid,以及第幾屆封裝起來,但是還有一個細節,就是在投自己的同時,還是會將存有自己資訊的這一票通過socket傳送給其他的節點

接受別人的投票是通過QuorumManagerrecvWorker執行緒類將投票新增進recvQueue佇列中,投票給自己時,就不走這條路線了,而是選擇直接將票新增進recvQueue佇列中

在下面程式碼中存在一行HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); 這個map可以理解成一個小信箱,每一個節點都會維護一個信箱,這裡面可能存放著自己投給自己的票,或者別人投給自己的票,或者別人投給別人的票,或者自己投給別人的票,通過統計這個信箱中的票數可以決定某一個節點是否可以成為leader,原始碼如下, 使用信箱中的資訊,

    // todo 根據別人的投票,以及自己的投票判斷,本輪得到投票的叢集能不能成為leader
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // todo 到這裡說明接收到投票的機器已經是準leader了

        // Verify if there is any change in the proposed leader
        // todo 校驗一下, leader有沒有變動
        while ((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null) {
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                recvqueue.put(n);
                break;
            }
        }
        if (n == null) {
                // todo 判斷自己是不是leader, 如果是,更改自己的狀態未leading , 否則根據配置檔案確定狀態是 Observer 還是Follower
                // todo leader選舉出來後, QuorumPeer中的run方法中的while再迴圈,不同角色的伺服器就會進入到 不同的分支
                self.setPeerState((proposedLeader == self.getId()) ?
                        ServerState.LEADING : learningState());
        
                Vote endVote = new Vote(proposedLeader,
                        proposedZxid,
                        logicalclock.get(),
                        proposedEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }

termPredicate()函式中有如下的邏輯,self.getQuorumVerifier().containsQuorum(set);它的實現如下,實際上就是在進行過半機制的檢驗,結論就是當某個節點擁有了叢集中一半以上的節點的投票時,它就會把自己的狀態修改成leading, 其他的節點根據自己的需求將狀態該變成following或者observing

    public boolean containsQuorum(Set<Long> set){
        return (set.size() > half);
    }

維護著一個時鐘,標記這是第幾次投票了logicalclock他是AutomicLong型別的變數,他有什麼用呢? 通過下面的程式碼可以看到如下的邏輯,就是當自己的時鐘比當前接收到投票的時鐘小時,說明自己可能因為其他原因錯過了某次投票,所以更新自己的時鐘,重新判斷投自己還是投別人, 同理,如果接收到的投票的時鐘小於自己當前的時鐘,說明這個票是沒有意義的,直接丟棄不理會

   if (n.electionEpoch > logicalclock.get()) {
                                // todo 將自己的時鐘調整為更新的時間
                                logicalclock.set(n.electionEpoch);
                                // todo 清空自己的投票箱
                                recvset.clear();

那麼根據什麼判斷是投給自己還是投給別人呢? 通過解析出票的封裝類中封裝的節點的資訊,什麼資訊呢?zxid,myid,epoch 通常情況是epoch大的優先成為leader,一般來說epoch都會相同,所以zxid大的優先成為leader,如果zxid再相同,則myid大的優先成為leader

檢查到別的節點比自己更適合當leader,會重新投票,選舉更適合的節點

完整的原始碼

// todo 當前進入的是FastLeaderElection.java的實現類
public Vote lookForLeader() throws InterruptedException {
try {
    // todo 建立用來選舉Leader的Bean
    self.jmxLeaderElectionBean = new LeaderElectionBean();

    MBeanRegistry.getInstance().register(
            self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
    self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
    self.start_fle = Time.currentElapsedTime();
}
try {
    // todo 每臺伺服器獨有的投票箱 , 存放其他伺服器投過來的票的map
    // todo long型別的key (sid)標記誰給當前的server投的票   Vote型別的value 投的票
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

    HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

    int notTimeout = finalizeWait;

    synchronized (this) {
        //todo Automic 型別的時鐘
        logicalclock.incrementAndGet();
        //todo 一開始啟動時,入參位置的值都取自己的,相當於投票給自己
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }

    LOG.info("New election. My id =  " + self.getId() +
            ", proposed zxid=0x" + Long.toHexString(proposedZxid));
    // todo 傳送出去,投票自己
    sendNotifications();

    /*
     * Loop in which we exchange notifications until we find a leader
     */
    // todo 如果自己一直處於LOOKING的狀態,一直迴圈
    while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         */
        //todo  嘗試獲取其他伺服器的投票的資訊

        // todo 從接受訊息的佇列中取出一個msg(這個佇列中的資料就是它投票給自己的票)
        // todo 在QuorumCxnManager.java中 傳送的投票的邏輯中,如果是傳送給自己的,就直接加到recvQueue,而不經過socket
        // todo 所以它在這裡是取出了自己的投票
        Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

        /*
         * Sends more notifications if haven't received enough.
         * Otherwise processes new notification.
         */
        // todo 第一輪投票這裡不為空
        if (n == null) {
            // todo 第二輪就沒有投票了,為null, 進入這個分支
            // todo 進行判斷 ,如果叢集中有三臺伺服器,現在僅僅啟動一臺伺服器,還剩下兩臺伺服器沒啟動
            // todo 那就會有3票, 其中1票直接放到 recvQueue , 另外兩票需要傳送給其他兩臺機器的邏輯就在這裡判斷
            // todo 驗證是通不過的,因為queueSendMap中的兩條佇列都不為空
            if (manager.haveDelivered()) {
                sendNotifications();
            } else {
                // todo 進入這個邏輯
                manager.connectAll();
            }

            /*
             * Exponential backoff
             */
            int tmpTimeOut = notTimeout * 2;
            notTimeout = (tmpTimeOut < maxNotificationInterval ?
                    tmpTimeOut : maxNotificationInterval);
            LOG.info("Notification time out: " + notTimeout);
        } else if (validVoter(n.sid) && validVoter(n.leader)) {
            // todo 收到了其他伺服器的投票資訊後,來到下面的分支中處理
            /*
             * Only proceed if the vote comes from a replica in the
             * voting view for a replica in the voting view.
             * todo 僅當投票來自投票檢視中的副本時,才能繼續進行投票。
             */
            switch (n.state) {
                case LOOKING:
                    // todo 表示獲取到投票的伺服器的狀態也是looking

                    // If notification > current, replace and send messages out
                    // todo 對比接收到的頭片的 epoch和當前時鐘先後

                    // todo 接收到的投票 > 當前伺服器的時鐘
                    // todo 表示當前server在投票過程中可能以為故障比其他機器少投了幾次,需要重新投票
                    if (n.electionEpoch > logicalclock.get()) {
                        // todo 將自己的時鐘調整為更新的時間
                        logicalclock.set(n.electionEpoch);
                        // todo 清空自己的投票箱
                        recvset.clear();
                        // todo 用別人的資訊和自己的資訊對比,選出一個更適合當leader的,如果還是自己適合,不作為, 對方適合,修改投票,投 對方
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();

                        // todo 接收到的投票 < 當前伺服器的時鐘
                        // todo 說明這個投票已經不能再用了
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                        // todo 別人的投票時鐘和我的時鐘是相同的
                        // todo 滿足 totalOrderPredicate 後,會更改當前的投票,重新投票
                        /**
                         *   在 totalOrderPredicate 比較兩者之間誰更滿足條件
                         *   ((newEpoch > curEpoch) ||
                         *   ((newEpoch == curEpoch) &&
                         *   ((newZxid > curZxid) ||
                         *   ((newZxid == curZxid) &&
                         *   (newId > curId)))));
                         */
                        // todo 返回true說明 對方更適合當leader
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }
                    // todo 將自己的投票存放到投票箱子中
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // todo 根據別人的投票,以及自己的投票判斷,本輪得到投票的叢集能不能成為leader
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {
                        // todo 到這裡說明接收到投票的機器已經是準leader了

                        // Verify if there is any change in the proposed leader
                        // todo 校驗一下, leader有沒有變動
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            // todo 判斷自己是不是leader, 如果是,更改自己的狀態未leading , 否則根據配置檔案確定狀態是 Observer 還是Follower
                            // todo leader選舉出來後, QuorumPeer中的run方法中的while再迴圈,不同角色的伺服器就會進入到 不同的分支
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid,
                                    logicalclock.get(),
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    // todo 禁止Observer參加投票
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch));

                        if (ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                            n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch,
                            n.state));

                    if (ooePredicate(outofelection, outofelection, n)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
            }
        } else {
            if (!validVoter(n.leader)) {
                LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
            }
            if (!validVoter(n.sid)) {
                LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
            }
        }
    }
    return null;

經過如上的判斷各個節點的就可以選舉出不同的角色,再次回到QuorumPeer.javarun()中進行迴圈時,不再會進入case LOOKING:程式碼塊了,而是按照自己不同的角色各司其職,完成不同的初始化啟動

情形2: 叢集正常啟動後,leader因故障掛掉了,選舉新Leader

第二種選舉leader的情況,叢集正常啟動後,leader因故障掛掉了,選舉新Leader

這部分的邏輯是怎樣的呢?

leader雖然掛了,但是角色為Follower的server依然會去執行QuorumPeer.javarun()方法中的無限while迴圈,當它執行follower.followLeader();方法時找不到leader,就會出異常,最終執行finally程式碼塊中的邏輯,可以看到它修改了自己的狀態為looking,進而重新選舉leader

   break;
        case FOLLOWING:
            // todo server 當選follow角色
            try {
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                follower.shutdown();
                setFollower(null);
                setPeerState(ServerState.LOOKING);
            }
            break;

情形3: 叢集中的Follower數量不足以通過半數檢驗,Leader會掛掉自己,然後選舉新leader

情形3: 假設叢集中2臺Follower,1臺leader,那麼當掛掉一臺Follower時,剩下1臺Follower無法滿足過半檢查機制因此會重新選舉leader

回到原始碼:leader每次都進入case LEADING:去執行leader.lead();

 case LEADING:
            // todo 伺服器成功當選成leader
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                // todo 跟進lead
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                setPeerState(ServerState.LOOKING);
            }
            break;

但是在leader.lead();中每次執行都會進行如下的判斷,很明顯,當不滿足半數檢驗時,leader直接掛掉自己,最終將叢集中所有節點的狀態改成LOOKING,重新選舉


              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                    // Lost quorum, shutdown
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    // make sure the order is the same!
                    // the leader goes to looking
                    return;
              } 

情景4: 叢集正常執行,新增加1個Follower

新增加的進來的Follower在啟動時它的狀態是looking, 同樣她也會去嘗試選舉leader,同樣會把第一票投給自己,但是對於一個穩定的叢集來說
叢集中的各個橘色已經確定下來了,在這種情況下,會進入FastLeaderElection.javalookForLeader()方法的如下分支,使當前新新增進來的節點
直接認Leader

case OBSERVING:
        // todo 禁止Observer參加投票
        LOG.debug("Notification from observer: " + n.sid);
        break;
    case FOLLOWING:
    case LEADING:
        /*
            * Consider all notifications from the same epoch
            * together.
            */
        if (n.electionEpoch == logicalclock.get()) {
            recvset.put(n.sid, new Vote(n.leader,
                    n.zxid,
                    n.electionEpoch,
                    n.peerEpoch));

            if (ooePredicate(recvset, outofelection, n)) {
                self.setPeerState((n.leader == self.getId()) ?
                        ServerState.LEADING : learningState());

                Vote endVote = new Vote(n.leader,
                        n.zxid,
                        n.electionEpoch,
                        n.peerEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }

如果有錯誤歡迎指出,如果對您有幫助,歡迎點支援