

## ZooKeeper Study -- The ZAB Protocol and Startup Leader Election

This chapter records what I learned about ZooKeeper: the principles of the ZAB protocol, and the automatic leader election that runs after the servers start. Parts of the content are drawn from the ZooKeeper website and the 咕泡學院 (Gupao Academy) course material. The ZooKeeper source version analyzed here is 3.6.0.

### Basic introduction

What is ZAB for? According to the official site, it is ZooKeeper's atomic broadcast protocol. Descriptions elsewhere split ZAB into several modes: broadcast is one of them, and crash recovery is the other one you hear about most often. On top of ZAB, ZooKeeper implements data consistency and leader election for a cluster running in a leader/follower (primary/backup) architecture.

> Zab is the ZooKeeper Atomic Broadcast protocol. We use it to propagate state changes produced by the ZooKeeper leader.

#### Atomic broadcast

Atomic broadcast solves the data-consistency problem in cluster mode. According to the official documentation, zk provides `sequential consistency`. The atomic broadcast mechanism that `zab` provides is a simplified `2PC`-style protocol: a proposal can be committed as soon as a quorum (more than half) of the nodes in the cluster have acknowledged it.

##### Sequential consistency

Because zk commits with only a quorum of acknowledgements, a read served by a lagging replica may not see the latest write, so cross-replica reads are effectively eventually consistent. What zk does guarantee is sequential consistency, which is a stronger property: all updates are applied in a single global order, so once a single object has been updated, any later read that reaches an up-to-date server observes that update. See: [Consistency Guarantees]()

##### Two-phase commit

[Two-phased Commit]()

> A two-phase commit protocol is an algorithm that lets all clients in a distributed system agree either to commit a transaction or abort.
>
> In ZooKeeper, you can implement a two-phased commit by having a coordinator create a transaction node, say "/app/Tx", and one child node per participating site, say "/app/Tx/s_i". When coordinator creates the child node, it leaves the content undefined. Once each site involved in the transaction receives the transaction from the coordinator, the site reads each child node and sets a watch. Each site then processes the query and votes "commit" or "abort" by writing to its respective node. Once the write completes, the other sites are notified, and as soon as all sites have all votes, they can decide either "abort" or "commit". Note that a node can decide "abort" earlier if some site votes for "abort".
>
> An interesting aspect of this implementation is that the only role of the coordinator is to decide upon the group of sites, to create the ZooKeeper nodes, and to propagate the transaction to the corresponding sites. In fact, even propagating the transaction can be done through ZooKeeper by writing it in the transaction node.
>
> There are two important drawbacks of the approach described above. One is the message complexity, which is O(n²). The second is the impossibility of detecting failures of sites through ephemeral nodes. To detect the failure of a site using ephemeral nodes, it is necessary that the site create the node.
>
> To solve the first problem, you can have only the coordinator notified of changes to the transaction nodes, and then notify the sites once coordinator reaches a decision. Note that this approach is scalable, but it's is slower too, as it requires all communication to go through the coordinator.
>
> To address the second problem, you can have the coordinator propagate the transaction to the sites, and have each site creating its own ephemeral node.
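The recipe quoted above can be tried out with nothing but the plain ZooKeeper client API. The sketch below is only an illustration of the prepare/vote/decide steps under the assumptions of the quote; the class name, the `/app/Tx` paths and the `decide` helper are invented for the example and are not part of ZooKeeper. The real recipe would use watches on the child nodes instead of polling.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class TwoPhaseCommitSketch {

    private final ZooKeeper zk;

    public TwoPhaseCommitSketch(ZooKeeper zk) {
        this.zk = zk;
    }

    // Coordinator: create the transaction node and one empty child per participating site.
    public void prepare(String txPath, List<String> siteIds) throws KeeperException, InterruptedException {
        zk.create(txPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        for (String siteId : siteIds) {
            zk.create(txPath + "/" + siteId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // Site: vote by writing "commit" or "abort" into its own child node (version -1 = any version).
    public void vote(String txPath, String siteId, boolean commit) throws KeeperException, InterruptedException {
        byte[] ballot = (commit ? "commit" : "abort").getBytes(StandardCharsets.UTF_8);
        zk.setData(txPath + "/" + siteId, ballot, -1);
    }

    // Any participant: scan all votes; a single "abort" decides abort, all "commit" decides commit.
    public String decide(String txPath) throws KeeperException, InterruptedException {
        for (String child : zk.getChildren(txPath, false)) {
            byte[] data = zk.getData(txPath + "/" + child, false, null);
            String v = new String(data, StandardCharsets.UTF_8);
            if (v.isEmpty()) {
                return "pending";   // some site has not voted yet
            }
            if ("abort".equals(v)) {
                return "abort";
            }
        }
        return "commit";
    }
}
```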
#### Crash recovery

While a zkServer cluster is running, if the `Leader` loses contact with the other nodes because of a network problem or a service crash, `zab` enters crash-recovery mode. In this mode the `follower` nodes go back to the `looking` state and run a new `Leader` election. After a `Leader` has been elected, the data-synchronization phase begins; once synchronization completes, `zab` leaves crash-recovery mode.

Crash recovery guarantees that messages that have already been committed are not lost, while messages that were only proposed but never committed are discarded and will not reappear.

### Source code analysis

Before diving into the source, a few basic concepts are needed. The zk source also makes heavy use of the producer-consumer pattern; if you are not familiar with it, brush up on it first (a minimal sketch of the pattern appears later, next to the WorkerSender/WorkerReceiver code), otherwise the execution flow below will be hard to follow.

> Tip: read the source with a concrete question in mind. For example, we know that after a ZooKeeper cluster starts it automatically elects a leader, so the question is: how is that leader decided? Points unrelated to the question can be ignored for now; trying to read every method from start to finish will only get you lost.

#### Proposal

Every transaction request is turned into a proposal and sent to all servers.

> *Proposal* : a unit of agreement. Proposals are agreed upon by exchanging packets with a quorum of ZooKeeper servers. Most proposals contain messages, however the NEW_LEADER proposal is an example of a proposal that does not correspond to a message.

#### zxid

A zxid is the transaction id stored in zk. It has two parts: an **epoch** and a **counter**. The zxid is a 64-bit number; the high-order 32 bits hold the epoch and the low-order 32 bits hold the counter. The epoch can be thought of as the reign title of a dynasty: every time the Leader changes, the epoch is incremented.

> The zxid has two parts: the epoch and a counter. In our implementation the zxid is a 64-bit number. We use the high order 32-bits for the epoch and the low order 32-bits for the counter. Because it has two parts represent the zxid both as a number and as a pair of integers, (_epoch, count_). The epoch number represents a change in leadership. Each time a new leader comes into power it will have its own epoch number. We have a simple algorithm to assign a unique zxid to a proposal: the leader simply increments the zxid to obtain a unique zxid for each proposal. *Leadership activation will ensure that only one leader uses a given epoch, so our simple algorithm guarantees that every proposal will have a unique id.*
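The epoch/counter layout can be made concrete with a few lines of bit arithmetic. ZooKeeper has its own `ZxidUtils` helpers for this (used later in `followLeader()` and `lead()`); the snippet below is just an illustration of the same idea, not that class.

```java
public final class ZxidBits {

    // High 32 bits of the 64-bit zxid hold the epoch, low 32 bits hold the counter.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7); // epoch 3, 7th proposal of that epoch
        System.out.printf("zxid=0x%x epoch=%d counter=%d%n", zxid, epochOf(zxid), counterOf(zxid));
    }
}
```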
#### Starting the analysis

Open the zkServer startup script and you can see that it actually runs org.apache.zookeeper.server.quorum.QuorumPeerMain.

```java
/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        // ... part of the code omitted
    }
    LOG.info("Exiting normally");
    System.exit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        // parse the entries of the zk configuration file
        config.parse(args[0]);
    }
    // ... part of the code omitted

    // from here on, the startup flow differs depending on whether we run in cluster (quorum) mode
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
```

As you can see, zkServer takes a different startup path depending on the mode; the analysis here focuses on cluster mode.
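For reference, `config.isDistributed()` is (roughly speaking) only true when the configuration actually defines an ensemble, i.e. more than one `server.N` entry. A typical clustered zoo.cfg might look like the following; the host names and paths are made up for the example:

```properties
# zoo.cfg -- example values only
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# AdminServer (the component that grabs port 8080 by default) can be moved or disabled
admin.serverPort=8081
# quorum members: server.<myid>=<host>:<peer-port>:<election-port>
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
```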
```java
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
        //... the large chunk omitted above handles JMX registration, building the cnxn connection
        // factory, security and other configuration initialization (the try block opens in the
        // omitted part). The important call is start().
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
```

runFromConfig is long, but the point of interest is quorumPeer.start(). QuorumPeer overrides Thread's start(); the data loading at startup, the election, and the later crash recovery all begin here.

```java
public synchronized void start() {
    // getView() returns the members participating in this ensemble;
    // check that our own myid is among them
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // initialize the data store and load the zxid and related information
    loadDataBase();
    // start the server listening on the client port 2181
    startServerCnxnFactory();
    try {
        // the AdminServer added in 3.5 -- the thing that grabs your port 8080
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // start the leader election
    startLeaderElection();
    startJvmPauseMonitor();
    // the server's runtime state machine
    super.start();
}
```

Now let's focus on the election entry point, startLeaderElection().

```java
public synchronized void startLeaderElection() {
    try {
        // right after startup the state is LOOKING, so the initial vote is built here
        if (getPeerState() == ServerState.LOOKING) {
            // the vote contains this server's myid, zxid and currentEpoch
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // create the election algorithm
    this.electionAlg = createElectionAlgorithm(electionType);
}
```

So the initial vote has been constructed and the election algorithm has been created. Next, createElectionAlgorithm():

```java
protected Election createElectionAlgorithm(int electionAlgorithm) {
    QuorumCnxManager qcm = createCnxnManager();
    //...
    // ignore this listener for now, we will come back to it
    QuorumCnxManager.Listener listener = qcm.listener;
    listener.start();
    // the FastLeaderElection constructor initializes some member fields
    FastLeaderElection fle = new FastLeaderElection(this, qcm);
    // and its start() method starts the WorkerSender and WorkerReceiver threads
    fle.start();
    le = fle;
    //...
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    // two blocking queues are built here
    sendqueue = new LinkedBlockingQueue();
    recvqueue = new LinkedBlockingQueue();
    // this messenger is important: it builds two threads, WorkerSender and WorkerReceiver,
    // which do the actual sending and receiving of election messages
    this.messenger = new Messenger(manager);
}
```

By this stage the main thread's logic is essentially done; it then calls quorumPeer.join() and blocks. After all this reading we still have not reached the election itself, so that is what we analyze next.
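Since sendqueue/recvqueue and the WorkerSender/WorkerReceiver threads are exactly the producer-consumer pattern mentioned earlier, here is a minimal, self-contained sketch of that pattern with a LinkedBlockingQueue. The class and thread names are invented for the example and are not ZooKeeper classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerSketch {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();

        // consumer ("WorkerSender"): takes messages off the queue and "sends" them
        Thread workerSender = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // poll with a timeout, the same style FastLeaderElection uses on recvqueue
                    String msg = sendqueue.poll(200, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        System.out.println("sending: " + msg);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerSender-sketch");
        workerSender.start();

        // producer (the election thread): enqueue notifications and return immediately
        for (int sid = 1; sid <= 3; sid++) {
            sendqueue.put("notification for sid=" + sid);
        }

        Thread.sleep(500);      // let the consumer drain the queue
        workerSender.interrupt();
    }
}
```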
Back in QuorumPeer#start(): the last call there is the thread start, so this object will go on to execute its run() method. run() is very long; logically it boils down to handling four states: LOOKING, LEADING, FOLLOWING and OBSERVING. LOOKING is the election state, so let's analyze the LOOKING branch first.

> LOOKING: the election state; election logic only runs in this state.
>
> LEADING: the leader state; the current node is already the Leader and may process transaction (write) requests.
>
> FOLLOWING: the follower state; the node synchronizes data from the Leader, takes part in transaction voting, and handles non-transaction requests.
>
> OBSERVING: the observer state; the node synchronizes data from the Leader but does not vote; it can handle non-transaction requests.

```java
case LOOKING:
    LOG.info("LOOKING");
    ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
    //...
        try {
            reconfigFlagClear();
            if (shuttingDownLE) {
                shuttingDownLE = false;
                startLeaderElection();
            }
            // start the election logic
            setCurrentVote(makeLEStrategy().lookForLeader());
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            setPeerState(ServerState.LOOKING);
        }
    }
    break;
```

lookForLeader() has two implementations; as noted earlier, the default election algorithm is FastLeaderElection.

```java
public Vote lookForLeader() throws InterruptedException {
    // ------------------------- part 1: initialize and send our own vote -------------------------
    Map recvset = new HashMap();
    Map outofelection = new HashMap();
    int notTimeout = minNotificationInterval;
    synchronized (this) {
        // bump the logical clock, used to compare election rounds
        logicalclock.incrementAndGet();
        // set the vote data: myid, lastLoggedZxid, epoch
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    // send the vote to the other nodes; the send is asynchronous (producer-consumer again):
    // a ToSend object is built and the WorkerSender thread performs the actual send
    sendNotifications();

    // ------------------- part 2: loop over incoming votes and elect a Leader -------------------
    /*
     * Loop in which we exchange notifications until we find a leader
     */
    while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         * take a vote off the queue -- producer-consumer once more
         */
        Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

        /*
         * Sends more notifications if haven't received enough.
         * Otherwise processes new notification.
         * if nothing was received, check whether the connections are still alive
         */
        if (n == null) {
            if (manager.haveDelivered()) {
                sendNotifications();
            } else {
                manager.connectAll();
            }

            /*
             * Exponential backoff
             */
            int tmpTimeOut = notTimeout * 2;
            notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
            LOG.info("Notification time out: " + notTimeout);
        } else if (validVoter(n.sid) && validVoter(n.leader)) {
            // otherwise validate the vote: the proposed leader and the voter must both be in the configuration
            /*
             * Only proceed if the vote comes from a replica in the current or next
             * voting view for a replica in the current or next voting view.
             */
            switch (n.state) {
            case LOOKING:
                //...
                // If notification > current, replace and send messages out
                // the notification's election epoch is newer than our logical clock
                if (n.electionEpoch > logicalclock.get()) {
                    // adopt the received epoch as our logical clock
                    logicalclock.set(n.electionEpoch);
                    recvset.clear();
                    // compare the received vote with our current proposal -- the core of the election:
                    // compare epoch first, then zxid, then myid
                    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                        // the received vote wins -- adopt it as our proposal
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                    } else {
                        // otherwise keep proposing ourselves
                        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                    }
                    // then broadcast again
                    sendNotifications();
                } else if (n.electionEpoch < logicalclock.get()) {
                    // our logical clock is ahead of the notification's election epoch: the vote is stale
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                  + Long.toHexString(n.electionEpoch)
                                  + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                    }
                    break;
                } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                    sendNotifications();
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Adding vote: from=" + n.sid
                              + ", proposed leader=" + n.leader
                              + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                              + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                }

                // record the received vote per server id; used below to decide whether a leader has emerged
                // don't care about the version if it's in LOOKING state
                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                // check whether the votes already decide the election; there are several
                // implementations -- the default QuorumVerifier is QuorumMaj
                if (voteSet.hasAllQuorums()) {

                    // Verify if there is any change in the proposed leader
                    // drain any vote still in the queue; if a better one shows up, recompute
                    while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            recvqueue.put(n);
                            break;
                        }
                    }

                    /*
                     * This predicate is true once we don't read any new
                     * relevant message from the reception queue
                     */
                    // no vote left to consider: make the decision, record our new state,
                    // clean up the election bookkeeping and return the final vote
                    if (n == null) {
                        setPeerState(proposedLeader, voteSet);
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                }
                break;
            case OBSERVING:
                // observers do not take part in the election
                LOG.debug("Notification from observer: {}", n.sid);
                break;
            case FOLLOWING:
            case LEADING:
                /*
                 * Consider all notifications from the same epoch
                 * together.
                 */
                if (n.electionEpoch == logicalclock.get()) {
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        setPeerState(n.leader, voteSet);
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                }

                /*
                 * Before joining an established ensemble, verify that
                 * a majority are following the same leader.
                 */
                outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                    synchronized (this) {
                        logicalclock.set(n.electionEpoch);
                        setPeerState(n.leader, voteSet);
                    }
                    Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                    leaveInstance(endVote);
                    return endVote;
                    //...
}
```

From the analysis above: the election keeps consuming the votes broadcast by the other nodes inside the loop, and once the decision point is reached it updates its state and breaks out of the loop.
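The comparison order used by totalOrderPredicate (epoch first, then zxid, then myid) can be written out in a few lines. The sketch below is a simplified stand-in, ignoring the quorum-weight variant the real method also supports:

```java
// Returns true if the new vote (newId, newZxid, newEpoch) should beat the current one.
// Order of precedence: peer epoch, then zxid, then server id -- the same ordering
// the walkthrough above describes for totalOrderPredicate.
static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                           long curId, long curZxid, long curEpoch) {
    if (newEpoch != curEpoch) {
        return newEpoch > curEpoch;
    }
    if (newZxid != curZxid) {
        return newZxid > curZxid;
    }
    return newId > curId;
}
```

In other words, a node with a larger zxid (more up-to-date data) always beats a node that merely has a larger myid.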
Now back in QuorumPeer#run(): at this point, inside run()'s loop, getPeerState() has already been set to the outcome of the election. Let's see how the logic continues on the Leader and Follower paths.

```java
case FOLLOWING:
    // this node is a follower
    try {
        LOG.info("FOLLOWING");
        // initialize the follower object
        setFollower(makeFollower(logFactory));
        follower.followLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        follower.shutdown();
        setFollower(null);
        updateServerState();
    }
    break;
```

Next, the followLeader() method.

```java
void followLeader() throws InterruptedException {
    //...
    self.setZabState(QuorumPeer.ZabState.DISCOVERY);
    // look up the leader's node information
    QuorumServer leaderServer = findLeader();
    try {
        // connect to the leader
        connectToLeader(leaderServer.addr, leaderServer.hostname);
        // send the follower's zxid, myid and related information to the Leader
        // and agree on the new epoch; from here on the follower only
        // synchronizes data belonging to the new epoch
        long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
        if (self.isReconfigStateChange()) {
            throw new Exception("learned about role change");
        }
        //check to see if the leader zxid is lower than ours
        //this should never happen but is just a safety check
        long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
        // if the leader's epoch is smaller than our accepted epoch, bail out
        if (newEpoch < self.getAcceptedEpoch()) {
            LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                      + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
            throw new IOException("Error: Epoch of leader is lower");
        }
        long startTime = Time.currentElapsedTime();
        try {
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            // start synchronizing data from the leader; once the sync completes,
            // the FollowerZooKeeperServer is started
            syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
        } finally {
            long syncTime = Time.currentElapsedTime() - startTime;
            ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
        }
        if (self.getObserverMasterPort() > 0) {
            LOG.info("Starting ObserverMaster");
            om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
            om.start();
        } else {
            om = null;
        }
        // create a reusable packet to reduce gc impact
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
            // receive packets from the Leader, process them and reply; the follower thread spins here
            readPacket(qp);     // read a packet from the leader
            processPacket(qp);  // process the packet
        }
        //...
```

That covers the broad strokes of how a follower starts. How the data is actually synchronized and how the follower interacts with clients are beyond the scope of this article.
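Note how followLeader() calls self.setZabState(...) three times; that is the ZAB phase progression the follower walks through. As a memory aid only (the real enum is QuorumPeer.ZabState, and this stand-in is not that class), the phases look roughly like this:

```java
// Illustrative only -- a stand-in for the phases followLeader() above walks through.
enum ZabPhase {
    ELECTION,        // LOOKING: running leader election
    DISCOVERY,       // leader found: registering with it and agreeing on the new epoch
    SYNCHRONIZATION, // copying the leader's data up to the agreed zxid
    BROADCAST        // normal operation: proposals and commits flow from the leader
}
```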
Having looked at the follower, let's see what a node has to do when it becomes the leader.

```java
case LEADING:
    LOG.info("LEADING");
    try {
        // makeLeader, like makeFollower, mostly initializes some state
        setLeader(makeLeader(logFactory));
        // the method we really care about
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        updateServerState();
    }
    break;
}
```

Brace yourself: this is another extremely long method.

```java
/**
 * This method is main function that is called to lead
 *
 * @throws IOException
 * @throws InterruptedException
 */
void lead() throws IOException, InterruptedException {
    //...
    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);
        // the leader's data is loaded from local files
        // (it is not reloaded if it was already initialized at startup)
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from
        // new followers.
        cnxAcceptor = new LearnerCnxAcceptor();
        // handles synchronization with Followers/Observers and watches for learner changes
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

        synchronized (this) {
            lastProposed = zk.getZxid();
        }

        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

        if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
            LOG.info("NEWLEADER proposal has Zxid of " + Long.toHexString(newLeaderProposal.packet.getZxid()));
        }

        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        QuorumVerifier curQV = self.getQuorumVerifier();
        if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
            // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
            // specified by the user; the lack of version in a config file is interpreted as version=0).
            // As soon as a config is established we would like to increase its version so that it
            // takes presedence over other initial configs that were not established (such as a config
            // of a server trying to join the ensemble, which may be a partial view of the system, not the full config).
            // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
            // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
            // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier,
            // and there's still no agreement on the new version that we'd like to use. Instead, we use
            // lastSeenQuorumVerifier which is being sent with NEWLEADER message
            // so its a good way to let followers know about the new version. (The original reason for sending
            // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
            // that it finds before starting to propose operations. Here we're reusing the same code path for
            // reaching consensus on the new version number.)
            // It is important that this is done before the leader executes waitForEpochAck,
            // so before LearnerHandlers return from their waitForEpochAck
            // hence before they construct the NEWLEADER message containing
            // the last-seen-quorumverifier of the leader, which we change below
            try {
                QuorumVerifier newQV = self.configFromString(curQV.toString());
                newQV.setVersion(zk.getZxid());
                self.setLastSeenQuorumVerifier(newQV, true);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
            newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        // We have to get at least a majority of servers in sync with
        // us. We do this by waiting for the NEWLEADER packet to get
        // acknowledged
        // wait for a quorum of nodes to acknowledge the new epoch
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);
        self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
        self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);

        try {
            // wait for the NEWLEADER proposal to be acknowledged by a quorum
            waitForNewLeaderAck(self.getId(), zk.getZxid());
        } catch (InterruptedException e) {
            shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                     + newLeaderProposal.ackSetsToString() + " ]");
            HashSet followerSet = new HashSet();

            for (LearnerHandler f : getLearners()) {
                if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
                    followerSet.add(f.getSid());
                }
            }
            boolean initTicksShouldBeIncreased = true;
            for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
                if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                    initTicksShouldBeIncreased = false;
                    break;
                }
            }
            if (initTicksShouldBeIncreased) {
                LOG.warn("Enough followers present. " + "Perhaps the initTicks need to be increased.");
            }
            return;
        }

        // start the leader's ZooKeeperServer
        startZkServer();

        /**
         * WARNING: do not use this for anything other than QA testing
         * on a real cluster. Specifically to enable verification that quorum
         * can handle the lower 32bit roll-over issue identified in
         * ZOOKEEPER-1277. Without this option it would take a very long
         * time (on order of a month say) to see the 4 billion writes
         * necessary to cause the roll-over to occur.
         *
         * This field allows you to override the zxid of the server. Typically
         * you'll want to set it to something like 0xfffffff0 and then
         * start the quorum, run some operations and see the re-election.
         */
        String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
        if (initialZxid != null) {
            long zxid = Long.parseLong(initialZxid);
            zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
        }

        if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
            self.setZooKeeperServer(zk);
        }

        self.setZabState(QuorumPeer.ZabState.BROADCAST);
        self.adminServer.setZooKeeperServer(zk);

        // Everything is a go, simply start counting the ticks
        // WARNING: I couldn't find any wait statement on a synchronized
        // block that would be notified by this notifyAll() call, so
        // I commented it out
        //synchronized (this) {
        //    notifyAll();
        //}
        // We ping twice a tick, so we only update the tick every other
        // iteration
        boolean tickSkip = true;
        // If not null then shutdown this leader
        String shutdownMessage = null;

        // monitor the state of the cluster through heartbeats
        while (true) {
            synchronized (this) {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.tickTime / 2;
                while (cur < end) {
                    wait(end - cur);
                    cur = Time.currentElapsedTime();
                }

                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }

                // We use an instance of SyncedLearnerTracker to
                // track synced learners to make sure we still have a
                // quorum of current (and potentially next pending) view.
                SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                if (self.getLastSeenQuorumVerifier() != null
                    && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                    syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }

                syncedAckSet.addAck(self.getId());

                for (LearnerHandler f : getLearners()) {
                    if (f.synced()) {
                        syncedAckSet.addAck(f.getSid());
                    }
                }

                // check leader running status
                if (!this.isRunning()) {
                    // set shutdown flag
                    shutdownMessage = "Unexpected internal error";
                    break;
                }

                if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                    // Lost quorum of last committed and/or last proposed
                    // config, set shutdown flag
                    shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                                      + syncedAckSet.ackSetsToString() + " ]";
                    break;
                }
                tickSkip = !tickSkip;
            }
            for (LearnerHandler f : getLearners()) {
                // the health check is done by sending a ping packet to each learner
                f.ping();
            }
        }
        if (shutdownMessage != null) {
            shutdown(shutdownMessage);
            // leader goes in looking state
        }
    } finally {
        zk.unregisterJMX(this);
    }
}
```

That more or less completes the analysis of the startup source code. As we saw, once the leader has been elected it synchronizes with the followers and observers, starts the leader's ZooKeeperServer, and then keeps watch over the cluster by continuously sending ping packets.

> Summary:
>
> When a node starts it is in the LOOKING state by default. In that state it keeps casting and broadcasting votes using the configured election algorithm until a Leader is decided; by default the decision uses the majority (quorum) rule. Votes are compared in the order: epoch > zxid > myid.
>
> If the node ends up a Follower: it initializes the follower state, connects to the leader, synchronizes data from it, starts the follower's ZooKeeperServer, and then loops reading and processing packets from the leader.
>
> If the node ends up the Leader: it initializes the leader state, loads the local database, starts the LearnerCnxAcceptor to handle synchronization with followers and observers and watch their state, waits for a quorum of nodes to finish synchronizing, starts the leader's ZooKeeperServer, and then monitors the cluster in a loop by sending ping packets.
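The "majority rule" mentioned in the summary is easy to express in isolation. The sketch below only illustrates the idea behind the default QuorumMaj verifier and is not its actual implementation:

```java
import java.util.Set;

public final class MajorityQuorumSketch {

    // A proposal (or an election round) is settled once strictly more than half
    // of the voting members have acknowledged it.
    static boolean containsQuorum(Set<Long> ackedServerIds, int votingMembers) {
        return ackedServerIds.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        // in a 5-node ensemble, 3 acks are enough, 2 are not
        System.out.println(containsQuorum(Set.of(1L, 2L, 3L), 5)); // true
        System.out.println(containsQuorum(Set.of(1L, 2L), 5));     // false
    }
}
```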