經典分散式演算法 —— 淺顯易懂的 Raft 演算法實現
一、Raft概念
copy一下其他小夥伴寫的文章: Raft演算法詳解
不同於Paxos演算法直接從分散式一致性問題出發推匯出來,Raft演算法則是從多副本狀態機的角度提出,用於管理多副本狀態機的日誌複製。Raft實現了和Paxos相同的功能,它將一致性分解為多個子問題:Leader選舉(Leader election)、日誌同步(Log replication)、安全性(Safety)、日誌壓縮(Log compaction)、成員變更(Membership change)等。同時,Raft演算法使用了更強的假設來減少了需要考慮的狀態,使之變的易於理解和實現。
Raft將系統中的角色分為領導者(Leader)、跟從者(Follower)和候選人(Candidate):
- Leader:接受客戶端請求,並向Follower同步請求日誌,當日志同步到大多數節點上後告訴Follower提交日誌。
- Follower:接受並持久化Leader同步的日誌,在Leader告之日誌可以提交之後,提交日誌。
- Candidate:Leader選舉過程中的臨時角色。
本文不過多贅述 raft 演算法是個什麼東西... 這裡再貼一個十分好理解的文章:The Raft Consensus Algorithm
二、系統初步設計
在對raft有一定理解後,我們簡單梳理一下在raft選舉過程中,我們需要的一些角色,以及角色的司職。
首先我們需要一個選舉控制類,單例實現即可,節點的選舉全權交給此選舉控制類的實現,我們稱其為 ElectOperator。
先講一個 raft 中重要的概念:世代,也稱為 epoch,但在這篇文章,將其稱為 generation(不要糾結這個 = =)。 世代可以認為是一個標記當前傳送的操作是否有效的標識,如果收到了小於本節點世代的請求,則可無視其內容,如果收到了大於本世代的請求,則需要更新本節點世代,並重置自己的身份,變為 Follower,類似於樂觀鎖的設計理念。
我們知道,raft中一共有三種角色:Follower、Candidate、Leader
(1)Follower
Follower 需要做什麼呢:
- 接收心跳
- Follower 在 VOTES_BACK_OFF_MS 時間內,若沒有收到來自 Leader的心跳,則轉變為 Candidate
- 接收拉票請求,並返回自己的投票
好的,Follower非常簡單,只需要做三件事即可。
(2)Candidate
Candidate 扮演什麼樣的職能呢:
- 接收心跳
- Candidate 在 VOTES_BACK_OFF_MS 時間內,若沒有收到來自 Leader的心跳,則轉變為 Candidate
- 接收拉票請求,並返回自己的投票
- 向叢集中的其他節點發起拉票請求
- 當收到的投票大於半數( n/2 + 1, n為叢集內的節點數量),轉變為 Leader
Candidate 比起 Follower 稍微複雜一些,但前三件事情都是一樣的。
(3)Leader
Leader 在選舉過程中扮演的角色最為簡單:
- 接收心跳
- 向叢集內所有節點發送心跳
Leader 也是可以接收心跳的,當收到大於當前世代的心跳或請求後,Leader 需要轉變為 Follower。Leader 不可能收到同世代的心跳請求,因為 (1) 在 raft 演算法中,同一世代中,節點僅對同一個節點進行投票。(2) 需要收到過半投票才可以轉變為 Leader。
三、系統初步實現
簡單貼一下選舉控制器需要的一些屬性程式碼,下面的註釋都說的很清楚了,其中需要補充的一點是定時任務使用了時間輪來實現,不理解沒有關係...就是個定時任務,定時任務的一個引用放在 Map<TaskEnum, TimedTask> taskMap; 中,便於取消任務。
public class ElectOperator extends ReentrantLocker implements Runnable {
// 成為 Candidate 的退避時間(真實退避時間需要 randomized to be between 150ms and 300ms )
private static final long ELECTION_TIMEOUT_MS = ElectConfigHelper.getElectionTimeoutMs();
// 心跳間隔
private static final long HEART_BEAT_MS = ElectConfigHelper.getHeartBeatMs();
/**
* 該投票箱的世代資訊,如果一直進行選舉,一直能達到 {@link #ELECTION_TIMEOUT_MS},而選不出 Leader ,也需要15年,generation才會不夠用,如果
* generation 的初始值設定為 Long.Min (現在是0,則可以撐30年,所以完全呆膠布)
*/
private long generation;
/**
* 當前節點的角色
*/
private NodeRole nodeRole;
/**
* 所有正在跑的定時任務
*/
private Map<TaskEnum, TimedTask> taskMap;
/**
* 投票箱
*/
private Map<String/* serverName */, Boolean> box;
/**
* 投票給了誰的投票記錄
*/
private Votes voteRecord;
/**
* 快取一份叢集資訊,因為叢集資訊是可能變化的,我們要保證在一次選舉中,叢集資訊是不變的
*/
private List<HanabiNode> clusters;
/**
* 心跳內容
*/
private HeartBeat heartBeat;
/**
* 現在叢集的leader是哪個節點
*/
private String leaderServerName;
private volatile static ElectOperator INSTANCE;
public static ElectOperator getInstance() {
if (INSTANCE == null) {
synchronized (ElectOperator.class) {
if (INSTANCE == null) {
INSTANCE = new ElectOperator();
ElectControllerPool.execute(INSTANCE);
}
}
}
return INSTANCE;
}
另外,上面羅列的這些值大都是需要在更新世代時重置的,我們先擬定一下更新世代的邏輯,通用的來講,就是清除投票記錄,清除自己的投票箱,更新自己的世代,身份變更為 Follower 等等,我們將這個方法稱為 init。
/**
* 初始化
*
* 1、成為follower
* 2、先取消所有的定時任務
* 3、重置本地變數
* 4、新增成為Candidate的定時任務
*/
private boolean init(long generation, String reason) {
return this.lockSupplier(() -> {
if (generation > this.generation) {// 如果有選票的世代已經大於當前世代,那麼重置投票箱
logger.debug("初始化投票箱,原因:{}", reason);
// 1、成為follower
this.becomeFollower();
// 2、先取消所有的定時任務
this.cancelAllTask();
// 3、重置本地變數
logger.debug("更新世代:舊世代 {} => 新世代 {}", this.generation, generation);
this.generation = generation;
this.voteRecord = null;
this.box = new HashMap<>();
this.leaderServerName = null;
// 4、新增成為Candidate的定時任務
this.becomeCandidateAndBeginElectTask(this.generation);
return true;
} else {
return false;
}
});
}
(1) Follower的實現
基於上面的分析,我們可以歸納一下 Follower 需要一些什麼樣的方法:
1、轉變為 Candidate 的定時任務
實際上就是 randomized to be between 150ms and 300ms 後,如果沒收到 Leader 的心跳,或者自己變為 Candidate 後,在這個時間內沒有成功上位,則繼續轉變為 Candidate。
為什麼我們成為 Candidate 的退避時間需要隨機 150ms - 300ms呢?這是為了避免所有節點的選舉發起發生碰撞,如果說都是相同的退避時間,每個節點又會優先投自己一票,那麼這個集群系統就會陷入無限發起投票,但又無法成為 Leader 的局面。
簡而言之就是我們需要提供一個可重新整理的定時任務,如果在一定時間內沒重新整理這個任務,則節點轉變為 Candidate,併發起選舉,程式碼如下。首先取消之前的 becomeCandidate 定時定時任務,然後設定在 electionTimeout 後呼叫 beginElect(generation) 方法。
/**
* 成為候選者的任務,(重複呼叫則會取消之前的任務,收到來自leader的心跳包,就可以重置一下這個任務)
*
* 沒加鎖,因為這個任務需要頻繁被呼叫,只要收到leader來的訊息就可以呼叫一下
*/
private void becomeCandidateAndBeginElectTask(long generation) {
this.lockSupplier(() -> {
this.cancelCandidateAndBeginElectTask("正在重置發起下一輪選舉的退避時間");
// The election timeout is randomized to be between 150ms and 300ms.
long electionTimeout = ELECTION_TIMEOUT_MS + (int) (ELECTION_TIMEOUT_MS * RANDOM.nextFloat());
TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));
Timer.getInstance()
.addTask(timedTask);
taskMap.put(TaskEnum.BECOME_CANDIDATE, timedTask);
return null;
});
}
2、接收心跳與心跳回復
接收心跳十分簡單,如果當前心跳大於等於當前世代,且還未認定某個節點為 Leader,則取消所有定時任務,成為Follower,並記錄心跳包中 Leader 節點的資訊,最後重置一下成為候選者的任務。
如果已經成為某個 Leader 的 Follower,則直接成為候選者的任務即可。
另外一個要注意的是,needToSendHeartBeatInfection,是否需要傳送心跳感染包,當收到低世代 Leader 的心跳時,如果當前叢集已經選出 Leader ,則回覆此心跳包,告訴舊 Leader,現在已經是新世代了!(程式碼中沒有展現,其實就是再次封裝一個心跳包,帶上世代資訊和 Leader 節點資訊,回覆給 Leader 即可)
public void receiveHeatBeat(String leaderServerName, long generation, String msg) {
return this.lockSupplier(() -> {
boolean needToSendHeartBeatInfection = true;
// 世代大於當前世代
if (generation >= this.generation) {
needToSendHeartBeatInfection = false;
if (this.leaderServerName == null) {
logger.info("叢集中,節點 {} 已經成功在世代 {} 上位成為 Leader,本節點將成為 Follower,直到與 Leader 的網路通訊出現問題", leaderServerName, generation);
// 取消所有任務
this.cancelAllTask();
// 成為follower
this.becomeFollower();
// 將那個節點設為leader節點
this.leaderServerName = leaderServerName;
}
// 重置成為候選者任務
this.becomeCandidateAndBeginElectTask(this.generation);
}
return null;
});
}
3、接收拉票請求與回覆投票
我們知道,raft 在一個世代只能投票給一個節點,且發起投票者會首先投票給自己。所以邏輯就很簡單了,只有當世代大於等於當前,且還未投票時,則拉票請求成功,返回true即可,否則都視為失敗,返回false。
/**
* 某個節點來請求本節點給他投票了,只有當世代大於當前世代,才有投票一說,其他情況都是失敗的
*
* 返回結果
*
* 為true代表接受投票成功。
* 為false代表已經給其他節點投過票了,
*/
public VotesResponse receiveVotes(Votes votes) {
return this.lockSupplier(() -> {
logger.debug("收到節點 {} 的投票請求,其世代為 {}", votes.getServerName(), votes.getGeneration());
String cause = "";
if (votes.getGeneration() < this.generation) {
cause = String.format("投票請求 %s 世代小於當前世代 %s", votes.getGeneration(), this.generation);
} else if (this.voteRecord != null) {
cause = String.format("在世代 %s,本節點已投票給 => %s 節點", this.generation, this.voteRecord.getServerName());
} else {
this.voteRecord = votes; // 代表投票成功了
}
boolean result = votes.equals(this.voteRecord);
if (result) {
logger.debug("投票記錄更新成功:在世代 {},本節點投票給 => {} 節點", this.generation, this.voteRecord.getServerName());
} else {
logger.debug("投票記錄更新失敗:原因:{}", cause);
}
String serverName = InetSocketAddressConfigHelper.getServerName();
return new VotesResponse(this.generation, serverName, result, serverName.equals(this.leaderServerName), votes.getGeneration());
});
}
(2) Candidate的實現
可以看出 Follower 十分簡單, Candidate 在 Follower 的基礎上增加了發起選舉的拉票請求,與接收投票,並上位成為Leader兩個功能,實際上也十分簡單。
1、發起拉票請求
回顧一下前面的轉變成 Candidate 的定時任務,定時任務實際上就是呼叫一個方法
TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));
這個 beginElect 就是轉變為 Candidate 併發起選舉的實現。讓我們先想想需要做什麼,首先肯定是
- 更新一下自己的世代,因為已經長時間沒收到 Leader 的心跳包了,我們需要自立門戶。
- 給自己投一票
- 要求其他節點給自己投票
分析到這裡就很明瞭了。下面首先執行 updateGeneration 方法,實際上就是執行前面所說的 init 方法,傳入 generation + 1 的世代,重置一下上個世代各種儲存的狀態;然後呼叫 becomeCandidate,實際上就是切換一下身份,將 Follower 或者 Candidate 切換為 Candidate;給自己的 voteRecord 投一票,最後帶上自己的節點標識和世代資訊,去拉票。
/**
* 開始進行選舉
*
* 1、首先更新一下世代資訊,重置投票箱和投票記錄
* 2、成為候選者
* 3、給自己投一票
* 4、請求其他節點,要求其他節點給自己投票
*/
private void beginElect(long generation) {
this.lockSupplier(() -> {
if (this.generation != generation) {// 存在這麼一種情況,雖然取消了選舉任務,但是選舉任務還是被執行了,所以這裡要多做一重處理,避免上個週期的任務被執行
return null;
}
logger.info("Election Timeout 到期,可能期間內未收到來自 Leader 的心跳包或上一輪選舉沒有在期間內選出 Leader,故本節點即將發起選舉");
updateGeneration("本節點發起了選舉");// this.generation ++
// 成為候選者
logger.info("本節點正式開始世代 {} 的選舉", this.generation);
if (this.becomeCandidate()) {
VotesResponse votes = new VotesResponse(this.generation, InetSocketAddressConfigHelper.getServerName(), true, false, this.generation);
// 給自己投票箱投票
this.receiveVotesResponse(votes);
// 記錄一下,自己給自己投了票
this.voteRecord = votes;
// 讓其他節點給自己投一票
this.askForVoteTask(new Votes(this.generation, InetSocketAddressConfigHelper.getServerName()), 0);
}
return null;
});
}
2、接收投票,併成為 Leader
如果說在 150ms and 300ms 之間,本節點收到了過半投票,則可上位成 Leader,否則定時任務會再次呼叫 beginElect,再次更新本節點世代,然後發起新一輪選舉。
接收投票其實十分簡單,回憶一下前面接收拉票請求與回覆投票,實際上就是拉票成功,就返回true,否則返回flase。
我們每次都判斷一下是否拿到過半的票數,如果拿到,則成為 Leader,另外有一個值得注意的是,為了加快叢集恢復可用的程序,類似於心跳感染(如果心跳發到Leader那裡去了,Leader會告訴本節點,它才是真正的Leader),投票也存在投票感染,下面的程式碼由 votesResponse.isFromLeaderNode() 來表示。
投票的記錄也是十分簡單,就是把每個投票記錄扔到 Map<String/* serverName */, Boolean> box; 裡,true 表示同意投給本節點,flase 則不同意,如果同意達到半數以上,則呼叫 becomeLeader 成為本世代 Leader。
/**
* 給當前節點的投票箱投票
*/
public void receiveVotesResponse(VotesResponse votesResponse) {
this.lockSupplier(() -> {
if (votesResponse.isFromLeaderNode()) {
logger.info("來自節點 {} 的投票應答表明其身份為 Leader,本輪拉票結束。", votesResponse.getServerName());
this.receiveHeatBeat(votesResponse.getServerName(), votesResponse.getGeneration(),
String.format("收到來自 Leader 節點的投票應答,自動將其視為來自 Leader %s 世代 %s 節點的心跳包", heartBeat.getServerName(), votesResponse.getGeneration()));
}
if (this.generation > votesResponse.getAskVoteGeneration()) {// 如果選票的世代小於當前世代,投票無效
logger.info("來自節點 {} 的投票應答世代是以前世代 {} 的選票,選票無效", votesResponse.getServerName(), votesResponse.getAskVoteGeneration());
return null;
}
if (votesResponse.isAgreed()) {
if (!voteSelf) {
logger.info("來自節點 {} 的投票應答有效,投票箱 + 1", votesResponse.getServerName());
}
// 記錄一下投票結果
box.put(votesResponse.getServerName(), votesResponse.isAgreed());
List<HanabiNode> hanabiNodeList = this.clusters;
int clusterSize = hanabiNodeList.size();
int votesNeed = clusterSize / 2 + 1;
long voteCount = box.values()
.stream()
.filter(aBoolean -> aBoolean)
.count();
logger.info("叢集中共 {} 個節點,本節點當前投票箱進度 {}/{}", hanabiNodeList.size(), voteCount, votesNeed);
// 如果獲得的選票已經大於了叢集數量的一半以上,則成為leader
if (voteCount == votesNeed) {
logger.info("選票過半,準備上位成為 leader 節點", votesResponse.getServerName());
this.becomeLeader();
}
} else {
logger.info("節點 {} 在世代 {} 的投票應答為:拒絕給本節點在世代 {} 的選舉投票(當前世代 {})", votesResponse.getServerName(), votesResponse.getGeneration(), votesResponse.getAskVoteGeneration(), this.generation);
// 記錄一下投票結果
box.put(votesResponse.getServerName(), votesResponse.isAgreed());
}
return null;
});
}
(3) Leader 的實現
作為 Leader,在 raft 中的實現卻是最簡單的,我們只需要給子節點發心跳包即可。然後如果收到大於自己世代的心跳感染,則成為新世代的 Follower,接收心跳的邏輯和 Follower 沒有區別。
/**
* 當選票大於一半以上時呼叫這個方法,如何去成為一個leader
*/
private void becomeLeader() {
this.lockSupplier(() -> {
long becomeLeaderCostTime = TimeUtil.getTime() - this.beginElectTime;
this.beginElectTime = 0L;
logger.info("本節點 {} 在世代 {} 角色由 {} 變更為 {} 選舉耗時 {} ms,並開始向其他節點發送心跳包 ......", InetSocketAddressConfigHelper.getServerName(), this.generation, this.nodeRole.name(), NodeRole.Leader.name(),
becomeLeaderCostTime);
this.nodeRole = NodeRole.Leader;
this.cancelAllTask();
this.heartBeatTask();
this.leaderServerName = InetSocketAddressConfigHelper.getServerName();
return null;
});
}
四、執行我們的 raft!
看到這裡,不用懷疑.. 一個 raft 演算法已經實現了。至於一些細枝末節的東西,我相信大家都能處理好的.. 比如如何給其他節點發送各種包,包怎麼去定義之類的,都和 raft 本身沒什麼關係。
一般來說,在叢集可用後,我們就可以讓 Follower 連線 Leader 的業務埠,開始真正的業務了。 raft作為一個能快速選主的分散式演算法,一次選主基本只需要一次 RTT(Round-Trip Time)時間即可,非常迅速。
執行一下我們的專案,簡單測試,我們只用三臺機子,想測試多臺機子可以自己去玩玩...我們可以看到就像 zookeeper,我們需要配置兩個埠,前一個作為選舉埠,後一個則作為業務埠。
本文章只講了怎麼選舉,後面的埠可以無視,但是必填...
依次啟動 hanabi.1,hanabi.2,hanabi.3
很快,我們就能看到 hanabi.1 成為了世代28的 Leader,第一次選舉耗時久是因為啟動的時候有各種初始化 = =
此時,我們關閉 hanabi.1,因為叢集還有2臺機器,它們之間完全可以選出新的 Leader,我們關閉 hanabi.1 試試。觀察 hanabi.3,我們發現,很快,hanabi.3 就發現 Leader 已經掛掉,併發起了世代 29 的選舉。
在世代29中,僅存的 hanabi.2 拒絕為本節點投票,所以在 electionTimeout 到期後,hanabi.3 再次發起了選舉,此次選舉成功,因為 hanabi.2 還未到達 electionTimeout,所以還在世代 28,收到了世代 29 的拉票請求後,hanabi.2 節點將自己的票投給了 hanabi.3,hanabi.3 成功上位。
本專案github地址 : 基於raft演算法實現的分散式kv儲存框架 (專案實際上還有日誌寫入,日誌提交,日誌同步等功能,直接無視它.