ZooKeeper原始碼淺析(二)之Leader選舉
阿新 • • 發佈:2019-01-05
1.入口函式QuorumPeerMain主執行緒啟動
QuorumPeerMain.runFromConfig()程式碼
- public void runFromConfig(QuorumPeerConfig config) throws IOException { // Build a QuorumPeer from the parsed config, start it, and wait for it to exit
- ......
- LOG.info("Starting quorum peer");
- try {
- //Client-facing server that serves reads and writes, usually on port 2181
- ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
- cnxnFactory.configure(config.getClientPortAddress(),
- config.getMaxClientCnxns());
- //The peer's main logic thread, responsible for leader election and voting
- quorumPeer = new QuorumPeer();
- quorumPeer.setClientPortAddress(config.getClientPortAddress());
- quorumPeer.setTxnFactory(new FileTxnSnapLog(
- new File(config.getDataLogDir()),
- new File(config.getDataDir())));
- //Addresses of the servers in the ensemble
- quorumPeer.setQuorumPeers(config.getServers());
- quorumPeer.setElectionType(config.getElectionAlg());
- //This server's id within the ensemble
- quorumPeer.setMyid(config.getServerId());
- quorumPeer.setTickTime(config.getTickTime());
- quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
- quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
- quorumPeer.setInitLimit(config.getInitLimit());
- quorumPeer.setSyncLimit(config.getSyncLimit());
- //How votes are decided; by default a proposal passes with more than half of the votes
- quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
- quorumPeer.setCnxnFactory(cnxnFactory);
- quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
- quorumPeer.setLearnerType(config.getPeerType());
- //Start the peer thread, then block until it terminates
- quorumPeer.start();
- quorumPeer.join();
- } catch (InterruptedException e) {
- // warn, but generally this is ok
- LOG.warn("Quorum Peer interrupted", e);
- }
- }
2.QuorumPeer覆寫了Thread.start()方法,先完成初始化再啟動執行緒
Quorumpeer.start()程式碼- @Override
- public synchronized void start() { // Initialize local state and client I/O, prepare the election, then launch the peer thread
- //Restore the database from disk and load currentEpoch/acceptedEpoch (the leader epoch, not the election round)
- loadDataBase();
- //Start the client-facing connection factory (client I/O)
- cnxnFactory.start();
- //Prepare the election: set the initial self-vote and create the configured election algorithm
- startLeaderElection();
- //Start the thread itself, which then executes run()
- super.start();
- }
3.通過QuorumPeer.loadDataBase()載入資料,初始化zkDb、currentEpoch、acceptedEpoch。
Quorumpeer.loaddatabase()程式碼- private void loadDataBase() { // Restore zkDb and the persisted currentEpoch/acceptedEpoch, checking they are mutually consistent
- try {
- //Restore the in-memory database from the local data files
- zkDb.loadDataBase();
- // load the epochs
- //A zxid is 64 bits: the high 32 bits are the epoch, the low 32 bits are a counter within that epoch
- long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
- long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
- try {
- currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
- } catch(FileNotFoundException e) {
- .....
- }
- //The persisted current epoch must not be older than the epoch of the last processed zxid
- if (epochOfZxid > currentEpoch) {
- throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
- }
- try {
- acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
- } catch(FileNotFoundException e) {
- .....
- }
- //The accepted epoch must never lag behind the current epoch; the message names the lagging (accepted) epoch first
- if (acceptedEpoch < currentEpoch) {
- throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
- }
- } catch(IOException ie) {
- .....
- }
- }
4.通過QuorumPeer.startLeaderElection()初始化electionAlg、currentVote。
Quorumpeer.startleaderelection()程式碼- synchronized public void startLeaderElection() { // Set the initial self-vote, resolve our own quorum address, and create the election algorithm
- try {
- //Initially vote for ourselves: (myid, last logged zxid, current epoch)
- currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
- } catch(IOException e) {
- RuntimeException re = new RuntimeException(e.getMessage()); // NOTE(review): copies the message and stack trace but does not chain e as the cause
- re.setStackTrace(e.getStackTrace());
- throw re;
- }
- //Look up this server's own quorum address in the configured view
- for (QuorumServer p : getView().values()) {
- if (p.id == myid) {
- myQuorumAddr = p.addr;
- break;
- }
- }
- .....
- this.electionAlg = createElectionAlgorithm(electionType);
- }
5.建立選舉演算法,預設為FastLeaderElection(electionAlg=3)。從3.4.0版本開始,ZooKeeper廢棄了0、1、2三種演算法(LeaderElection與AuthFastLeaderElection)。
Quorumpeer.createelectionalgorithm()程式碼- protected Election createElectionAlgorithm(int electionAlgorithm){ // Map the configured electionAlg number to an Election implementation; may return null
- Election le=null;
- //TODO: use a factory rather than a switch
- switch (electionAlgorithm) {
- case 0:
- le = new LeaderElection(this);
- break;
- case 1:
- le = new AuthFastLeaderElection(this);
- break;
- case 2:
- le = new AuthFastLeaderElection(this, true);
- break;
- case 3:
- //Connection manager responsible for the network I/O of leader election
- qcm = new QuorumCnxManager(this);
- QuorumCnxManager.Listener listener = qcm.listener;
- //Start the listener bound to this server's configured election port; it waits for the other servers to connect
- //e.g. with server.1=hadoop1:2888:3888 in the config, server.1's election port is 3888; 2888 is the port the leader and the other servers use to exchange data
- //For config parsing see QuorumPeerConfig.parseProperties()
- if(listener != null){
- listener.start();
- //TCP-based fast leader election algorithm
- le = new FastLeaderElection(this, qcm);
- } else {
- LOG.error("Null listener when initializing cnx manager");
- }
- break;
- default:
- assert false; // only trips when assertions are enabled (-ea); otherwise le stays null
- }
- return le;
- }
- private void starter(QuorumPeer self, QuorumCnxManager manager) { // Reset the proposal and create the queues and messenger used by this election instance
- this.self = self;
- proposedLeader = -1;
- proposedZxid = -1;
- //Election-layer send queue; elements are ToSend messages
- sendqueue = new LinkedBlockingQueue<ToSend>();
- //Election-layer receive queue; elements are Notification messages
- recvqueue = new LinkedBlockingQueue<Notification>();
- //Messenger contains the WorkerSender and WorkerReceiver threads
- //WorkerSender: election-layer sender thread that hands messages to the I/O manager QuorumCnxManager
- //WorkerReceiver: election-layer receiver thread that takes messages from QuorumCnxManager
- this.messenger = new Messenger(manager);
- }
- @Override
- public void run() { // Main peer loop: while running is true, act according to the current peer state
- .....
- try {
- /*
- * Main loop
- */
- while (running) {
- switch (getPeerState()) {
- //LOOKING: run leader election
- case LOOKING:
- LOG.info("LOOKING");
- ..... // NOTE(review): the elided code opens an if/else whose else branch appears below
- try {
- roZkMgr.start();
- //Run the election algorithm and adopt its result as the current vote
- setCurrentVote(makeLEStrategy().lookForLeader());
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- setPeerState(ServerState.LOOKING);
- } finally {
- // If the thread is in the grace period, interrupt
- // to come out of waiting.
- roZkMgr.interrupt();
- roZk.shutdown();
- }
- } else {
- try {
- setCurrentVote(makeLEStrategy().lookForLeader());
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- setPeerState(ServerState.LOOKING);
- }
- }
- break;
- //After an election the state is OBSERVING/FOLLOWING/LEADING: create that role object and run it; on exit each role resets the state to LOOKING
- case OBSERVING:
- try {
- LOG.info("OBSERVING");
- setObserver(makeObserver(logFactory));
- observer.observeLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e );
- } finally {
- observer.shutdown();
- setObserver(null);
- setPeerState(ServerState.LOOKING);
- }
- break;
- case FOLLOWING:
- try {
- LOG.info("FOLLOWING");
- setFollower(makeFollower(logFactory));
- follower.followLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- } finally {
- follower.shutdown();
- setFollower(null);
- setPeerState(ServerState.LOOKING);
- }
- break;
- case LEADING:
- LOG.info("LEADING");
- try {
- setLeader(makeLeader(logFactory));
- leader.lead();
- setLeader(null);
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- } finally {
- if (leader != null) {
- leader.shutdown("Forcing shutdown");
- setLeader(null);
- }
- setPeerState(ServerState.LOOKING);
- }
- break;
- }
- }
- } finally {
- .....
- }
- }
8.FastLeaderElection的選舉流程
Fastleaderelection.lookforleader()程式碼- public Vote lookForLeader() throws InterruptedException {
- .....
- try {
- //收到的投票
- HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
- HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
- int notTimeout = finalizeWait;
- synchronized(this){
- logicalclock++;
- //先投給自己
- updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
- }
- LOG.info("New election. My id = " + self.getId() +
- ", proposed zxid=0x" + Long.toHexString(proposedZxid));
- //傳送投票,包括髮給自己
- sendNotifications();
- /*
- * Loop in which we exchange notifications until we find a leader
- */
- //主迴圈,直到選出leader
- while ((self.getPeerState() == ServerState.LOOKING) &&
- (!stop)){
- /*
- * Remove next notification from queue, times out after 2 times
- * the termination time
- */
- //從IO執行緒裡拿到投票訊息,自己的投票也在這裡處理
- Notification n = recvqueue.poll(notTimeout,
- TimeUnit.MILLISECONDS);
- /*
- * Sends more notifications if haven't received enough.
- * Otherwise processes new notification.
- */
- //如果空閒
- if(n == null){
- //訊息發完了,繼續傳送,一直到選出leader為止
- if(manager.haveDelivered()){
- sendNotifications();
- } else {
- //訊息還在,可能其他server還沒啟動,嘗試連線
- manager.connectAll();
- }
- /*
- * Exponential backoff
- */
- //延長超時時間
- int tmpTimeOut = notTimeout*2;
- notTimeout = (tmpTimeOut < maxNotificationInterval?
- tmpTimeOut : maxNotificationInterval);
- LOG.info("Notification time out: " + notTimeout);
- }
- //收到了投票訊息
- else if(self.getVotingView().containsKey(n.sid)) {
- /*
- * Only proceed if the vote comes from a replica in the
- * voting view.
- */
- switch (n.state) {
- //LOOKING訊息,則
- case LOOKING:
- //檢查下收到的這張選票是否可以勝出,依次比較選舉輪數epoch,事務zxid,伺服器編號server id
- // If notification > current, replace and send messages out
- if (n.electionEpoch > logicalclock) {
- logicalclock = n.electionEpoch;
- recvset.clear();
- if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
- getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
- updateProposal(n.leader, n.zxid, n.peerEpoch);
- } else {
- updateProposal(getInitId(),
- getInitLastLoggedZxid(),
- getPeerEpoch());
- }
- sendNotifications();
- } else if (n.electionEpoch < logicalclock) {
- if(LOG.isDebugEnabled()){
- LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
- + Long.toHexString(n.electionEpoch)
- + ", logicalclock=0x" + Long.toHexString(logicalclock));
- }
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
- proposedLeader, proposedZxid, proposedEpoch)) {
- //勝出了,就把自己的投票修改為對方的,然後廣播訊息
- updateProposal(n.leader, n.zxid, n.peerEpoch);
- sendNotifications();
- }
- .....
- //新增到本機投票集合,用來做選舉終結判斷
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
- //選舉是否結束,預設演算法是超過半數server同意
- if (termPredicate(recvset,
- new Vote(proposedLeader, proposedZxid,
- logicalclock, proposedEpoch))) {
- // Verify if there is any change in the proposed leader
- while((n = recvqueue.poll(finalizeWait,
- TimeUnit.MILLISECONDS)) != null){
- if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
- proposedLeader, proposedZxid, proposedEpoch)){
- recvqueue.put(n);
- break;
- }
- }
- /*
- * This predicate is true once we don't read any new
- * relevant message from the reception queue
- */
- if (n == null) {
- //修改狀態,LEADING or FOLLOWING
- self.setPeerState((proposedLeader == self.getId()) ?
- ServerState.LEADING: learningState());
- //返回最終的選票結果
- Vote endVote = new Vote(proposedLeader,
- proposedZxid, proposedEpoch);
- leaveInstance(endVote);
- return endVote;
- }
- }
- break;
- //OBSERVING機器不引數選舉
- &n