Zookeeper之跟著zk學習怎麼寫阻塞至過半投票
阿新 • • 發佈:2021-10-28
zookeeper中有很多邏輯是等到超過一半選票才能繼續的場景。來看看怎麼實現的吧
Leader.getEpochToPropose
在選出來主之後,會進入到這個分支
QuorumPeer#
case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); }break; }
同時,follower也會進入自己的follow分支,簡單說下邏輯,就是和leader建立通訊,然後傳送訊息,表示要和leader進行資料同步。
訊息型別為Leader.FOLLOWERINFO
這裡忽略leader的網路通訊部分邏輯。直接看獲取epoch的邏輯。
protected final Set<Long> connectingFollowers = new HashSet<Long>();
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throwsInterruptedException, IOException { synchronized (connectingFollowers) { if (!waitingForNewEpoch) { return epoch; } if (lastAcceptedEpoch >= epoch) {//找followe傳過來的epoch最大值然後 +1 epoch = lastAcceptedEpoch + 1; } if (isParticipant(sid)) { connectingFollowers.add(sid); } QuorumVerifier verifier = self.getQuorumVerifier(); if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {//超過一半了 waitingForNewEpoch = false;//這裡改為false 下面的while就會跳出了 self.setAcceptedEpoch(epoch); connectingFollowers.notifyAll();//喚醒阻塞 } else { long start = Time.currentElapsedTime(); if (sid == self.getId()) { timeStartWaitForEpoch = start; } long cur = start; long end = start + self.getInitLimit() * self.getTickTime(); while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) { connectingFollowers.wait(end - cur); cur = Time.currentElapsedTime(); } if (waitingForNewEpoch) { throw new InterruptedException("Timeout while waiting for epoch from quorum"); } } return epoch; } }
QuorumVerifier 可以簡單地理解就是比較是否大於叢集伺服器數量的一半的工具類
向connectionFollowers中新增記錄的邏輯在LearnerHandler
@Override public void run() { try { ......long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);//不斷向connectionFollowers中增加元素