ZooKeeper's Default Election
ZooKeeper's default election mode is implemented by the FastLeaderElection class. In QuorumPeer's start() method, startLeaderElection() first builds the server's initial ballot: the server votes for its own id as leader and attaches its last logged zxid and its current epoch, the three elements that together make up a vote. It then calls createElectionAlgorithm() to instantiate the class that performs the election.
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
In ZooKeeper, electionAlgorithm defaults to 3, so this branch creates the QuorumCnxManager and the FastLeaderElection that carry out the default election. At this point, however, only FastLeaderElection's constructor has run; the election proper has not started.
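For context, this value is driven by the electionAlg setting in zoo.cfg; a minimal sketch (3, the TCP-based FastLeaderElection, is the default, and the other values select the alternatives in the switch above):

# zoo.cfg (sketch): selects the branch taken in createElectionAlgorithm()
electionAlg=3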
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);

    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
In FastLeaderElection's constructor, two queues are built, one holding messages to send and one holding received messages. On top of the QuorumCnxManager that already handles communication with the other servers, two threads are created, WorkerSender and WorkerReceiver, responsible for sending messages and for receiving and parsing them. Their start() methods have not been called yet, so the election has still not begun.
public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;

            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

void process(ToSend m) {
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                        m.leader,
                                        m.zxid,
                                        m.electionEpoch,
                                        m.peerEpoch,
                                        m.configData);

    manager.toSend(m.sid, requestBuffer);
}
WorkerSender's run() method is remarkably simple: it just keeps polling the queue of outgoing messages and calls process() on each one to build the wire message and hand it to the QuorumCnxManager for delivery to the other servers.
The run() method of the receiving thread, WorkerReceiver, is considerably more involved.
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
// The current protocol and two previous generations all send at least 28 bytes
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: " + response.buffer.capacity());
continue;
}
// this is the backwardCompatibility mode in place before ZK-107
// It is for a version of the protocol in which we didn't send peer epoch
// With peer epoch and version the message became 40 bytes
boolean backCompatibility28 = (response.buffer.capacity() == 28);
// this is the backwardCompatibility mode for no version information
boolean backCompatibility40 = (response.buffer.capacity() == 40);
response.buffer.clear();
// Instantiate Notification and set its attributes
Notification n = new Notification();
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
int version = 0x0;
if (!backCompatibility28) {
rpeerepoch = response.buffer.getLong();
if (!backCompatibility40) {
/*
* Version added in 3.4.6
*/
version = response.buffer.getInt();
} else {
LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
}
} else {
LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
}
QuorumVerifier rqv = null;
// check if we have a version that includes config. If so extract config info from message.
if (version > 0x1) {
int configLength = response.buffer.getInt();
byte b[] = new byte[configLength];
response.buffer.get(b);
synchronized(self) {
try {
rqv = self.configFromString(new String(b));
QuorumVerifier curQV = self.getQuorumVerifier();
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}", self.getId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
self.processReconfig(rqv, null, null, false);
if (!rqv.equals(curQV)) {
LOG.info("restarting leader election");
self.shuttingDownLE = true;
self.getElectionAlg().shutdown();
break;
}
} else {
LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
}
}
} catch (IOException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
} catch (ConfigException e) {
LOG.error("Something went wrong while processing config received from {}", response.sid);
}
}
} else {
LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
}
/*
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
// Receive new message
if (LOG.isDebugEnabled()) {
LOG.debug("Receive new notification message. My id = "
+ self.getId());
}
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
/*
* Print notification info
*/
if(LOG.isInfoEnabled()){
printNotification(n);
}
/*
* If this server is looking, then send proposed leader
*/
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Long.toHexString(self.getQuorumVerifier().getVersion()));
}
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
}
First, a message is fetched from the queue in which QuorumCnxManager stores received messages, and is then parsed. The precondition for parsing is that the message is at least 28 bytes long: the sender's state (int, 4 bytes), the proposed leader (long, 8 bytes), the sender's zxid (long, 8 bytes), and the election epoch (long, 8 bytes). These are the mandatory fields of a notification; if any one is missing, the ballot cannot be parsed for the rest of the election.
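To make the 28-byte minimum concrete, here is a small, self-contained sketch of that fixed prefix and how it is read back in the same field order as the run() method above; the class and method names are hypothetical, not ZooKeeper's:

import java.nio.ByteBuffer;

// Hypothetical sketch of the fixed prefix of an election notification:
// 4 (state) + 8 (leader) + 8 (zxid) + 8 (electionEpoch) = 28 bytes.
public class NotificationLayout {

    static ByteBuffer encode(int state, long leader, long zxid, long electionEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(28);
        buf.putInt(state);          // sender's ServerState ordinal (0 = LOOKING, ...)
        buf.putLong(leader);        // id of the proposed leader
        buf.putLong(zxid);          // sender's last logged zxid
        buf.putLong(electionEpoch); // sender's election round
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(0, 1L, 0x100000001L, 1L);
        System.out.println("capacity=" + buf.capacity()); // 28: passes the size check
        System.out.println("state=" + buf.getInt()
                + " leader=" + buf.getLong()
                + " zxid=0x" + Long.toHexString(buf.getLong())
                + " epoch=" + buf.getLong());
    }
}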
After the size check, the id of the sending server is used to decide whether the sender is among the voting servers. If it is not (for example, because it is an observer), the message is ignored for election purposes and a message carrying the current server's own vote is sent back to it. Otherwise, the message's payload is copied into a Notification object for later use.
Next, the current server's own state is examined. If it is LOOKING, the server needs this message to take part in the election, so the notification is formally added to the receive queue created earlier in the constructor. Additionally, if the sender is itself LOOKING and its election epoch lags behind the current round, the current server sends its own ballot back to the sender.
If instead the current server is no longer LOOKING while the sender still is, the election result, i.e. the current vote, is sent back.
The two threads above implement FastLeaderElection's internal receiving and sending of messages.
At this point FastLeaderElection has still not started its real work; immediately after the constructor completes, start() is invoked, but the election logic itself has yet to run.
void start(){
this.wsThread.start();
this.wrThread.start();
}
Once QuorumPeer has finished building the FastLeaderElection as described above, its run() method takes over: whenever the peer is in the LOOKING state, it calls FastLeaderElection's lookForLeader() method, and the election formally begins.
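For orientation, the relevant part of QuorumPeer.run() behaves roughly like the following. This is a heavily simplified, hypothetical sketch, not the actual code: followLeader(), lead() and observeLeader() stand in for the real Follower/Leader/Observer handling, and readonly-mode, reconfig and shutdown logic are omitted.

// Simplified sketch of QuorumPeer.run()'s state loop. While LOOKING, the
// peer blocks in lookForLeader(); the Vote it returns has already caused
// the peer's state to be set to LEADING or FOLLOWING inside the election.
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        try {
            setCurrentVote(makeLEStrategy().lookForLeader());
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            setPeerState(ServerState.LOOKING);
        }
        break;
    case FOLLOWING:
        followLeader();                    // hypothetical stand-in; returns when the leader is lost
        setPeerState(ServerState.LOOKING); // fall back into a new election
        break;
    case LEADING:
        lead();                            // hypothetical stand-in; returns when quorum support is lost
        setPeerState(ServerState.LOOKING);
        break;
    case OBSERVING:
        observeLeader();                   // hypothetical stand-in
        setPeerState(ServerState.LOOKING);
        break;
    }
}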
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view.
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
break;
}
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
* Only peer epoch is used to check that the votes come
* from the same ensemble. This is because there is at
* least one corner case in which the ensemble can be
* created with inconsistent zxid and election epoch
* info. However, given that only one ensemble can be
* running at a single point in time and that each
* epoch is used only once, using only the epoch to
* compare the votes is sufficient.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
*/
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: " + n.state
+ " (n.state), " + n.sid + " (n.sid)");
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
lookForLeader() first records the election's start time; when the elected leader later begins its actual leading work, the difference is used as the election's duration.
Then, before entering the election, logicalclock, the counter for the current election round, is incremented, marking a new round on top of the previous one.
Next the server initializes its own ballot; on the first pass it proposes itself as the leader.
It then calls sendNotifications() to send this ballot to all servers participating in the vote.
private void sendNotifications() {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
sendqueue.offer(notmsg);
}
}
This method iterates over all configured voting servers and, for each of them, builds a message carrying the server's own ballot; strictly speaking, the messages are only handed to the send queue here.
After that, as long as the server's own state is still LOOKING, it starts parsing the received messages and acting on them.
When a message is taken from the receive queue, the first check is that its sender is a voting server; if it is not, the message is simply disregarded here, since the appropriate response was already produced in WorkerReceiver as shown above.
Once the sender is confirmed to be a voting server, processing depends on the sender's state.
If the sender is LOOKING, its election epoch is compared with the current server's round. If the received epoch is larger, the current server has probably been down or cut off for a while and missed some rounds; it must abandon its current choice, adopt the larger round, and take the stronger of the received ballot and its own initial ballot as the new vote it will send to all servers. Its ballot content thereby changes. At the same time it clears the map in which it accumulates all received ballots and starts collecting them anew.
If the received epoch is smaller than the current round, the message can simply be ignored.
If the two epochs are equal, the ballots are compared field by field via totalOrderPredicate().
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
The comparison looks at the epoch first; with equal epochs it compares zxids, and with both equal it compares the proposing servers' ids. If the received ballot wins on the first of these three comparisons that differs, the server updates its own vote to the received ballot's content and sends it to all servers participating in the election.
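A concrete illustration of this ordering, as a standalone re-implementation of the same predicate applied to sample ballots (weight handling omitted; not ZooKeeper code):

// Standalone re-implementation of the (epoch, zxid, sid) ordering used by
// totalOrderPredicate(), with sample votes. Returns true if the new vote wins.
public class VoteOrderDemo {

    static boolean supersedes(long newEpoch, long newZxid, long newId,
                              long curEpoch, long curZxid, long curId) {
        return (newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid) && (newId > curId))));
    }

    public static void main(String[] args) {
        // 1. A higher epoch wins regardless of zxid:
        System.out.println(supersedes(2, 0x10, 1, 1, 0x99, 3)); // true
        // 2. Equal epochs: the higher zxid wins:
        System.out.println(supersedes(1, 0x20, 1, 1, 0x10, 3)); // true
        // 3. Equal epochs and zxids: the higher server id breaks the tie:
        System.out.println(supersedes(1, 0x10, 3, 1, 0x10, 1)); // true
        System.out.println(supersedes(1, 0x10, 1, 1, 0x10, 3)); // false
    }
}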
The sender's id is then used as the key under which the received ballot is stored in the map of votes.
Since the processing of this received message is now complete, termPredicate() is called to check whether an election result can be concluded.
private boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums();
}
Here, the leader in the server's own current ballot serves as the reference: the set of received ballots is scanned, and if the servers that made the same choice form a quorum (for a plain majority verifier, more than half of the voting servers), the election result is settled from this server's point of view, and the result is the server it voted for.
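For a plain majority quorum the check boils down to simple counting. A minimal standalone sketch under that assumption (ZooKeeper's SyncedLearnerTracker also supports hierarchical and reconfig-time quorums, which this ignores):

import java.util.Map;

// Minimal sketch of the majority check behind termPredicate(), assuming a
// plain majority QuorumVerifier: key = voter sid, value = sid it voted for.
public class QuorumDemo {

    static boolean hasQuorum(Map<Long, Long> votes, long proposedLeader, int ensembleSize) {
        long acks = votes.values().stream()
                         .filter(v -> v == proposedLeader)
                         .count();
        return acks > ensembleSize / 2; // strict majority of the whole ensemble
    }

    public static void main(String[] args) {
        // 5-server ensemble; servers 1, 2 and 3 all vote for server 3.
        Map<Long, Long> votes = Map.of(1L, 3L, 2L, 3L, 3L, 3L);
        System.out.println(hasQuorum(votes, 3L, 5)); // true: 3 of 5 is a majority
        System.out.println(hasQuorum(votes, 3L, 7)); // false: 3 of 7 is not
    }
}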
Even then, the server is in no hurry to end its election. It keeps polling the receive queue within the finalizeWait timeout; if during this window a message arrives that would supersede its current vote, the message is put back on the queue and the server re-enters the loop, resuming the previous election flow, while messages that would not change its vote are ignored and the wait continues.
If no new message arrives within the timeout, the server declares its own election over: if its current vote is for itself, it becomes the leader; otherwise it becomes a learner.
The above handles the case where the sender of the received message is in the LOOKING state. If the sender is FOLLOWING or LEADING, a leader has already been elected, so the server verifies the received ballot and directly updates its own role accordingly, completing the election.