1. 程式人生 > >經典分散式演算法 —— 淺顯易懂的 Raft 演算法實現

經典分散式演算法 —— 淺顯易懂的 Raft 演算法實現

一、Raft概念

copy一下其他小夥伴寫的文章: Raft演算法詳解

不同於Paxos演算法直接從分散式一致性問題出發推匯出來,Raft演算法則是從多副本狀態機的角度提出,用於管理多副本狀態機的日誌複製。Raft實現了和Paxos相同的功能,它將一致性分解為多個子問題:Leader選舉(Leader election)、日誌同步(Log replication)、安全性(Safety)、日誌壓縮(Log compaction)、成員變更(Membership change)等。同時,Raft演算法使用了更強的假設來減少了需要考慮的狀態,使之變的易於理解和實現。

Raft將系統中的角色分為領導者(Leader)、跟從者(Follower)和候選人(Candidate):

  • Leader:接受客戶端請求,並向Follower同步請求日誌,當日志同步到大多數節點上後告訴Follower提交日誌。
  • Follower:接受並持久化Leader同步的日誌,在Leader告之日誌可以提交之後,提交日誌。
  • Candidate:Leader選舉過程中的臨時角色。

本文不過多贅述 raft 演算法是個什麼東西... 這裡再貼一個十分好理解的文章:The Raft Consensus Algorithm


二、系統初步設計

在對raft有一定理解後,我們簡單梳理一下在raft選舉過程中,我們需要的一些角色,以及角色的司職。

首先我們需要一個選舉控制類,單例實現即可,節點的選舉全權交給此選舉控制類的實現,我們稱其為 ElectOperator。

先講一個 raft 中重要的概念:世代,也稱為 epoch,但在這篇文章,將其稱為 generation(不要糾結這個 = =)。 世代可以認為是一個標記當前傳送的操作是否有效的標識,如果收到了小於本節點世代的請求,則可無視其內容,如果收到了大於本世代的請求,則需要更新本節點世代,並重置自己的身份,變為 Follower,類似於樂觀鎖的設計理念。

我們知道,raft中一共有三種角色:Follower、Candidate、Leader

(1)Follower

Follower 需要做什麼呢:

  • 接收心跳
  • Follower 在 VOTES_BACK_OFF_MS 時間內,若沒有收到來自 Leader的心跳,則轉變為 Candidate
  • 接收拉票請求,並返回自己的投票

好的,Follower非常簡單,只需要做三件事即可。

(2)Candidate

Candidate 扮演什麼樣的職能呢:

  • 接收心跳
  • Candidate 在 VOTES_BACK_OFF_MS 時間內,若沒有收到來自 Leader的心跳,則轉變為 Candidate
  • 接收拉票請求,並返回自己的投票
  • 向叢集中的其他節點發起拉票請求
  • 當收到的投票大於半數( n/2 + 1, n為叢集內的節點數量),轉變為 Leader

Candidate 比起 Follower 稍微複雜一些,但前三件事情都是一樣的。

(3)Leader

Leader 在選舉過程中扮演的角色最為簡單:

  • 接收心跳
  • 向叢集內所有節點發送心跳

Leader 也是可以接收心跳的,當收到大於當前世代的心跳或請求後,Leader 需要轉變為 Follower。Leader 不可能收到同世代的心跳請求,因為 (1) 在 raft 演算法中,同一世代中,節點僅對同一個節點進行投票。(2) 需要收到過半投票才可以轉變為 Leader。


三、系統初步實現

簡單貼一下選舉控制器需要的一些屬性程式碼,下面的註釋都說的很清楚了,其中需要補充的一點是定時任務使用了時間輪來實現,不理解沒有關係...就是個定時任務,定時任務的一個引用放在 Map<TaskEnum, TimedTask> taskMap; 中,便於取消任務。

public class ElectOperator extends ReentrantLocker implements Runnable {
    // 成為 Candidate 的退避時間(真實退避時間需要 randomized to be between 150ms and 300ms )
    private static final long ELECTION_TIMEOUT_MS = ElectConfigHelper.getElectionTimeoutMs();

    // 心跳間隔
    private static final long HEART_BEAT_MS = ElectConfigHelper.getHeartBeatMs();

    /**
     * 該投票箱的世代資訊,如果一直進行選舉,一直能達到 {@link #ELECTION_TIMEOUT_MS},而選不出 Leader ,也需要15年,generation才會不夠用,如果
     * generation 的初始值設定為 Long.Min (現在是0,則可以撐30年,所以完全呆膠布)
     */
    private long generation;

    /**
     * 當前節點的角色
     */
    private NodeRole nodeRole;

    /**
     * 所有正在跑的定時任務
     */
    private Map<TaskEnum, TimedTask> taskMap;

    /**
     * 投票箱
     */
    private Map<String/* serverName */, Boolean> box;

    /**
     * 投票給了誰的投票記錄
     */
    private Votes voteRecord;

    /**
     * 快取一份叢集資訊,因為叢集資訊是可能變化的,我們要保證在一次選舉中,叢集資訊是不變的
     */
    private List<HanabiNode> clusters;

    /**
     * 心跳內容
     */
    private HeartBeat heartBeat;

    /**
     * 現在叢集的leader是哪個節點
     */
    private String leaderServerName;

    private volatile static ElectOperator INSTANCE;

    public static ElectOperator getInstance() {
        if (INSTANCE == null) {
            synchronized (ElectOperator.class) {
                if (INSTANCE == null) {
                    INSTANCE = new ElectOperator();
                    ElectControllerPool.execute(INSTANCE);
                }
            }
        }

        return INSTANCE;
    }

另外,上面羅列的這些值大都是需要在更新世代時重置的,我們先擬定一下更新世代的邏輯,通用的來講,就是清除投票記錄,清除自己的投票箱,更新自己的世代,身份變更為 Follower 等等,我們將這個方法稱為 init。

    /**
     * 初始化
     *
     * 1、成為follower
     * 2、先取消所有的定時任務
     * 3、重置本地變數
     * 4、新增成為Candidate的定時任務
     */
    private boolean init(long generation, String reason) {
        return this.lockSupplier(() -> {
            if (generation > this.generation) {// 如果有選票的世代已經大於當前世代,那麼重置投票箱
                logger.debug("初始化投票箱,原因:{}", reason);

                // 1、成為follower
                this.becomeFollower();

                // 2、先取消所有的定時任務
                this.cancelAllTask();

                // 3、重置本地變數
                logger.debug("更新世代:舊世代 {} => 新世代 {}", this.generation, generation);
                this.generation = generation;
                this.voteRecord = null;
                this.box = new HashMap<>();
                this.leaderServerName = null;

                // 4、新增成為Candidate的定時任務
                this.becomeCandidateAndBeginElectTask(this.generation);
                return true;
            } else {
                return false;
            }
        });
    }

(1) Follower的實現

基於上面的分析,我們可以歸納一下 Follower 需要一些什麼樣的方法:

1、轉變為 Candidate 的定時任務

實際上就是 randomized to be between 150ms and 300ms 後,如果沒收到 Leader 的心跳,或者自己變為 Candidate 後,在這個時間內沒有成功上位,則繼續轉變為 Candidate。

為什麼我們成為 Candidate 的退避時間需要隨機 150ms - 300ms呢?這是為了避免所有節點的選舉發起發生碰撞,如果說都是相同的退避時間,每個節點又會優先投自己一票,那麼這個集群系統就會陷入無限發起投票,但又無法成為 Leader 的局面。

簡而言之就是我們需要提供一個可重新整理的定時任務,如果在一定時間內沒重新整理這個任務,則節點轉變為 Candidate,併發起選舉,程式碼如下。首先取消之前的 becomeCandidate 定時定時任務,然後設定在 electionTimeout 後呼叫 beginElect(generation) 方法。

   /**
     * 成為候選者的任務,(重複呼叫則會取消之前的任務,收到來自leader的心跳包,就可以重置一下這個任務)
     *
     * 沒加鎖,因為這個任務需要頻繁被呼叫,只要收到leader來的訊息就可以呼叫一下
     */
    private void becomeCandidateAndBeginElectTask(long generation) {
        this.lockSupplier(() -> {
            this.cancelCandidateAndBeginElectTask("正在重置發起下一輪選舉的退避時間");

            // The election timeout is randomized to be between 150ms and 300ms.
            long electionTimeout = ELECTION_TIMEOUT_MS + (int) (ELECTION_TIMEOUT_MS * RANDOM.nextFloat());
            TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));
            Timer.getInstance()
                 .addTask(timedTask);

            taskMap.put(TaskEnum.BECOME_CANDIDATE, timedTask);
            return null;
        });
    }

2、接收心跳與心跳回復

接收心跳十分簡單,如果當前心跳大於等於當前世代,且還未認定某個節點為 Leader,則取消所有定時任務,成為Follower,並記錄心跳包中 Leader 節點的資訊,最後重置一下成為候選者的任務。

如果已經成為某個 Leader 的 Follower,則直接成為候選者的任務即可。

另外一個要注意的是,needToSendHeartBeatInfection,是否需要傳送心跳感染包,當收到低世代 Leader 的心跳時,如果當前叢集已經選出 Leader ,則回覆此心跳包,告訴舊 Leader,現在已經是新世代了!(程式碼中沒有展現,其實就是再次封裝一個心跳包,帶上世代資訊和 Leader 節點資訊,回覆給 Leader 即可)

    public void receiveHeatBeat(String leaderServerName, long generation, String msg) {
       return this.lockSupplier(() -> {
	    boolean needToSendHeartBeatInfection = true;
            // 世代大於當前世代
            if (generation >= this.generation) {
                needToSendHeartBeatInfection = false;

                if (this.leaderServerName == null) {
                    
                    logger.info("叢集中,節點 {} 已經成功在世代 {} 上位成為 Leader,本節點將成為 Follower,直到與 Leader 的網路通訊出現問題", leaderServerName, generation);

                    // 取消所有任務
                    this.cancelAllTask();

                    // 成為follower
                    this.becomeFollower();

                    // 將那個節點設為leader節點
                    this.leaderServerName = leaderServerName;
                }

                // 重置成為候選者任務
                this.becomeCandidateAndBeginElectTask(this.generation);
            }
            return null;
        });
    }

3、接收拉票請求與回覆投票

我們知道,raft 在一個世代只能投票給一個節點,且發起投票者會首先投票給自己。所以邏輯就很簡單了,只有當世代大於等於當前,且還未投票時,則拉票請求成功,返回true即可,否則都視為失敗,返回false。

    /**
     * 某個節點來請求本節點給他投票了,只有當世代大於當前世代,才有投票一說,其他情況都是失敗的
     *
     * 返回結果
     *
     * 為true代表接受投票成功。
     * 為false代表已經給其他節點投過票了,
     */
    public VotesResponse receiveVotes(Votes votes) {
        return this.lockSupplier(() -> {
            logger.debug("收到節點 {} 的投票請求,其世代為 {}", votes.getServerName(), votes.getGeneration());
            String cause = "";

            if (votes.getGeneration() < this.generation) {
                cause = String.format("投票請求 %s 世代小於當前世代 %s", votes.getGeneration(), this.generation);
            } else if (this.voteRecord != null) {
                cause = String.format("在世代 %s,本節點已投票給 => %s 節點", this.generation, this.voteRecord.getServerName());
            } else {
                this.voteRecord = votes; // 代表投票成功了
            }

            boolean result = votes.equals(this.voteRecord);

            if (result) {
                logger.debug("投票記錄更新成功:在世代 {},本節點投票給 => {} 節點", this.generation, this.voteRecord.getServerName());
            } else {
                logger.debug("投票記錄更新失敗:原因:{}", cause);
            }

            String serverName = InetSocketAddressConfigHelper.getServerName();
            return new VotesResponse(this.generation, serverName, result, serverName.equals(this.leaderServerName), votes.getGeneration());
        });
    }

(2) Candidate的實現

可以看出 Follower 十分簡單, Candidate 在 Follower 的基礎上增加了發起選舉的拉票請求,與接收投票,並上位成為Leader兩個功能,實際上也十分簡單。

1、發起拉票請求

回顧一下前面的轉變成 Candidate 的定時任務,定時任務實際上就是呼叫一個方法

TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));

這個 beginElect 就是轉變為 Candidate 併發起選舉的實現。讓我們先想想需要做什麼,首先肯定是

  1. 更新一下自己的世代,因為已經長時間沒收到 Leader 的心跳包了,我們需要自立門戶。
  2. 給自己投一票
  3. 要求其他節點給自己投票

分析到這裡就很明瞭了。下面首先執行 updateGeneration 方法,實際上就是執行前面所說的 init 方法,傳入 generation + 1 的世代,重置一下上個世代各種儲存的狀態;然後呼叫 becomeCandidate,實際上就是切換一下身份,將 Follower 或者 Candidate 切換為 Candidate;給自己的 voteRecord 投一票,最後帶上自己的節點標識和世代資訊,去拉票。

    /**
     * 開始進行選舉
     *
     * 1、首先更新一下世代資訊,重置投票箱和投票記錄
     * 2、成為候選者
     * 3、給自己投一票
     * 4、請求其他節點,要求其他節點給自己投票
     */
    private void beginElect(long generation) {
        this.lockSupplier(() -> {

            if (this.generation != generation) {// 存在這麼一種情況,雖然取消了選舉任務,但是選舉任務還是被執行了,所以這裡要多做一重處理,避免上個週期的任務被執行
                return null;
            }

            logger.info("Election Timeout 到期,可能期間內未收到來自 Leader 的心跳包或上一輪選舉沒有在期間內選出 Leader,故本節點即將發起選舉");

            updateGeneration("本節點發起了選舉");// this.generation ++

            // 成為候選者
            logger.info("本節點正式開始世代 {} 的選舉", this.generation);
            if (this.becomeCandidate()) {
                VotesResponse votes = new VotesResponse(this.generation, InetSocketAddressConfigHelper.getServerName(), true, false, this.generation);

                // 給自己投票箱投票
                this.receiveVotesResponse(votes);

                // 記錄一下,自己給自己投了票
                this.voteRecord = votes;

                // 讓其他節點給自己投一票
                this.askForVoteTask(new Votes(this.generation, InetSocketAddressConfigHelper.getServerName()), 0);
            }
            return null;
        });
    }

2、接收投票,併成為 Leader

如果說在 150ms and 300ms 之間,本節點收到了過半投票,則可上位成 Leader,否則定時任務會再次呼叫 beginElect,再次更新本節點世代,然後發起新一輪選舉。

接收投票其實十分簡單,回憶一下前面接收拉票請求與回覆投票,實際上就是拉票成功,就返回true,否則返回flase。

我們每次都判斷一下是否拿到過半的票數,如果拿到,則成為 Leader,另外有一個值得注意的是,為了加快叢集恢復可用的程序,類似於心跳感染(如果心跳發到Leader那裡去了,Leader會告訴本節點,它才是真正的Leader),投票也存在投票感染,下面的程式碼由 votesResponse.isFromLeaderNode() 來表示。

投票的記錄也是十分簡單,就是把每個投票記錄扔到 Map<String/* serverName */, Boolean> box; 裡,true 表示同意投給本節點,flase 則不同意,如果同意達到半數以上,則呼叫 becomeLeader 成為本世代 Leader。

    /**
     * 給當前節點的投票箱投票
     */
    public void receiveVotesResponse(VotesResponse votesResponse) {
        this.lockSupplier(() -> {

            if (votesResponse.isFromLeaderNode()) {
                logger.info("來自節點 {} 的投票應答表明其身份為 Leader,本輪拉票結束。", votesResponse.getServerName());
                this.receiveHeatBeat(votesResponse.getServerName(), votesResponse.getGeneration(),
                    String.format("收到來自 Leader 節點的投票應答,自動將其視為來自 Leader %s 世代 %s 節點的心跳包", heartBeat.getServerName(), votesResponse.getGeneration()));
            }

            if (this.generation > votesResponse.getAskVoteGeneration()) {// 如果選票的世代小於當前世代,投票無效
                logger.info("來自節點 {} 的投票應答世代是以前世代 {} 的選票,選票無效", votesResponse.getServerName(), votesResponse.getAskVoteGeneration());
                return null;
            }

            if (votesResponse.isAgreed()) {
                if (!voteSelf) {
                    logger.info("來自節點 {} 的投票應答有效,投票箱 + 1", votesResponse.getServerName());
                }

                // 記錄一下投票結果
                box.put(votesResponse.getServerName(), votesResponse.isAgreed());

                List<HanabiNode> hanabiNodeList = this.clusters;
                int clusterSize = hanabiNodeList.size();
                int votesNeed = clusterSize / 2 + 1;

                long voteCount = box.values()
                                    .stream()
                                    .filter(aBoolean -> aBoolean)
                                    .count();

                logger.info("叢集中共 {} 個節點,本節點當前投票箱進度 {}/{}", hanabiNodeList.size(), voteCount, votesNeed);

                // 如果獲得的選票已經大於了叢集數量的一半以上,則成為leader
                if (voteCount == votesNeed) {
                    logger.info("選票過半,準備上位成為 leader 節點", votesResponse.getServerName());
                    this.becomeLeader();
                }
            } else {
                logger.info("節點 {} 在世代 {} 的投票應答為:拒絕給本節點在世代 {} 的選舉投票(當前世代 {})", votesResponse.getServerName(), votesResponse.getGeneration(), votesResponse.getAskVoteGeneration(), this.generation);

                // 記錄一下投票結果
                box.put(votesResponse.getServerName(), votesResponse.isAgreed());
            }

            return null;
        });
    }

(3) Leader 的實現

作為 Leader,在 raft 中的實現卻是最簡單的,我們只需要給子節點發心跳包即可。然後如果收到大於自己世代的心跳感染,則成為新世代的 Follower,接收心跳的邏輯和 Follower 沒有區別。

    /**
     * 當選票大於一半以上時呼叫這個方法,如何去成為一個leader
     */
    private void becomeLeader() {
        this.lockSupplier(() -> {
            long becomeLeaderCostTime = TimeUtil.getTime() - this.beginElectTime;
            this.beginElectTime = 0L;

            logger.info("本節點 {} 在世代 {} 角色由 {} 變更為 {} 選舉耗時 {} ms,並開始向其他節點發送心跳包 ......", InetSocketAddressConfigHelper.getServerName(), this.generation, this.nodeRole.name(), NodeRole.Leader.name(),
                becomeLeaderCostTime);
            this.nodeRole = NodeRole.Leader;
            this.cancelAllTask();

            this.heartBeatTask();
            this.leaderServerName = InetSocketAddressConfigHelper.getServerName();
            return null;
        });
    }


四、執行我們的 raft!

看到這裡,不用懷疑.. 一個 raft 演算法已經實現了。至於一些細枝末節的東西,我相信大家都能處理好的.. 比如如何給其他節點發送各種包,包怎麼去定義之類的,都和 raft 本身沒什麼關係。

一般來說,在叢集可用後,我們就可以讓 Follower 連線 Leader 的業務埠,開始真正的業務了。 raft作為一個能快速選主的分散式演算法,一次選主基本只需要一次 RTT(Round-Trip Time)時間即可,非常迅速。

執行一下我們的專案,簡單測試,我們只用三臺機子,想測試多臺機子可以自己去玩玩...我們可以看到就像 zookeeper,我們需要配置兩個埠,前一個作為選舉埠,後一個則作為業務埠。

本文章只講了怎麼選舉,後面的埠可以無視,但是必填...

依次啟動 hanabi.1,hanabi.2,hanabi.3

很快,我們就能看到 hanabi.1 成為了世代28的 Leader,第一次選舉耗時久是因為啟動的時候有各種初始化 = =

此時,我們關閉 hanabi.1,因為叢集還有2臺機器,它們之間完全可以選出新的 Leader,我們關閉 hanabi.1 試試。觀察 hanabi.3,我們發現,很快,hanabi.3 就發現 Leader 已經掛掉,併發起了世代 29 的選舉。

在世代29中,僅存的 hanabi.2 拒絕為本節點投票,所以在 electionTimeout 到期後,hanabi.3 再次發起了選舉,此次選舉成功,因為 hanabi.2 還未到達 electionTimeout,所以還在世代 28,收到了世代 29 的拉票請求後,hanabi.2 節點將自己的票投給了 hanabi.3,hanabi.3 成功上位。

本專案github地址 : 基於raft演算法實現的分散式kv儲存框架 (專案實際上還有日誌寫入,日誌提交,日誌同步等功能,直接無視它.