Zookeeper原始碼分析(三)-Leader的選舉
阿新 • • 發佈:2019-02-06
從原始碼來看,Zookeeper的Leader選舉可分為以下兩步:
1.startLeaderElection();//建立選舉演算法
/**
 * Prepares this peer for leader election: seeds the current vote, looks up
 * this server's own quorum address, and instantiates the election algorithm.
 */
synchronized public void startLeaderElection() {
    try {
        // Start with a vote for this server itself (id = myid); it is used
        // later as the first vote this peer casts.
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch (IOException e) {
        //...
    }
    // Walk the configured servers and remember the address of the entry
    // whose id matches myid.
    for (QuorumServer server : getView().values()) {
        if (server.id != myid) {
            continue;
        }
        myQuorumAddr = server.addr;
        break;
    }
    //...
    // Build the election algorithm for the configured election type
    // (electionAlg in zoo.cfg; the default is 3).
    this.electionAlg = createElectionAlgorithm(electionType);
}
接下來看createElectionAlgorithm方法
/**
 * Instantiates the election implementation selected by the given code.
 *
 * @param electionAlgorithm numeric election type; only case 3 is shown here
 * @return the election object, or {@code null} when none could be created
 */
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le = null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    //...
    case 3: // the default election type, hence the one analysed here
        // Connection manager for the election traffic; it owns the message
        // queues and the SendWorker / RecvWorker pairs.
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener == null) {
            LOG.error("Null listener when initializing cnx manager");
        } else {
            // Start listening on the configured election port first,
            // then create the election algorithm on top of the manager.
            listener.start();
            le = new FastLeaderElection(this, qcm);
        }
        break;
    default:
        assert false;
    }
    return le;
}
看一下QuorumCnxManager類
/**
 * Creates the connection manager used for leader election.
 *
 * Sets up the receive queue, the per-server send queues and worker maps,
 * applies the optional {@code zookeeper.cnxTimeout} system property, and
 * creates (but does not start) the election {@link Listener}.
 *
 * @param self the quorum peer this manager belongs to
 * @throws NumberFormatException if {@code zookeeper.cnxTimeout} is set but
 *         is not a valid integer
 */
public QuorumCnxManager(QuorumPeer self) {
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();

    // Optional override of the connect timeout; when the property is absent
    // the field keeps whatever value it already has.
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if (cnxToValue != null) {
        // Integer.parseInt replaces the deprecated Integer(String) boxing
        // constructor; it rejects malformed input in the same way.
        this.cnxTO = Integer.parseInt(cnxToValue);
    }
    this.self = self;
    // Listener for the election port; peers that connect exchange data with
    // this server through SendWorker / RecvWorker.
    listener = new Listener();
}
然後看看Listener的具體實現
/**
 * Thread that accepts inbound election connections.
 * The snippet is truncated by the article: the catch body and the rest of
 * the class are not shown.
 */
public class Listener extends Thread {
volatile ServerSocket ss = null;
@Override
public void run() {
// Reset to 0 after every handled connection; the outer loop gives up at 3.
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
// Create the server-side socket
ss = new ServerSocket();
ss.setReuseAddress(true);
if (self.getQuorumListenOnAllIPs()) {
// Only the port is passed to InetSocketAddress here, no host
int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = self.quorumPeers.get(self.getId()).electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(self.quorumPeers.get(self.getId()).electionAddr
.toString());
// Bind to the election address
ss.bind(addr);
while (!shutdown) {
// Block until a peer connects
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
// After a peer connects, data is exchanged through SendWorker and RecvWorker
receiveConnection(client);
numRetries = 0;
}
} catch (IOException e) {
// NOTE(review): the handler body is empty in this excerpt; presumably elided by the article - confirm against the full source.
}
// ....
然後看一下receiveConnection
/**
 * Handles an inbound election connection.
 *
 * Reads the connecting server's id (sid) from the socket. If that sid is
 * smaller than this server's id, the inbound socket is closed and this
 * server opens its own connection to the peer instead. Otherwise the socket
 * is kept and a SendWorker / RecvWorker pair is started on it.
 *
 * @param sock the freshly accepted socket
 * @return {@code true} if the connection was kept and workers were started;
 *         {@code false} if the sid could not be read or the socket was dropped
 */
public boolean receiveConnection(Socket sock) {
    Long sid;
    try {
        // The first long on the stream is the sid of the connecting server.
        DataInputStream din = new DataInputStream(sock.getInputStream());
        sid = din.readLong();
        if (sid < 0) {
            // A negative first value is followed by another long, which is
            // then used as the sid.
            sid = din.readLong();
            //...
        }
    } catch (IOException e) {
        // Without a sid the connection cannot be classified. Previously the
        // exception was swallowed and the null sid caused a
        // NullPointerException in the comparison below.
        LOG.warn("Exception reading or writing challenge: " + e.toString());
        closeSocket(sock);
        return false;
    }

    if (sid < self.getId()) {
        // The peer has the smaller id: stop any existing sender for it,
        // drop the inbound socket, and connect to the peer from this side.
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);
        connectOne(sid);
        return false;
    }

    // Keep the socket: replace any previous sender for this sid and start
    // the worker pair that sends and receives data on it.
    SendWorker sw = new SendWorker(sock, sid);
    RecvWorker rw = new RecvWorker(sock, sid, sw);
    sw.setRecv(rw);

    SendWorker vsw = senderWorkerMap.get(sid);
    if (vsw != null) {
        vsw.finish();
    }
    senderWorkerMap.put(sid, sw);
    // Atomic "create if missing": a separate containsKey + put could replace
    // a queue that another thread registered in between.
    queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
            SEND_CAPACITY));

    sw.start();
    rw.start();
    return true;
}
然後看一下 connectOne
/**
 * Opens an election connection to the given server unless senderWorkerMap
 * already holds a sender for it.
 * The snippet is truncated by the article: the catch clauses of the try
 * block and part of the closing structure are not shown.
 */
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) == null){
// NOTE(review): electionAddr is assigned below but the connect call in this excerpt uses self.getView().get(sid).electionAddr instead.
InetSocketAddress electionAddr;
if (self.quorumPeers.containsKey(sid)) {
electionAddr = self.quorumPeers.get(sid).electionAddr;
} else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Opening channel to server " + sid);
}
Socket sock = new Socket();
setSockOpts(sock);
// Connect the new socket to the election address of that sid, with cnxTO as the connect timeout
sock.connect(self.getView().get(sid).electionAddr, cnxTO);
if (LOG.isDebugEnabled()) {
LOG.debug("Connected to server " + sid);
}
initiateConnection(sock, sid);
}
//...
}
然後看一下 initiateConnection
/**
 * Completes an outbound election connection on an already connected socket.
 *
 * Writes this server's id to the peer. If the peer's sid is larger than
 * this server's id, the socket is closed. Otherwise a SendWorker /
 * RecvWorker pair is started on the socket.
 *
 * @param sock the connected socket to the peer
 * @param sid  the server id of the peer
 * @return {@code true} if workers were started on the socket; {@code false}
 *         if the write failed or the connection was dropped
 */
public boolean initiateConnection(Socket sock, Long sid) {
    try {
        // Announce this server's id to the peer. The socket stays open
        // here; it is closed only in the failure and "lost" paths below.
        DataOutputStream dout = new DataOutputStream(sock.getOutputStream());
        dout.writeLong(self.getId());
        dout.flush();
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // If lost the challenge, then drop the new connection
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                 "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        return false;
    }

    // Otherwise proceed with the connection: install a fresh worker pair
    // and stop any previous sender registered for this sid.
    SendWorker sw = new SendWorker(sock, sid);
    RecvWorker rw = new RecvWorker(sock, sid, sw);
    sw.setRecv(rw);

    SendWorker vsw = senderWorkerMap.get(sid);
    if (vsw != null) {
        vsw.finish();
    }
    senderWorkerMap.put(sid, sw);
    // Atomic "create if missing": a separate containsKey + put could replace
    // a queue that another thread registered in between.
    queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
            SEND_CAPACITY));

    sw.start();
    rw.start();
    return true;
}
FastLeaderElection的建構函式中會呼叫starter(self, manager)
/**
 * Initialises the election state: stores the peer, creates the send and
 * receive queues, resets the proposal, and builds the Messenger.
 *
 * @param self    the quorum peer running this election
 * @param manager the connection manager handed to the Messenger
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;

    // Outgoing queue of ToSend items (notifications and acks).
    sendqueue = new LinkedBlockingQueue<ToSend>();
    // Incoming queue of Notification items, each carrying the sender's
    // server state.
    recvqueue = new LinkedBlockingQueue<Notification>();

    // No proposal yet.
    proposedZxid = -1;
    proposedLeader = -1;

    // Created last, once the queues exist; according to the article this
    // starts the WorkerSender and WorkerReceiver threads.
    this.messenger = new Messenger(manager);
}
然後看一下WorkerReceiver的執行緒
/**
 * Receive loop: polls messages from the connection manager, decodes them
 * into Notification objects and either queues them for the election or
 * answers the sender with this peer's current vote.
 */
public void run() {
Message response;
while (!stop) {
try{
/*
 * Take one message from the QuorumCnxManager receive queue,
 * waiting up to 3000 ms.
 */
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
/*
 * The sender is not in the voting view (an observer, per the original notes):
 * reply with the current vote, wrapped in a ToSend and placed on sendqueue.
 */
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
} else {
/*
 * Otherwise decode the message into a Notification object.
 */
Notification n = new Notification();
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
// The first int of the buffer encodes the sender's state; unknown codes drop the message
switch (response.buffer.getInt()) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
// The remaining fields are read in order: leader, zxid, electionEpoch
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
n.state = ackstate;
n.sid = response.sid;
if(!backCompatibility){
n.peerEpoch = response.buffer.getLong();
} else {
if(LOG.isInfoEnabled()){
LOG.info("Backward compatibility mode, server id=" + n.sid);
}
// In backward compatibility mode the peer epoch is derived from the zxid
n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
}
/*
* Version added in 3.4.6
*/
n.version = (response.buffer.remaining() >= 4) ?
response.buffer.getInt() : 0x0;
/*
* Print notification info
*/
if(LOG.isInfoEnabled()){
printNotification(n);
}
/*
 * If this peer is itself LOOKING, put the Notification on recvqueue.
 */
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
/*
* Send a notification back if the peer that sent this
* message is also looking and its logical clock is
* lagging behind.
*/
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock)){
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
v.getPeerEpoch());
sendqueue.offer(notmsg);
}
} else {
/*
 * This peer is not LOOKING but the sender is: wrap the current vote
 * in a ToSend and put it on sendqueue.
 */
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id = " +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg;
// Senders with version > 0 get the current vote; others get the vote from getBCVote()
if(n.version > 0x0) {
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
} else {
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
// NOTE(review): this writes to stdout instead of LOG and the loop keeps running after the interrupt - confirm this is intended.
System.out.println("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}
接下來看WorkerSender的執行緒
/**
 * Send loop: repeatedly takes a ToSend item from sendqueue (waiting up to
 * 3000 ms each time) and hands it to process(). The loop ends when stop is
 * set or the thread is interrupted.
 */
public void run() {
    while (!stop) {
        try {
            // Wait for the next outgoing item; a timeout yields null and
            // simply re-checks the stop flag.
            ToSend next = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (next != null) {
                process(next);
            }
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}
-------------------------------------------------------
看process(m)
/**
 * Serialises one outgoing item and hands it to the connection manager.
 *
 * @param m the item to send; its state ordinal, leader, zxid, election
 *          epoch and peer epoch are encoded into the message
 */
void process(ToSend m) {
    // The sender state goes on the wire as its enum ordinal.
    int stateCode = m.state.ordinal();
    ByteBuffer requestBuffer =
            buildMsg(stateCode, m.leader, m.zxid, m.electionEpoch, m.peerEpoch);
    // Delivery to the target server is delegated to QuorumCnxManager.
    manager.toSend(m.sid, requestBuffer);
}
-------------------------------------------------------
看 toSend(m.sid, requestBuffer) 方法
/**
 * Queues a message for the given server.
 *
 * A message addressed to this server itself is put straight onto the
 * receive queue. Any other message is added to the destination's send
 * queue (created on first use), and a connection to that server is then
 * attempted.
 *
 * @param sid destination server id
 * @param b   the message bytes
 */
public void toSend(Long sid, ByteBuffer b) {
    // Message to self: loop it back into the receive queue.
    if (self.getId() == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
        return;
    }

    // Atomically fetch or create the send queue for this destination. A
    // separate containsKey / put / get sequence can race with other threads
    // that register queues in the same map, replacing a queue that already
    // holds a message.
    ArrayBlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<ByteBuffer>(
            SEND_CAPACITY);
    ArrayBlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, fresh);
    addToSendQueue(existing != null ? existing : fresh, b);

    // Connect to the election port of that server and initialise the
    // connection.
    connectOne(sid);
}
2.super.start();//啟動quorumPeer執行緒,進行投票選舉
/**
 * Main loop of the quorum peer thread: while running, dispatches on the
 * current peer state (LOOKING, OBSERVING, FOLLOWING, LEADING).
 * Parts of the method are elided by the article and marked with "//...".
 */
public void run() {
/*
 * Omitted by the article.
 */
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING: // entered when the peer state is LOOKING
LOG.info("LOOKING");
// NOTE(review): the elision below presumably hides the "if (...) {" that pairs with the "} else {" further down - confirm against the full source.
//...
try {
// roZkMgr.start();
//setBCVote(null);
// The election actually starts here; everything before was only initialisation
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
//...
}
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
// Whatever happened, tear the observer down and go back to LOOKING
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
// Whatever happened, tear the follower down and go back to LOOKING
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
// leader is still non-null only if lead() did not return normally
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
//...
}
}
/**
 * Runs one round of leader election: proposes this server, exchanges
 * notifications with the other peers, and returns the final vote once one
 * of the termination checks succeeds.
 *
 * @return the final vote, or null if the loop exits without a decision
 * @throws InterruptedException if interrupted while waiting on the receive queue
 */
public Vote lookForLeader() throws InterruptedException {
//...
try {
/*
 * Votes received in the current election round, keyed by sender sid.
 */
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// Votes reported by peers in the FOLLOWING or LEADING state, keyed by sender sid
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
/* Advance the logical clock */
logicalclock++;
/*
 * Update the proposal. getInitId() yields the id to vote for; per the original
 * notes this is the number configured in myid, i.e. the first vote is for this server.
 */
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
/* Per the original notes: this only enqueues a vote message on sendqueue; WorkerSender takes it from there and has QuorumCnxManager send it */
sendNotifications();
/* Keep exchanging notifications until a leader is chosen */
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/* Take one received notification from recvqueue, waiting up to notTimeout ms */
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/* No notification arrived: check whether the outgoing messages have been delivered */
if(n == null){
if(manager.haveDelivered()){
/* Everything was delivered: send the notifications again, until a leader is chosen */
sendNotifications();
} else {
/* Messages are still pending; other servers may not be up yet, so try to connect to them */
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
/* A notification arrived from a member of the voting view */
else if(self.getVotingView().containsKey(n.sid)) {
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock) {
logicalclock = n.electionEpoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// The received vote wins the comparison: adopt it as our own proposal
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
// It does not win: reset the proposal to this server's own initial values
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// Broadcast the proposal, i.e. enqueue it on sendqueue (per the original notes)
sendNotifications();
} else if (n.electionEpoch < logicalclock) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// Record the vote in recvset, which feeds the termination check below
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Check whether the current proposal has won; per the original notes the default is QuorumMaj (more than half)
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
// Drain recvqueue for up to finalizeWait ms per poll; a notification that beats the proposal is put back and stops the drain
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
// n == null means no better vote arrived: set the state to LEADING if this server is the proposed leader, else learningState()
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
// Return the final vote
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
// Adopt the election epoch carried by the notification before changing state
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
// Reached when the loop ends without a decision (peer no longer LOOKING, or stop was set)
return null;
} finally {
// ...
}
}