1. 程式人生 > Zookeeper原始碼分析(三)-Leader的選舉

Zookeeper原始碼分析(三)-Leader的選舉

在 ZooKeeper 原始碼中,Leader 選舉可分為兩步:
1. startLeaderElection():建立(初始化)選舉演算法,此時尚未真正開始投票

    synchronized public void startLeaderElection() {
        try {
             /**
             *建立一個投票自己的投票物件(為後面第一次給自己投票做準備)
             */

            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        } catch(IOException e) {
            //...
} //遍歷配置中獲取的伺服器,根據myid獲取自身的選舉地址 for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } //... //根據配置的選舉型別(通過zoo.cfg檔案中electionAlg進行配置),預設為3,獲取選舉演算法 this
.electionAlg = createElectionAlgorithm(electionType); }

接下來看createElectionAlgorithm方法

    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
              //...
        case
3: //預設為3,所以對此進行分析 //建立一個基於tcp連線進行選主的管理器,裡面有佇列進行訊息儲存,有SendWorker和RecvWorker進行訊息的傳送和接收 qcm = new QuorumCnxManager(this); QuorumCnxManager.Listener listener = qcm.listener; //開啟配置過的選舉埠進行監聽 if(listener != null){ listener.start(); //建立選舉演算法 le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }

看一下QuorumCnxManager類的建構函式

    /**
     * Creates the connection manager used for leader election.
     *
     * <p>Initializes the shared receive queue, the per-peer send-queue and sender-worker
     * maps and the last-message cache, applies the optional {@code zookeeper.cnxTimeout}
     * system property, and creates (but does not start) the election-port Listener.
     *
     * @param self the local quorum peer
     * @throws NumberFormatException if {@code zookeeper.cnxTimeout} is set but is not an integer
     */
    public QuorumCnxManager(QuorumPeer self) {
        this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
        this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
        this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
        this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
        if (cnxToValue != null) {
            // Integer.parseInt instead of the deprecated boxing constructor new Integer(String).
            this.cnxTO = Integer.parseInt(cnxToValue);
        }

        this.self = self;

        // The Listener accepts peer connections on the election port; the message exchange
        // itself is done by SendWorker/RecvWorker. It is started separately by the caller
        // (see createElectionAlgorithm).
        listener = new Listener();
    }

然後看看Listener的具體實現
/**
 * Thread that listens on this server's election address and passes each accepted peer
 * connection to receiveConnection(), which sets up the SendWorker/RecvWorker exchange.
 *
 * NOTE: truncated excerpt — the rest of run() and of the class is omitted.
 */
public class Listener extends Thread {
  // Server socket bound to the election address. Declared volatile for cross-thread
  // visibility (presumably read by a shutdown path — TODO confirm).
  volatile ServerSocket ss = null;
        /**
         * Binds the election address and accepts peer connections until shutdown.
         * With quorumListenOnAllIPs only the election port is bound (wildcard address);
         * otherwise the configured election address is used. The outer loop re-creates
         * the socket while numRetries < 3.
         */
        @Override
        public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    // Create the server socket for election connections.
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (self.getQuorumListenOnAllIPs()) {
                        int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = self.quorumPeers.get(self.getId()).electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(self.quorumPeers.get(self.getId()).electionAddr
                            .toString());
                    // Bind the election address.
                    ss.bind(addr);
                    while (!shutdown) {
                        // Block until a peer connects.
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                        // Handshake and SendWorker/RecvWorker setup for the new connection.
                        receiveConnection(client);
                        // A successfully handled connection resets the retry counter.
                        numRetries = 0;
                    }
                } catch (IOException e) {
                  // NOTE(review): empty in this excerpt. No visible line increments
                  // numRetries, so unless it is incremented here a persistent bind/accept
                  // failure would retry forever — confirm against the full source.
                  }
                  //  .... (remainder omitted in this excerpt)

然後看一下receiveConnection
           /**
            * Handles a connection accepted on the election port.
            *
            * <p>Reads the connecting peer's server id. If it is smaller than ours, any current
            * SendWorker for that peer is finished, the incoming socket is closed and we connect
            * to the peer ourselves (only the larger id keeps the connection). Otherwise a
            * SendWorker/RecvWorker pair is started on this socket, replacing any previous
            * SendWorker for that peer.
            *
            * @param sock socket accepted by the Listener
            * @return true if workers were started on {@code sock}, false if it was closed
            */
           public boolean receiveConnection(Socket sock) {
        long sid;

        try {
            // Read the server id announced by the connecting peer.
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            if (sid < 0) {
                sid = din.readLong();
                // ... (rest of this handshake branch omitted in this excerpt)
            }
        } catch (IOException e) {
            // Without a valid id there is nothing to compare against below; drop the connection.
            LOG.warn("Exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        if (sid < self.getId()) {
            // The peer has the smaller id: stop our current sender for it (if any), drop this
            // connection and open our own connection to the peer instead.
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);
        } else {
            // The peer has the larger (or equal) id: keep this connection and run the
            // SendWorker/RecvWorker pair on it, replacing any previous sender.
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            senderWorkerMap.put(sid, sw);

            // Atomic create-if-missing: toSend() may create this peer's queue concurrently,
            // and a check-then-put could replace a queue that already holds messages.
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

            sw.start();
            rw.start();

            return true;
        }
        return false;
    }

然後看一下 connectOne
    /**
     * Opens an election connection to server {@code sid} when no SendWorker exists for it
     * yet, then performs the id handshake via initiateConnection().
     *
     * NOTE: truncated excerpt — the try block's handlers and the end of the method are omitted.
     *
     * @param sid id of the server to connect to
     */
    synchronized void connectOne(long sid){
        // Only connect when there is no active sender for this peer.
        if (senderWorkerMap.get(sid) == null){
            InetSocketAddress electionAddr;
            if (self.quorumPeers.containsKey(sid)) {
                electionAddr = self.quorumPeers.get(sid).electionAddr;
            } else {
                // Unknown server id: nothing to connect to.
                LOG.warn("Invalid server id: " + sid);
                return;
            }
            try {

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Opening channel to server " + sid);
                }
                Socket sock = new Socket();
                setSockOpts(sock);
                // Open a new socket to server sid's election address, with connect
                // timeout cnxTO.
                // NOTE(review): this reads the address from self.getView() while the
                // electionAddr local above (from self.quorumPeers) is unused in the
                // visible code — confirm both sources agree.
                sock.connect(self.getView().get(sid).electionAddr, cnxTO);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Connected to server " + sid);
                }
                // Send our id and decide whether to keep this connection.
                initiateConnection(sock, sid);
            } 
            //... (handlers and end of method omitted in this excerpt)
    }
然後看一下 initiateConnection
    /**
     * Completes the handshake on an election connection this server initiated.
     *
     * <p>Sends our server id to the peer. Only the side with the larger id keeps a connection
     * it initiated: if {@code sid} is larger than ours the socket is closed (the peer's
     * receiveConnection will then connect back to us); otherwise a SendWorker/RecvWorker pair
     * is started on the socket, replacing any existing SendWorker for that peer.
     *
     * @param sock socket already connected to the peer's election address
     * @param sid  id of the peer
     * @return true if workers were started, false if the socket was closed
     */
    public boolean initiateConnection(Socket sock, Long sid) {
        try {
            // Handshake: announce our own server id. The socket itself stays open.
            DataOutputStream dout = new DataOutputStream(sock.getOutputStream());
            dout.writeLong(self.getId());
            dout.flush();
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        // If lost the challenge, then drop the new connection
        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                     "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            return false;
        }

        // Otherwise proceed with the connection.
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, sid, sw);
        sw.setRecv(rw);

        // Retire any previous sender for this peer before registering the new one.
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);

        // Atomic create-if-missing: the same map is also populated by toSend() and
        // receiveConnection() on other threads, and a check-then-put could replace a queue
        // that already holds messages.
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sw.start();
        rw.start();

        return true;
    }

FastLeaderElection類的建構函式中會呼叫starter(self, manager)進行初始化

    /**
     * Resets election state for a fresh instance.
     *
     * <p>No leader or zxid is proposed yet (both -1). The outgoing and incoming queues start
     * empty, and the Messenger is built on top of the given connection manager.
     *
     * @param self    the local quorum peer
     * @param manager connection manager used to exchange election messages
     */
    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;

        // Nothing has been proposed yet.
        this.proposedLeader = -1;
        this.proposedZxid = -1;

        // Outgoing ToSend messages (notifications/acks); drained by WorkerSender.
        this.sendqueue = new LinkedBlockingQueue<ToSend>();
        // Incoming Notifications, each carrying the sender's state; filled by WorkerReceiver.
        this.recvqueue = new LinkedBlockingQueue<Notification>();

        // Created last. Per the article, Messenger starts the WorkerSender/WorkerReceiver
        // threads, which presumably rely on the queues above already existing.
        this.messenger = new Messenger(manager);
    }

然後看一下WorkerReceiver執行緒的run方法

            /**
             * Main loop of the WorkerReceiver thread.
             *
             * <p>Polls QuorumCnxManager's receive queue (3 s timeout) until {@code stop} is set.
             * For each message:
             * <ul>
             *   <li>from a server outside the voting view: reply with this server's current
             *       vote;</li>
             *   <li>otherwise decode it into a Notification. If this server is LOOKING, queue
             *       it on {@code recvqueue}, and reply when the sender is also LOOKING with an
             *       older election epoch. If this server is not LOOKING but the sender is,
             *       reply with the current vote (or the backward-compatible vote for
             *       version-0 peers).</li>
             * </ul>
             * Truncated messages and messages with an unknown state code are dropped.
             */
            public void run() {

                Message response;
                while (!stop) {
                    try{
                        // Wait up to 3 s for the next message in QuorumCnxManager's recvQueue.
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if(response == null) continue;

                        if(!self.getVotingView().containsKey(response.sid)){
                            // Sender is not a voting member (e.g. an observer): reply with
                            // our current vote and do not count its message.
                            Vote current = self.getCurrentVote();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    logicalclock,
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch());

                            sendqueue.offer(notmsg);
                        } else {
                            // Reject truncated messages before decoding. The fixed part is
                            // 28 bytes: the state int plus the leader, zxid and electionEpoch
                            // longs. Reading past the end would throw an unchecked
                            // BufferUnderflowException, which is not caught below and would
                            // terminate this thread.
                            if (response.buffer.remaining() < 28) {
                                LOG.error("Got a short response from server " + response.sid
                                        + ": " + response.buffer.remaining() + " bytes");
                                continue;
                            }

                            // Decode the message into a Notification.
                            Notification n = new Notification();

                            // State of peer that sent this message (wire codes 0-3)
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                            switch (response.buffer.getInt()) {
                            case 0:
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            case 1:
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            case 2:
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            case 3:
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            default:
                                // Unknown state code: drop the message.
                                continue;
                            }

                            n.leader = response.buffer.getLong();
                            n.zxid = response.buffer.getLong();
                            n.electionEpoch = response.buffer.getLong();
                            n.state = ackstate;
                            n.sid = response.sid;
                            if(!backCompatibility){
                                n.peerEpoch = response.buffer.getLong();
                            } else {
                                if(LOG.isInfoEnabled()){
                                    LOG.info("Backward compatibility mode, server id=" + n.sid);
                                }
                                // In backward-compatibility mode the peer epoch is not read
                                // from the message but derived from the zxid.
                                n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                            }

                            // Version field (added in 3.4.6); treated as 0 when absent.
                            n.version = (response.buffer.remaining() >= 4) ?
                                         response.buffer.getInt() : 0x0;

                            if(LOG.isInfoEnabled()){
                                printNotification(n);
                            }

                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                                // We are electing too: hand the notification to the
                                // election loop through recvqueue.
                                recvqueue.offer(n);

                                // Send a notification back if the peer that sent this
                                // message is also looking and its logical clock is
                                // lagging behind.
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
                                        && (n.electionEpoch < logicalclock)){
                                    Vote v = getVote();
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock,
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch());
                                    sendqueue.offer(notmsg);
                                }
                            } else {
                                // We are not LOOKING: if the sender is, tell it the result
                                // we already hold by queueing our vote on sendqueue.
                                Vote current = self.getCurrentVote();
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
                                    if(LOG.isDebugEnabled()){
                                        LOG.debug("Sending new notification. My id =  " +
                                                self.getId() + " recipient=" +
                                                response.sid + " zxid=0x" +
                                                Long.toHexString(current.getZxid()) +
                                                " leader=" + current.getId());
                                    }

                                    ToSend notmsg;
                                    if(n.version > 0x0) {
                                        notmsg = new ToSend(
                                                ToSend.mType.notification,
                                                current.getId(),
                                                current.getZxid(),
                                                current.getElectionEpoch(),
                                                self.getPeerState(),
                                                response.sid,
                                                current.getPeerEpoch());

                                    } else {
                                        // Version-0 peers get the backward-compatible vote.
                                        Vote bcVote = self.getBCVote();
                                        notmsg = new ToSend(
                                                ToSend.mType.notification,
                                                bcVote.getId(),
                                                bcVote.getZxid(),
                                                bcVote.getElectionEpoch(),
                                                self.getPeerState(),
                                                response.sid,
                                                bcVote.getPeerEpoch());
                                    }
                                    sendqueue.offer(notmsg);
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        // Keep looping: this thread exits only when stop is set.
                        LOG.warn("Interrupted Exception while waiting for new message: "
                                + e.toString());
                    }
                }
                LOG.info("WorkerReceiver is down");
            }

接下來看WorkerSender執行緒的run方法

            /**
             * Main loop of the WorkerSender thread.
             *
             * <p>Takes ToSend messages from {@code sendqueue}, waiting at most 3 s per poll so
             * that {@code stop} is re-checked regularly, and dispatches each through process().
             * Exits when {@code stop} is set or the thread is interrupted.
             */
            public void run() {
                try {
                    while (!stop) {
                        ToSend next = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        if (next != null) {
                            process(next);
                        }
                    }
                } catch (InterruptedException ignored) {
                    // Interruption ends the loop; fall through to the shutdown log.
                }
                LOG.info("WorkerSender is down");
            }

-------------------------------------------------------
看process(m)
            /**
             * Serializes one outgoing election message and hands it to QuorumCnxManager.
             *
             * @param m message to send; {@code m.sid} is the destination server id
             */
            void process(ToSend m) {
                // The state travels as its enum ordinal. WorkerReceiver decodes 0..3 as
                // LOOKING/FOLLOWING/LEADING/OBSERVING, so this relies on the enum declaring
                // its constants in that order.
                int stateCode = m.state.ordinal();
                ByteBuffer payload = buildMsg(stateCode, m.leader, m.zxid, m.electionEpoch, m.peerEpoch);
                manager.toSend(m.sid, payload);
            }
-------------------------------------------------------
看 toSend(m.sid, requestBuffer) 方法
    /**
     * Delivers an election message to server {@code sid}.
     *
     * <p>A message addressed to this server goes straight onto the receive queue. Otherwise
     * it is appended to the peer's send queue (created on first use), and a connection to
     * the peer is initiated via connectOne().
     *
     * @param sid destination server id
     * @param b   serialized message; for self-delivery its position is reset to 0
     */
    public void toSend(Long sid, ByteBuffer b) {
        if (self.getId() == sid) {
            // Loopback: deliver our own message directly to the receive queue.
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
            // putIfAbsent makes "create the queue if missing" atomic. A containsKey/put
            // sequence can race with another thread installing a queue for the same peer,
            // replacing it and losing the messages already queued there.
            ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
            ArrayBlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, bq);
            addToSendQueue(existing != null ? existing : bq, b);

            // Connect to the peer's election port (no-op if a sender already exists).
            connectOne(sid);
        }
    }

2. super.start():啟動QuorumPeer執行緒,由其run方法進行投票選舉

    /**
     * Main loop of the QuorumPeer thread: dispatches on the peer state while running is set.
     * LOOKING runs the election (lookForLeader) and records the returned vote;
     * OBSERVING/FOLLOWING/LEADING run the corresponding role and, when it ends (normally or
     * by exception), set the state back to LOOKING.
     *
     * NOTE: excerpt — several parts are omitted (marked "...").
     */
    public void run() {
        /*
         * (setup omitted in this excerpt)
         */
        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:// No leader known: run an election.
                    LOG.info("LOOKING");
                        // ... (omitted: the opening `if (...) {` whose `else` branch appears
                        // below — presumably a read-only-mode check given the roZkMgr
                        // reference; TODO confirm)
                        try {
                           // roZkMgr.start();
                            //setBCVote(null);
                            // The election actually starts here; everything before was only
                            // initialization.
                                      setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                          //... (omitted in this excerpt)
                        }
                    } else {
                        try {
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );                        
                    } finally {
                        // NOTE(review): unlike the LEADING case, observer is not null-checked;
                        // if makeObserver() threw, observer may be null here and shutdown()
                        // would throw a NullPointerException — confirm.
                        observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        // NOTE(review): same unguarded shutdown as OBSERVING — confirm
                        // follower cannot be null here.
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        // Normal return: clear leader so the finally block skips shutdown.
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        // Only reached with a non-null leader when lead() threw.
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                }
            }
        } finally {
            //... (omitted in this excerpt)
        }
    }
/**
 * Runs one round of fast leader election and returns the agreed vote.
 *
 * <p>Starts a new round (increments logicalclock), proposes this server, broadcasts the
 * proposal, then processes notifications from recvqueue until this server leaves the
 * LOOKING state or stop is set. When a notification is missing, it either resends its
 * notifications or reconnects, with the wait doubling up to maxNotificationInterval.
 *
 * NOTE: excerpt — setup and cleanup are omitted (marked "...").
 *
 * @return the final vote, or null if the loop ends without a decision (state no longer
 *     LOOKING or stop set)
 * @throws InterruptedException if interrupted while waiting on recvqueue
 */
public Vote lookForLeader() throws InterruptedException {
        //... (omitted in this excerpt)
        try {
            /*
             * Votes received in the current round, keyed by sender sid.
             */
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            // Votes from servers that are already FOLLOWING/LEADING, keyed by sender sid.
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            // Current wait for a notification; grows with exponential backoff below.
            int notTimeout = finalizeWait;

            synchronized(this){
                /* Advance the logical clock: this starts a new election round. */
                logicalclock++;
                /*
                 * Initial proposal: vote for ourselves (per the original note, getInitId()
                 * returns the id configured in myid).
                 */
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            /* Only enqueues our vote on sendqueue; WorkerSender takes it from there and
               hands it to QuorumCnxManager for delivery. */
            sendNotifications();
            /* Exchange notifications until a leader is chosen (or stop is set). */
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){

                /* Wait up to notTimeout ms for the next received notification. */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /* No notification: check whether everything queued has been delivered. */
                if(n == null){
                    if(manager.haveDelivered()){
                        /* All delivered: send our notifications again. */
                        sendNotifications();
                    } else {
                        /* Messages still pending (peers may not be up yet): try to connect
                           to all servers. */
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff: double the wait, capped at maxNotificationInterval.
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                /* Notification from a voting member. */
                else if(self.getVotingView().containsKey(n.sid)) {
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock) {
                            // The sender is in a newer round: jump to it and discard the
                            // votes collected for the old round.
                            logicalclock = n.electionEpoch;
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                // The received vote beats our initial vote: adopt it.
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                // Otherwise go back to proposing our own initial vote.
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            // Broadcast the proposal (i.e. enqueue it on sendqueue).
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock) {
                            // Stale round: ignore this notification.
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            // Same round and the received vote beats our current proposal.
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }
                        // Record the sender's vote; recvset decides when the election ends.
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // Does our current proposal have enough support in recvset? (Per the
                        // original note the default check is QuorumMaj: more than half.)
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {

                            // Wait finalizeWait for late notifications. If one beats our
                            // proposal, put it back and keep electing (n is non-null then).
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            // No better vote arrived: settle as LEADING or as a learner.
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                // Return the final vote.
                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock,
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        // Observers do not vote.
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock){
                            recvset.put(n.sid, new Vote(n.leader,
                                                          n.zxid,
                                                          n.electionEpoch,
                                                          n.peerEpoch));

                            if(ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, 
                                        n.electionEpoch, 
                                        n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.version,
                                                            n.leader,
                                                            n.zxid,
                                                            n.electionEpoch,
                                                            n.peerEpoch,
                                                            n.state));

                        if(ooePredicate(outofelection, outofelection, n)) {
                            // Join the established ensemble, adopting its election epoch.
                            synchronized(this){
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                                    n.zxid,
                                                    n.electionEpoch,
                                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } finally {
            // ... (omitted in this excerpt)
        }
    }