1. 程式人生 > >Zookeeper 原始碼(五)Leader 選舉

Zookeeper 原始碼(五)Leader 選舉

Zookeeper 原始碼(五)Leader 選舉

前面學習了 Zookeeper 服務端的相關細節,其中對於叢集啟動而言,很重要的一部分就是 Leader 選舉,接著就開始深入學習 Leader 選舉。

一、選舉規則

Leader 選舉是保證分散式資料一致性的關鍵所在。當 Zookeeper 叢集中的一臺伺服器出現以下兩種情況之一時,需要進入 Leader 選舉。

  • 伺服器初始化啟動
  • 伺服器執行期間無法和 Leader 保持連線

下面以伺服器啟動時期的 Leader 選舉為例進行分析講解。

在叢集初始化階段,當有一臺伺服器 Server1 啟動時,其單獨無法進行和完成 Leader 選舉,當第二臺伺服器 Server2 啟動時,此時兩臺機器可以相互通訊,每臺機器都試圖找到L eader,於是進入 Leader 選舉過程。選舉過程如下

(1) 每個Server發出一個投票。由於是初始情況,Server1 和 Server2 都會將自己作為 Leader 伺服器來進行投票,每次投票會包含所推舉的伺服器的 myid 和 ZXID,使用 (myid, ZXID) 來表示,此時 Server1 的投票為 (1, 0),Server2 的投票為 (2, 0),然後各自將這個投票發給叢集中其他機器。

(2) 接受來自各個伺服器的投票。叢集的每個伺服器收到投票後,首先判斷該投票的有效性,如檢查是否是本輪投票、是否來自 LOOKING 狀態的伺服器。

(3) 處理投票。針對每一個投票,伺服器都需要將別人的投票和自己的投票進行 PK,PK 規則如下

  • 優先檢查 ZXID。ZXID 比較大的伺服器優先作為 Leader。
  • 如果 ZXID 相同,那麼就比較 myid。myid 較大的伺服器作為 Leader 伺服器。

對於 Server1 而言,它的投票是 (1, 0),接收 Server2 的投票為 (2, 0),首先會比較兩者的 ZXID,均為 0,再比較 myid,此時 Server2 的 myid 最大,於是更新自己的投票為 (2, 0),然後重新投票,對於 Server2 而言,其無須更新自己的投票,只是再次向叢集中所有機器發出上一次投票資訊即可。

(4) 統計投票。每次投票後,伺服器都會統計投票資訊,判斷是否已經有過半機器接受到相同的投票資訊,對於 Server1、Server2 而言,都統計出叢集中已經有兩臺機器接受了 (2, 0) 的投票資訊,此時便認為已經選出了 Leader。

(5) 改變伺服器狀態。一旦確定了 Leader,每個伺服器就會更新自己的狀態,如果是 Follower,那麼就變更為 FOLLOWING,如果是 Leader,就變更為 LEADING。

Leader 選舉

由上面規則可知,通常那臺伺服器上的資料越新(ZXID 會越大),其成為 Leader 的可能性越大,也就越能夠保證資料的恢復。如果 ZXID 相同,則 SID 越大機會越大。

二、Leader 選舉實現

2.1 幾個基本概念

(1) 伺服器的狀態

伺服器具有四種狀態,分別是 LOOKING、FOLLOWING、LEADING、OBSERVING。

  • LOOKING 尋找 Leader 狀態。當伺服器處於該狀態時,它會認為當前叢集中沒有 Leader,因此需要進入 Leader 選舉狀態。
  • FOLLOWING 跟隨者狀態。表明當前伺服器角色是 Follower。
  • LEADING 領導者狀態。表明當前伺服器角色是 Leader。
  • OBSERVING 觀察者狀態。表明當前伺服器角色是 Observer。

(2) Vote 資料結構

屬性 說明
id 被推舉的 Leader 的 sid(myid)
zxid 被推舉的 Leader 的事務 id
electionEpoch 用來判斷多個投票是否在同一輪選舉週期中,每進行一輪 Leader 選舉自增 1
peerEpoch 被推舉的 Leader 的 epoch
state 伺服器的狀態,有 LOOKING、FOLLOWING、LEADING、OBSERVING

2.2 QuorumCnxManager(網路I/O)

每臺伺服器在啟動的過程中,會啟動一個 QuorumPeerManager,負責各臺伺服器之間的底層 Leader 選舉過程中的網路通訊。

(1) 初始化 QuorumCnxManager 【QuorumPeer】

protected Election createElectionAlgorithm(int electionAlgorithm){
    qcm = new QuorumCnxManager(this);
    QuorumCnxManager.Listener listener = qcm.listener;
    if(listener != null){
        listener.start();
        FastLeaderElection fle = new FastLeaderElection(this, qcm);
        fle.start();
        le = fle;
    } 
}

在 createElectionAlgorithm 會啟動 QuorumCnxManager,本小節重點關注 QuorumCnxManager 幹了那些事。

public QuorumCnxManager(QuorumPeer self) {
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }
    
    this.self = self;

    // Starts listener thread that waits for connection requests 
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

(2) 訊息佇列

QuorumCnxManager 內部維護了一系列的佇列,用來儲存接收到的、待發送的訊息以及訊息的傳送器,除接收佇列以外,其他佇列都按照 SID 分組形成佇列集合。

  • recvQueue 訊息接收佇列,用於存放那些從其他伺服器接收到的訊息。
  • queueSendMap 訊息傳送佇列,用於儲存那些待發送的訊息,按照 SID 進行分組。
  • senderWorkerMap 傳送器集合,每個 SenderWorker 訊息傳送器,都對應一臺遠端 Zookeeper 伺服器,負責訊息的傳送,也按照 SID 進行分組。
  • lastMessageSent 最近傳送過的訊息,為每個 SID 保留最近傳送過的一個訊息。

(3) Listener

可以看到 Listener 初始化了一個 ServerSocket,預設埠為 3888 進行底層 Leader 選舉通訊。

public class Listener extends ZooKeeperThread {
    @Override
    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;

        while((!shutdown) && (numRetries < 3)){
            try {
                // 1. 建立 ServerSocket
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                if (self.getQuorumListenOnAllIPs()) {
                    int port = self.getElectionAddress().getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    self.recreateSocketAddresses(self.getId());
                    addr = self.getElectionAddress();
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(addr.toString());
                ss.bind(addr);
                while (!shutdown) {
                    Socket client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    // 2. 處理請求 Socket
                    receiveConnection(client);
                    numRetries = 0;
                }
            } catch (IOException e) {
                if (shutdown) {
                    break;
                }
                LOG.error("Exception while listening", e);
                numRetries++;
                try {
                    ss.close();
                    Thread.sleep(1000);
                } catch (IOException ie) {
                    LOG.error("Error closing server socket", ie);
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping. " +
                        "Ignoring exception", ie);
                }
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + self.getElectionAddress());
        } else if (ss != null) {
            // Clean up for shutdown.
            try {
                ss.close();
            } catch (IOException ie) {
                // Don't log an error for shutdown.
                LOG.debug("Error closing server socket", ie);
            }
        }
    }
}

為了避免兩臺機器之間重複地建立 TCP 連線,Zookeeper 只允許 SID 大的伺服器主動和其他機器建立連線,否則斷開連線。在接收到建立連線請求後,伺服器通過對比自己和遠端伺服器的 SID 值來判斷是否接收連線請求,如果當前伺服器發現自己的 SID 更大,那麼會斷開當前連線,然後自己主動和遠端伺服器建立連線。一旦連線建立,就會根據遠端伺服器的 SID 來建立相應的訊息傳送器 SendWorker 和訊息接收器 RecvWorker,並啟動。

每個 RecvWorker 只需要不斷地從這個 TCP 連線中讀取訊息,並將其儲存到 recvQueue 佇列中。每個 SendWorker 只需要不斷地從對應的訊息傳送佇列中獲取出一個訊息傳送即可,同時將這個訊息放入 lastMessageSent 中。

2.3 FastLeaderElection(選舉演算法核心)

  • 外部投票:特指其他伺服器發來的投票。
  • 內部投票:伺服器自身當前的投票。
  • 選舉輪次:Zookeeper 伺服器 Leader 選舉的輪次,即 logicalclock。
  • PK:對內部投票和外部投票進行對比來確定是否需要變更內部投票。

FastLeaderElection選舉流程

(1) 初始化 FastLeaderElection

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    // 初始化 sendqueue 和 recvqueue
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {
    // 1. WorkerSender 選票接收器,負責從 QuorumCnxManager 接收選票後儲存到 recvqueue 中
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    // 2. WorkerReceiver 選票傳送器,負責從 sendqueue 中獲取待發送的選票並傳遞給 QuorumCnxManager
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

在 FastLeaderElection 中有幾個屬性需要我們重點關注一下:

  • sendqueue 選票傳送佇列,用於儲存待發送的選票。

  • recvqueue 選票接收佇列,用於儲存接收到的外部投票。

  • WorkerReceiver 選票接收器。其會不斷地從 QuorumCnxManager 中獲取其他伺服器發來的選舉訊息,並將其轉換成一個選票,然後儲存到 recvqueue 中,在選票接收過程中,如果發現該外部選票的選舉輪次小於當前伺服器的,那麼忽略該外部投票,同時立即傳送自己的內部投票。

  • WorkerSender 選票傳送器,不斷地從 sendqueue 中獲取待發送的選票,並將其傳遞到底層 QuorumCnxManager 中。

(2) lookForLeader(核心演算法)

public Vote lookForLeader() throws InterruptedException {
    // 省略...
    if (self.start_fle == 0) {
       self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;

        // 1. 啟動時先投自己一票並廣播給其它伺服器
        synchronized(this){
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        sendNotifications();

        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
         
            // 2. 獲取其它伺服器傳送過來的選票
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            
            // 3. 如果沒有選票,則先判斷是否已建立連線,如建立投自己一票,如沒有建立連線則立即建立連線
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } 
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    if(n.electionEpoch == logicalclock.get()){
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if(termPredicate(recvset, new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                        && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    outofelection.put(n.sid, new Vote(n.leader, 
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state
                          + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}

參考:

  1. 《Zookeeper的Leader選舉》:https://www.cnblogs.com/leesf456/p/6107600.html
  2. 從 Paxos 到 Zookeeper : 分散式一致性原理與實踐

每天用心記錄一點點。內容也許不重要,但習慣很重要!