1. 程式人生 > >ZooKeeper系統之(四):跟隨者工作模式

ZooKeeper系統之(四):跟隨者工作模式

當ZooKeeper叢集啟動之後,需要完成leader和follower之間的資料同步。

首先leader和observer有一個共同的父類learner,裡面定義了一些公共方法。叢集正常執行後會有一個leader和多個follower(這裡observer就不單獨說了,和follower的行為是類似的)。

1、 註冊過程

follower在提供服務給客戶端之前必須完成註冊到leader的動作。

 

註冊分為以下3個主要步驟:

a) 呼叫connectToLeader方法連線到Leader。

b) 呼叫registerWithLeader方法註冊到Leader,交換各自的sid、zxid和Epoch等資訊,Leader以此決定事務同步的方式。

c) 呼叫SyncWithLeader跟Leader進行事務資料同步,處理SNAP/DIFF/TRUNC包。

這3個方法都定義在父類Learner類中。下面我們以Follower作為例子說明註冊到Leader的完整流程。

2、connectToLeader

connectToLeader方法功能較簡單,建立Socket連線到Leader。該方法定義在Follower的父類Learner中。它加了重試機制,具體的程式碼這裡就不給出了。

最多可以嘗試5次連線。連線成功後Leader會建立一個LearnerHandler專門處理與該Follower之間的QuorunPacket訊息的傳遞。

3、registerWithLeader

Follower連線Leader成功之後,馬上呼叫registerWithLeader方法。

registerWithLeader方法首先發送FOLLOWERINFO包給Leader,告訴Leader自己的身份屬性(Follower的zxid,sid)。然後等待Leader回覆的LEADINFO包,獲取Leader的Epoch和zxid值,並更新Follower的Epoch和zxid值,以Leader資訊為準。

最後給Leader發ACKEPOCH包,告訴Leader這次Follower已經與Leader的zxid同步了。

這裡acceptedEpoch就是Leader的Epoch。

整個resigerWithLeader流程如下圖所示:

 

接下來Follower就要進入syncWithLeader方法來與Leader同步資料了。

4、SyncWithLeader

SyncWithLeader方法同步Leader的事務到Follower,該方法較長,這裡分段介紹其整個過程。

首先讀取同步資料包,主要程式碼如下:

QuorumPacket qp = new QuorumPacket();
readPacket(qp);
if (qp.getType() == Leader.SNAP){
  zk.getZKDatabase().deserializeSnapshot(leaderIs);
}else if (qp.getType() == Leader. TRUNC) {     
  boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
      // not able to truncate the log
      LOG.error("Not able to truncate the log " + Long.toHexString(qp.getZxid()));
      System.exit(13);
   }
}else if (qp.getType() == Leader.DIFF) {
   LOG.info("Getting a diff from the leader 0x{}", ong.toHexString(qp.getZxid()));
   snapshotNeeded = false;
}

同步方式分成3種:

  1. SNAP:快照模式,這種模式下Leader將整個完整資料庫傳給Follower。
  2. TRUNC:截斷模式,這種模式表明Follower的資料比Leader還多,為了維持一致性需要將Follower多餘的資料刪除。
  3. DIFF:差異模式,說明Follower比Leader的事務少,需要給Follower補足,這時候Leader會將需要補充的事務生成PROPOSAL包和COMMIT包發給Follower執行。

當前面都執行完成後,還有一段程式碼處理後續訊息(這裡是QuorumPacket型別),比如:PROPOSAL、COMMIT、NEWLEADER等。例如PROPOSAL是指同步期間收到的leader傳送的寫請求資訊,快取在packetsNotCommitted裡,等後續處理,這這部分程式碼可以先不管。

這部分的主要程式碼是這樣的:

while (self.isRunning()) {
     readPacket(qp);
     switch(qp.getType()) {
        case Leader.PROPOSAL:                  
             packetsNotCommitted.add(pif);
             break;
        case Leader.COMMIT:
        case Leader.COMMITANDACTIVATE:
             pif = packetsNotCommitted.peekFirst();                   
             if (!writeToTxnLog) {
                 zk.processTxn(pif.hdr, pif.rec);
                 packetsNotCommitted.remove();
             } else {
                 packetsCommitted.add(qp.getZxid());
             }
             break;
        case Leader.INFORM:
        case Leader.INFORMANDACTIVATE: 
             if (!writeToTxnLog) {
               // Apply to db directly if we haven't taken the snapshot
                 zk.processTxn(packet.hdr, packet.rec);
             } else {
                 packetsNotCommitted.add(packet);
                 packetsCommitted.add(qp.getZxid());
             }
             break;                
         case Leader.UPTODATE: 
             if (isPreZAB1_0) {
                 zk.takeSnapshot();
                 self.setCurrentEpoch(newEpoch);
              }
              self.setZooKeeperServer(zk);
              self.adminServer.setZooKeeperServer(zk);
              break outerLoop;
        case Leader.NEWLEADER:                   
             if (snapshotNeeded) {
                zk.takeSnapshot();
              }
              self.setCurrentEpoch(newEpoch);
              writeToTxnLog = true;
              isPreZAB1_0 = false;
              writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
              break;
     }
}

然後Leader會發送NEWLEADER包,Follower收到NEWLEADER包後回覆ACK給Leader。

最後Leader發UPTODATE包表示同步完成,Follower這時啟動服務端並跳出本次迴圈,準備結束整個註冊過程。

5、 Follower主流程

Follower是Learner的子類,Follower的啟動方法就是followLeader。

followLeader的主要程式碼片段如下:

connectToLeader(addr);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
syncWithLeader(newEpochZxid);                
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
      readPacket(qp);
      processPacket(qp);
}

啟動時首先與Leader同步資料,然後啟動FollowerZooKeeperServer,在FollowerZooKeeperServer執行的同時,額外啟動while迴圈等待Peer的QuorumPacket包,呼叫processPacket方法處理這些包。

processPacket處理QuorumPeer傳送的QuorumPacket,最主要是處理兩種QuorumPacket:PROPOSAL和COMMIT。當然還有PING、COMMITANDACTIVATE等包型別,為便於簡化梳理程式碼設計思路,這裡就不再詳述了。

該方法在收到Leader傳送過來的QuorumPacket時被呼叫,主要是響應PROPOSAL和COMMIT兩種型別的訊息。

PROPOSAL是Leader將要執行的寫事務命令;COMMIT是提交命令。Follower只有在收到COMMIT訊息後才會讓PROPOSAL命令的內容生效。

同一個寫事務命令會在Leader和多個Follower上都執行一次,保證叢集資料的一致性。

程式碼片段:

case Leader.PROPOSAL:           
      TxnHeader hdr = new TxnHeader();
      Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);     
      lastQueued = hdr.getZxid();
      fzk.logRequest(hdr, txn);
      break;
case Leader.COMMIT:
      fzk.commit(qp.getZxid());
      break;

Follower收到PROPOSAL訊息後呼叫FollowerZooKeeperServer的logRequest方法;收到COMMIT訊息後呼叫FollowerZooKeeperServer的commit方法。

  • PROPOSAL包

Leader傳送給叢集中所有follower的寫請求包。

Leader執行寫操作時需要告之叢集中的Learner,讓大家也執行寫操作,保證叢集資料的一致性。PROPOSAL是嚴格按照順序執行的,這也是ZOOKEEPER的核心設計思想之一。

  • COMMIT包

當Leader認為一個Proposal已被大多數Follower持久化並等待執行後會傳送COMMIT包,通知各Follower可以提交執行該Proposal了,最後呼叫到FinalRequestProcessor執行寫操作,通過這種機制保證寫操作能被大半數叢集機器執行。

6、 Observer主流程

Observer和Follower功能類似,主要的差別就是不參與選舉。

Observer的入口方法是observerLeader。當QuorumPeer的狀態是OBSERVING時會啟動Observer並呼叫observerLeader方法。

observerLeader同Follower的followLeader方法類似,首先註冊到Leader,事務同步後進入QuorumPacket包迴圈處理過程,呼叫processPacket方法處理QuorumPacket。

processPacket比Follower要簡單許多,最主要是處理INFORM包來執行Leader的寫請求命令。

這裡處理的是INFORM訊息,Leader群發寫事務時,給Follower發的是PROPOSAL並要等待Follower確認;而給Observer發的則是INFORM訊息並且不需要Obverver回覆