1. 程式人生 > 其它 >MIT 6.824 Llab2B Raft之日誌複製

MIT 6.824 Llab2B Raft之日誌複製

書接上文Raft Part A | MIT 6.824 Lab2A Leader Election

實驗準備

  1. 實驗程式碼:git://g.csail.mit.edu/6.824-golabs-2021/src/raft
  2. 如何測試:go test -run 2B -race
  3. 相關論文:Raft Extended Section 5.3 Section 5.4.1
  4. 實驗指導:6.824 Lab 2: Raft (mit.edu)

實驗目標

在Leader Election的基礎上,完成Leader和Follower的關於Log Replication的程式碼。

一些提示

  1. 從實現Start()開始,接著完成AppendEntries RPC用於傳送和接收日誌。
  2. 需要實現Election Restriction,即論文的5.4.1節。
  3. 注意在Leader存活期間,不要有節點發起選舉。
  4. 不要讓一些迴圈不間斷的執行,否則會因為速度過慢而無法通過測試,使用條件變數或休眠來解決。
  5. 讓程式碼清晰乾淨,方便後面的實驗。
  6. 如果未能通過測試,檢視test_test.go和config.go。

複製狀態機

一條日誌的內容可能是這樣子:SET A 0,它的涵義是儲存一個名字為'A'的資料,他的值為'0'。一個日誌只是儲存了一個大小為7位元組的字串,資料真正被儲存到狀態機中。你可以把狀態機理解為資料庫。

客戶端將指令SET A 0傳送到Leader,Leader會將指令追加到自己的日誌序列中,Leader中未複製到其他節點的日誌,會在隨著下一次心跳複製到其他節點,被複制到半數以上節點的日誌將被標記為Commit

狀態,並由另一個goroutine負責提交到狀態機中,被提交到狀態機的日誌被標記為Applied狀態。

commitIndex & lastApplied

指令是不斷追加到日誌序列中的,日誌的同步要保證順序性,狀態機應該由遠及近的執行命令,才能保證節點之間的狀態一致。

因此如果log[i]被標記為Commit或Applied,那麼log[0] ~ log[i-1]也一定是Commit或Applied狀態。論文中使用commitIndex表示最後一個狀態為Commit的日誌索引,使用lastApplied表示最後一個狀態為Applied的日誌索引。

並且由於只有Applied只能由Commit轉換而來,所以lastApplied <= commitIndex

一定成立。

nextIndex

日誌從Leader複製到Followers,有如下三種方式。

  1. Leader每增加一個日誌,就傳送給Followers。
  2. Leader隨著每次心跳,將自己所有的日誌都發送給Followers。
  3. Leader隨著每次心跳,僅傳送未複製到Followers的部分日誌。

為了效率,選擇方式3是顯然的,那麼Leader如何知道每個Follower都需要哪部分日誌呢?

通過各個節點的日誌序列可以得出,commitIndex = 7,因為1 ~ 7號日誌已經同步到半數以上的節點。

因為在日誌同步過程中保證順序性,Follower相較於Leader缺失的是“後半部分”日誌,Leader使用nextIndex[]陣列標記每個Follower缺失日誌的開始位置(包括Leader自己)。本例中,nextIndex[] = {9, 6, 9, 3, 8}。

在本例中,Leader要向第一個Follower同步的部分日誌就是從nextIndex[1]到最後,即log[6],log[7],log[8]。

logEntry

你可能注意到上圖中日誌不止有指令,事實上論文中的日誌有兩個欄位,分別是Term和Command。

type LogEntry struct {
    Term    int
    Command interface{}
}

Command用於儲存如SET A 0這樣的指令,那麼為什麼要在日誌中加入Term呢?

假設Leader收到了客戶端請求,並追加到了自己的日誌序列中,還未等待同步到其他節點,就掛掉了,此時剩餘節點選出了新的Leader,繼續正常工作,狀態如下。

Log Old Leader New Leader Follower
1 'SET A 0' 'SET A 0' 'SET A 0'
2 'SET B 1' 'SET C 2' 'SET C 2'
3 OFFLINE 'SET A 1' 'SET A 1'

注意:為了方便處理邊界,論文中日誌序列索引從1開始。

若此時Old Leader恢復了,感知到New Leader後會轉會為Follower,此時nextIndex[0] = 3,New Leader會將log[3:]同步到Old Leader中。

Log Follower New Leader Follower
1 'SET A 0' 'SET A 0' 'SET A 0'
2 'SET B 1' 'SET C 2' 'SET C 2'
3 'SET A 1' 'SET A 1' 'SET A 1'

可以看到,現在叢集已經出現了不一致的情況,看樣子logEntry應該儲存更多的資訊,用來保證一致性。

那麼為什麼是Term呢?回想上文提到的日誌的時序性和Leader的唯一性,叢集在某一個Term中,Leader是唯一的,在給定Term的前提下,能夠確保在該任期內,叢集狀態是一致的。

Log Follower New Leader Follower
1 [T1] 'SET A 0' [T1] 'SET A 0' [T1] 'SET A 0'
2 [T1] 'SET B 1' [T2] 'SET C 2' [T2] 'SET C 2'
3 [T2] 'SET A 1' [T2] 'SET A 1' [T2] 'SET A 1'

觀察在logEntry中引入Term後的狀態,不難發現導致不一致的觀點就在Term上。

prevLogIndex & prevLogTerm

如何利用Term,在日誌同步過程中發現並糾正不一致的日誌呢?下圖是論文中給出的比較極端的不一致的情況。

目前nextIndex[] = {11, 10, 5, 12, 13, 8, 12}。

nextIndex[a] = 10,Leader需要同步日誌序列log[10:],並且要保證a的log[1:10]和自己的log[1:10]保持一致,關鍵點在於log[9],即log[nextIndex[a]-1]。

首先如果要Leader.log[1:10]和a.log[1:10]是一致能,那麼Leader.log[9]和a.Log[9]也一定是一致的,根據上一小節的結論,若Leader.log[9].Term == a.log[9].Term,則Leader.log[9].Command == a.log[9].Command一定存在。

Raft演算法規定,當Leader.log[N-1].Term == Follower.log[N-1].Term時,就可以把Leader.log[N]同步到Follower。

注意,這其實是一個動態規劃的思想。

if L.log[N-1].Term == F.log[N-1].Term {
    F.log[N] = L.log[N]
}

由此可得出日誌匹配原則:如果兩個日誌在相同索引的位置的Term相同,那麼從開始到該索引位置的日誌一定相同。

注意N和Term是由Leader.nextIndex和Leader.log得來,而上面的程式碼邏輯是在Follower中,Follower如何得知呢,論文中在AppendEntriesArgs中引入prevLogIndex和prevLogTerm,分別表示N-1和L.log[N-1].Term。

因此在AppendEntries中有如下邏輯。

/* Reply false if log doesn't contain an entry at pervLogIndex whose term matches pervLogTerm. */
if len(rf.log) <= args.PrevLogIndex || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
    return false
}

Follower拒絕後,Leader如何處理呢?Follower錯誤和缺失的日誌部分依舊是需要修正和同步的。一個簡單的辦法是讓nextIndex遞減,直到找到Leader和Follower日誌序列中一致的位置。

因此在heartbeat(此處Part A中的heartbeat名字或許改為sync更恰當)中有如下邏輯。

/* If AppendEntries fails because of log inconsistency: decrement nextIndex and retry */
if !reply.Success {
    rf.nextIndex[i] = max(1, rf.nextIndex[i]-1)
}

注意log索引最小為1的問題,如果你覺得這種方式效率太低,可以嘗試在AppendEntriesReply中新增更多的欄位提升效率。

matchIndex

commitIndex是由Leader根據日誌同步情況確定的,同步到半數以上節點的日誌會被標記為commitIndex,論文在Leader中引入了matchIndex[]陣列用於輔助計算commitIndex,在日誌同步成功時,會同步更新nextIndex和matchIndex,兩者的關係為matchIndex = nextIndex - 1。matchIndex是個相對不是那麼重要的概念,你也可以用其它方式計算commitIndex,這裡使用排序取中位數的方式。

因此在heartbeat(此處Part A中的heartbeat名字或許改為sync更恰當)有如下邏輯。

if reply.Success {
    /* If successful: update nextIndex and matchIndex for follower. */
    rf.nextIndex[i] += len(args.Entries)
    rf.matchIndex[i] = rf.nextIndex[i] - 1
    
    match := make([]int, len(rf.peers))
    copy(match, rf.matchIndex)
    sort.Ints(match)
    majority := match[(len(rf.peers)-1)/2]
    /* If there exists an N such that N > commitIndex, a majority of matchIndex[i] >= N, and log[N].Term == currentTerm: set commitIndex = N */
    if majority > rf.commitIndex && rf.log[majority].Term == rf.currentTerm {
        rf.commitIndex = majority
    }
}

commitIndex是不只是Leader的概念,叢集中所有節點的commitIndex應當保持一致。論文在AppendEntriesArgs中引入了leaderCommit欄位,用於Follower更新commitIndex。

因此在AppendEntries中有如下邏輯。

/* If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) */
if args.LeaderCommit > rf.commitIndex {
    rf.commitIndex = min(args.LeaderCommit, len(rf.log)-1)
}

如何理解rf.log[majority].Term == rf.currentTerm

圖中黑色框選表示Leader,注意(c),此時S1 Leader,Term為2,Index為2的日誌在半數以上節點中存在,但是S1不能將該日誌標記為commit狀態。因為如果此時Leader轉為S5,該日誌將會被覆蓋。

所以在標記commit狀態時需要判斷Term是否和currentTerm相等,只有當前Term的日誌才可以標記為commit狀態。也就是(e)的狀態。

lastLogTerm & lastLogIndex

讓我們再次觀察上文中提過的圖片,不知道你在之前是否發現了一些問題。

當Leader掛掉後,會從Followers中選舉新的Leader,在Part A中,每個Follower的選票會投給第一個向自己索要選票的Candidate,也就是說每個Follower都有可能被選舉為新的Leader。

假設第三個Follower,即日誌序列長度為2的Follower競選成功,那麼自log[3]開始向後的所有日誌都會在同步中丟失。但是已經標記為commit狀態的日誌是不應該丟失的。

Raft演算法規定,投票者只會將選票投給日誌和自己至少一樣新的成員。這樣保證了commit狀態的日誌不會丟失。即論文5.4.1節的選舉限制。

論文中在RequestVoteArgs中引入了lastLogIndex和lastLogTerm兩個欄位用於輔助實現選舉限制,分別表示候選人最後一個日誌的索引和任期。

因此在RequestVote中有如下邏輯。

/* If votedFor is null or candidateId, and candidate's log is at least as up-to-date as receiver's log, grant vote. */
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
    if args.LastLogTerm > rf.log[len(rf.log)-1].Term || args.LastLogIndex >= len(rf.log)-1 && rf.log[len(rf.log)-1].Term == args.LastLogTerm {
        rf.votedFor = args.CandidateId
        reply.VoteGranted = true
    }
}

實驗總結

我用綠色的線框選了Part B中需要實現的內容,和本文程式碼中的註釋是一致的,你可以通過關鍵字搜尋定位到相應的程式碼。

除了本文中給出的程式碼外,想要通過測試還需要實現Start()和Apply()兩個函式,分別用於客戶端傳送指令和提交日誌到狀態機,這兩個函式並不難實現所以本文就不給出具體程式碼了。

最後,為了證明我不是在亂寫,附上我的測試結果。