1. 程式人生 > 程式設計 >聊聊nacos RaftCore的MasterElection

聊聊nacos RaftCore的MasterElection

本文主要研究一下nacos RaftCore的MasterElection

RaftCore

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java

@Component
public class RaftCore {
	
	//......

    @PostConstruct
    public void init() throws Exception {

        Loggers.RAFT.info("initializing Raft sub-system"
); executor.submit(notifier); long start = System.currentTimeMillis(); raftStore.loadDatums(notifier,datums); setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"),0L)); Loggers.RAFT.info("cache loaded,datum count: {},current term: {}",datums.size(),peers.getTerm()); while
(true) { if (notifier.tasks.size() <= 0) { break; } Thread.sleep(1000L); } initialized = true; Loggers.RAFT.info("finish to load data from disk,cost: {} ms.",(System.currentTimeMillis() - start)); GlobalExecutor.registerMasterElection(new MasterElection()); GlobalExecutor.registerHeartbeat(new HeartBeat()); Loggers.RAFT.info("timer started: leader timeout ms: {},heart-beat timeout ms: {}"
,GlobalExecutor.LEADER_TIMEOUT_MS,GlobalExecutor.HEARTBEAT_INTERVAL_MS); } //...... } 複製程式碼
  • RaftCore的init方法通過GlobalExecutor.registerMasterElection(new MasterElection())註冊了MasterElection

GlobalExecutor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java

public class GlobalExecutor {

	//......

    public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);

    public static void registerMasterElection(Runnable runnable) {
        executorService.scheduleAtFixedRate(runnable,TICK_PERIOD_MS,TimeUnit.MILLISECONDS);
    }

    //......
}
複製程式碼
  • registerMasterElection方法每隔TICK_PERIOD_MS毫秒排程一次runnable

MasterElection

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftCore.java

    public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {

                if (!peers.isReady()) {
                    return;
                }

                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

                if (local.leaderDueMs > 0) {
                    return;
                }

                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();

                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}",e);
            }

        }

        public void sendVote() {

            RaftPeer local = peers.get(NetUtils.localServer());
            Loggers.RAFT.info("leader timeout,start voting,leader: {},term: {}",JSON.toJSONString(getLeader()),local.term);

            peers.reset();

            local.term.incrementAndGet();
            local.voteFor = local.ip;
            local.state = RaftPeer.State.CANDIDATE;

            Map<String,String> params = new HashMap<>(1);
            params.put("vote",JSON.toJSONString(local));
            for (final String server : peers.allServersWithoutMySelf()) {
                final String url = buildURL(server,API_VOTE);
                try {
                    HttpClient.asyncHttpPost(url,null,params,new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT.error("NACOS-RAFT vote failed: {},url: {}",response.getResponseBody(),url);
                                return 1;
                            }

                            RaftPeer peer = JSON.parseObject(response.getResponseBody(),RaftPeer.class);

                            Loggers.RAFT.info("received approve from peer: {}",JSON.toJSONString(peer));

                            peers.decideLeader(peer);

                            return 0;
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}",server);
                }
            }
        }
    }
複製程式碼
  • MasterElection實現了Runnable方法,其run方法在peers都是ready而且local.leaderDueMs減去TICK_PERIOD_MS小於等於0的時候會開始選舉;它首先resetLeaderDue及resetHeartbeatDue,然後執行sendVote方法;sendVote方法首先重置peers,遞增localPeer的term,並設定voteFor為自己,然後更新state為RaftPeer.State.CANDIDATE,最後遍歷peers.allServersWithoutMySelf(),將自己的vote資訊非同步post給其他peer;如果其他peer返回成功則執行peers.decideLeader(peer),返回1,否則返回0

小結

RaftCore的init方法通過GlobalExecutor.registerMasterElection(new MasterElection())註冊了MasterElection;registerMasterElection方法每隔TICK_PERIOD_MS毫秒排程一次;MasterElection實現了Runnable方法,其run方法在peers都是ready而且local.leaderDueMs減去TICK_PERIOD_MS小於等於0的時候會開始選舉;它首先resetLeaderDue及resetHeartbeatDue,然後執行sendVote方法

doc