ZooKeeper系統之(四):跟隨者工作模式
當ZooKeeper叢集啟動之後,需要完成leader和follower之間的資料同步。
首先leader和observer有一個共同的父類learner,裡面定義了一些公共方法。叢集正常執行後會有一個leader和多個follower(這裡observer就不單獨說了,和follower的行為是類似的)。
1、 註冊過程
follower在提供服務給客戶端之前必須完成註冊到leader的動作。
註冊分為以下3個主要步驟:
a) 呼叫connectToLeader方法連線到Leader。
b) 呼叫registerWithLeader方法註冊到Leader,交換各自的sid、zxid和Epoch等資訊,Leader以此決定事務同步的方式。
c) 呼叫SyncWithLeader跟Leader進行事務資料同步,處理SNAP/DIFF/TRUNC包。
這3個方法都定義在父類Learner類中。下面我們以Follower作為例子說明註冊到Leader的完整流程。
2、connectToLeader
connectToLeader方法功能較簡單,建立Socket連線到Leader。該方法定義在Follower的父類Learner中。它加了重試機制,具體的程式碼這裡就不給出了。
最多可以嘗試5次連線。連線成功後Leader會建立一個LearnerHandler專門處理與該Follower之間的QuorunPacket訊息的傳遞。
3、registerWithLeader
Follower連線Leader成功之後,馬上呼叫registerWithLeader方法。
registerWithLeader方法首先發送FOLLOWERINFO包給Leader,告訴Leader自己的身份屬性(Follower的zxid,sid)。然後等待Leader回覆的LEADINFO包,獲取Leader的Epoch和zxid值,並更新Follower的Epoch和zxid值,以Leader資訊為準。
最後給Leader發ACKEPOCH包,告訴Leader這次Follower已經與Leader的zxid同步了。
這裡acceptedEpoch就是Leader的Epoch。
整個resigerWithLeader流程如下圖所示:
接下來Follower就要進入syncWithLeader方法來與Leader同步資料了。
4、SyncWithLeader
SyncWithLeader方法同步Leader的事務到Follower,該方法較長,這裡分段介紹其整個過程。
首先讀取同步資料包,主要程式碼如下:
QuorumPacket qp = new QuorumPacket();
readPacket(qp);
if (qp.getType() == Leader.SNAP){
zk.getZKDatabase().deserializeSnapshot(leaderIs);
}else if (qp.getType() == Leader. TRUNC) {
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log " + Long.toHexString(qp.getZxid()));
System.exit(13);
}
}else if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", ong.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
同步方式分成3種:
- SNAP:快照模式,這種模式下Leader將整個完整資料庫傳給Follower。
- TRUNC:截斷模式,這種模式表明Follower的資料比Leader還多,為了維持一致性需要將Follower多餘的資料刪除。
- DIFF:差異模式,說明Follower比Leader的事務少,需要給Follower補足,這時候Leader會將需要補充的事務生成PROPOSAL包和COMMIT包發給Follower執行。
當前面都執行完成後,還有一段程式碼處理後續訊息(這裡是QuorumPacket型別),比如:PROPOSAL、COMMIT、NEWLEADER等。例如PROPOSAL是指同步期間收到的leader傳送的寫請求資訊,快取在packetsNotCommitted裡,等後續處理,這這部分程式碼可以先不管。
這部分的主要程式碼是這樣的:
while (self.isRunning()) {
readPacket(qp);
switch(qp.getType()) {
case Leader.PROPOSAL:
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
if (!writeToTxnLog) {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
case Leader.INFORMANDACTIVATE:
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.UPTODATE:
if (isPreZAB1_0) {
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER:
if (snapshotNeeded) {
zk.takeSnapshot();
}
self.setCurrentEpoch(newEpoch);
writeToTxnLog = true;
isPreZAB1_0 = false;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
然後Leader會發送NEWLEADER包,Follower收到NEWLEADER包後回覆ACK給Leader。
最後Leader發UPTODATE包表示同步完成,Follower這時啟動服務端並跳出本次迴圈,準備結束整個註冊過程。
5、 Follower主流程
Follower是Learner的子類,Follower的啟動方法就是followLeader。
followLeader的主要程式碼片段如下:
connectToLeader(addr);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
啟動時首先與Leader同步資料,然後啟動FollowerZooKeeperServer,在FollowerZooKeeperServer執行的同時,額外啟動while迴圈等待Peer的QuorumPacket包,呼叫processPacket方法處理這些包。
processPacket處理QuorumPeer傳送的QuorumPacket,最主要是處理兩種QuorumPacket:PROPOSAL和COMMIT。當然還有PING、COMMITANDACTIVATE等包型別,為便於簡化梳理程式碼設計思路,這裡就不再詳述了。
該方法在收到Leader傳送過來的QuorumPacket時被呼叫,主要是響應PROPOSAL和COMMIT兩種型別的訊息。
PROPOSAL是Leader將要執行的寫事務命令;COMMIT是提交命令。Follower只有在收到COMMIT訊息後才會讓PROPOSAL命令的內容生效。
同一個寫事務命令會在Leader和多個Follower上都執行一次,保證叢集資料的一致性。
程式碼片段:
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
lastQueued = hdr.getZxid();
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;
Follower收到PROPOSAL訊息後呼叫FollowerZooKeeperServer的logRequest方法;收到COMMIT訊息後呼叫FollowerZooKeeperServer的commit方法。
- PROPOSAL包
Leader傳送給叢集中所有follower的寫請求包。
Leader執行寫操作時需要告之叢集中的Learner,讓大家也執行寫操作,保證叢集資料的一致性。PROPOSAL是嚴格按照順序執行的,這也是ZOOKEEPER的核心設計思想之一。
- COMMIT包
當Leader認為一個Proposal已被大多數Follower持久化並等待執行後會傳送COMMIT包,通知各Follower可以提交執行該Proposal了,最後呼叫到FinalRequestProcessor執行寫操作,通過這種機制保證寫操作能被大半數叢集機器執行。
6、 Observer主流程
Observer和Follower功能類似,主要的差別就是不參與選舉。
Observer的入口方法是observerLeader。當QuorumPeer的狀態是OBSERVING時會啟動Observer並呼叫observerLeader方法。
observerLeader同Follower的followLeader方法類似,首先註冊到Leader,事務同步後進入QuorumPacket包迴圈處理過程,呼叫processPacket方法處理QuorumPacket。
processPacket比Follower要簡單許多,最主要是處理INFORM包來執行Leader的寫請求命令。
這裡處理的是INFORM訊息,Leader群發寫事務時,給Follower發的是PROPOSAL並要等待Follower確認;而給Observer發的則是INFORM訊息並且不需要Obverver回覆