ZooKeeper系列之(五):領導者工作模式
領導者就是Leader,是整個叢集的寫事務流程負責人。
一輪選舉結束時產生新的Leader,並且Epoch加1。同時新的Leader先將自己的zxid設定為Epoch左移32位(Epoch<32),將是叢集中最大的zxid。
Leader監聽Socket等待Follower的連線請求,每次新的Follower連線的時候都會啟動一個LearnerHandler執行緒專門處理與該Follower的互動。LearnerHandler迴圈接收Follower的訊息包,並交給Leader進行處理。
leader啟動流程:
1. Leader選舉完成之後,Peer確認了自己是Leader的身份,在QuromPeer的主執行緒中執行Leader的邏輯
2. 建立Leader物件,並建立Server繫結在QuorumAddress上,用於和其他Follower之間相互通訊
3. 呼叫Leader::lead函式,執行Leader的真正的邏輯
a) 呼叫ZooKeeperServer::loadData,從磁碟中恢復資料和session列表
b) 啟用新的epoch,zookeeper中的zxid是64位,用於唯一標示一個操作,zxid的高32位是epoch,每次Leader切換加1,低32位為序列號,每次操作加1
c) 啟動繫結在QuorumAddress上的Server,為每個Follower的連線建立一個LearnerHandler,用於和Follower做互動,這裡的邏輯另外單獨論述
d) 向所有的Follower傳送一個NEWLEADER包,宣告自己額Leader身份,並在initLimit時間內等待大多數的Follower完成和Leader的同步,併發送ACK包,表示Follower已經和Leader完成同步並可以對外提供服務
e) 這時Leader和Client之間的互動在cnxnFactory的Server中,Leader和Follower之間的互動在LearnerHandler所屬的執行緒中
f) 然後呼叫Leader::lead函式的QuromPeer執行緒在每個tickTime中都會發送ping訊息給其他的follower,follower在接收到ping訊息後會回覆一個ping訊息,並附帶上follower的session tracker裡的所有session資訊,leader收到follower的ping訊息後,根據傳回的session資訊更新自己的session資訊 。
Leader在接收到Follower的註冊請求之後(Follower呼叫connectToLeader方法),等待收到FOLLOWERINFO包:
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
peerLastZxid = ss.getLastZxid();
1) lastAcceptedEpoch:是Follower的Epoch值。
2) Zxid:是Follower的zxid
3) newEpoch:Leader根據FOLLOWERINF的值計算出新的Epoch
4) newLeaderZxid:根據新的Epoch計算新的Leader的zxid
然後給Follower傳送LEADERINFO包,將新的zxid和Epoch告訴Follower,好讓Follower知道應該要同步哪些資料。
Leader然後傳送快照包給Follower,Follower根據快照包將本地資料庫恢復到與Leader相同。
如果Follower的事務比Leader少一些(在minCommittedLog 和maxCommittedLog之間),則不需發SNAP包,而是發DIFF包,同時將需補充的事務通過PROPOSAL和COMMIT發給Follower執行。相關邏輯在syncFollower,queueCommittedProposals,startSendingPackets等方法中實現。這部分主要程式碼如下所示:
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid,
leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
if (needSnap) {
try {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
snapshot.close();
}
}
startSendingPackets();
startSendingPackets將需要同步的事務傳送給Follower,事務同步完成後,Leader傳送NEWLEADER包給Follower。
然後等Follower回覆第一個ACK包。收到ACK之後呼叫Leader的waitForNewLeaderAck方法告訴Leader該Follower已經完成同步。
當Leader收到足夠多的waitForNewLeaderAck方法呼叫後(通常超過半數),知道大部分Follower已經註冊到本Leader上來了,這時候Leader才能確保正式發揮