Mit 6.824 Lab3 KV Raft實現
paper地址:http://nil.csail.mit.edu/6.824/2021/labs/lab-kvraft.html
前言
建議在實現 Lab3 之前,結合 Lab2 的實現以及 Raft 論文進行實現 Lab3,即基於 Lab2 實現的 Raft 庫構建容錯鍵/值儲存服務。
開始
整體架構
-
首先,結合 Lab3 的 paper 中給出的架構圖進行理解本架構,下面會給出個人理解的通俗版本的圖,來幫助理解。
-
其次,如果讀過paper和Raft論文,應該會清楚一個要點:每個KVServer(raftServerId) 對應paper架構圖裡的 State Machine,也就是狀態機,而每個 KVServer 對應 Lab2 實現的 Raft peer,並且 KVServer 之間是藉助 Raft Service 來實現共識性,不直接互動的。
-
根據 paper 裡對 Lab3 要求的描述,可以清楚 KVServer 通過ClientId可以知道 Client 的請求來自具體哪個客戶端,同時儲存每個客戶端的請求資訊和狀態,所以每個客戶端請求過來時,都賦予了一個剛生成的唯一ID,並且同一個請求對應唯一的序列號(ClientId),這兩個 ID 就可以確定唯一性請求。這些在 client.go 和 server.go 就有具體程式碼和註釋說明。
-
客戶端的Id用 nrand() 隨機生成唯一ID,經過測試最多有7個客戶端ID且不會重複,每個 Client 維護一個 lastRequestId,通過mathrand(len(KVServer))生成,表示每一次請求的 Seq 序列號 clientId。
-
KVServer 通過維護了 lastRequestId,使得 Client 併發呼叫時,能通過最新的 RequestId, 得到最新的結果,保證應用程式的強一致性,這個強一致性通過定時器實現一段時間內(500ms)的分散式資料強一致性。
請求和響應流程:
請求響應流程,以Put/Get為例子:
- KVServer 收到 Client 的Request請求後,通過raft.Start() 提交Op給raft 庫, 然後通過Chan機制,等待Raft 返回結果到 waitApplyCh, 也就是等待Raft應用日誌到狀態機後,才通過給chan緩衝區放入響應資料來響應給KVServer。
- 在Raft的所有peer 進行 apply當前請求的命令Op後, 每個Server會在單獨的執行緒ApplyLoop 中等待不斷到來的ApplyCh中的Op,直到 ApplyCh 緩衝區得到 Raft 的響應。
- Raft庫 執行這個Op(GET全部執行,重複的PUT, APPEND不執行)
- Leader 等待Apply Loop 完成,之後根據Op中的資訊將Raft庫中的執行結果返回給Wait Channel , 中才有Wait Channel 在等結果
- 最後Leader將執行結果封裝後返回給Client
具體請求流程
go kv.ReadRaftApplyCommandLoop()
這個Loop裡,監聽讀取KVServer的applyCh- 通過在KVServer裡的
applyCh chan raft.ApplyMsg
,藉助管道chan的機制,實現寫阻塞,也就是實現了在請求響應過程中,節點 peers監聽一個chan 管道,管道接收到,才能觸發接下來的操作。 - 只有Leader負責接受client的請求,才能觸發以上條件,follower不主動觸發,只能等待被leader同步
- 接受到請求後,Leader判定條件是否準確,正確則交由Raft#Start(Op)方法,接下來就阻塞等待方法的回撥,等待結果返回。
- 根據需要設定 WaitChan 等待結果,同時設定Timeout,用來判定是否響應超時
- 之後Raft會將Leader的 Op 執行結果同步給所有Follower,ApplyEntry 同步到上層的每個KVServer
- 根據管道的返回時間是否超時,判定是否在強一致性的時間內能否得到響應
- 如果超時:
- 會先進行ifRequestDuplicate() 判斷RequestId是否過時,依然是最新的RequestId 則從Leader的狀態機中執行 Op 命令,返回本地日誌執行命令後得到的結果
- 不超時,表示在一致性的有效時間內,只需要判斷Raft響應的clientId和RequestId是否相同,即是否是最新的請求,是則表明KVServer的KVDB中有Op.key的最新資料,保證了資料的強一致性。
- 如果超時:
- 執行完Get或Append後,最後要刪除管道的raftIndex對應的Op
請求阻塞問題
通過KVServer中的time.After
實現阻塞超時、重發。
因為無論是waitChan 還是labrpc中的Call方法,都沒有“回撥超時”的概念,會阻塞在哪裡。
所以需要在Server端(或client端)實現計時器超時機制,避免無限阻塞。
重複請求問題
- Lab3A核心功能之一就是處理請求重複問題,即Duplicate Request ,實現保證一個重複的請求不會在同一個狀態機上被執行兩次,每個請求對應唯一的ClientID:RequestID
- 對於KVServer收到Client端的請求,無論是否重複,我們都提交給Raft作為它的log. 而KVServer 通過
kv.ifRequestDuplicate
方法 負責在接受apply log時判定這個log代指的Request Op是不是重複的,是重複的,我們就不在狀態機上執行,直接返回OK即可,只需要在Put方法裡考慮該問題就行,讀操作不影響狀態機的資料。
路由-負載均衡問題
Clinet . servers[] 的序號和KVServers.me 不是一 一對應的,而是隨機shuffle過的。 但是KVServer.me 和 Raft.me 是對應的。 這就導致了Client 傳送請求到一個不是LEADER的KVServer, 這個KVServer可以拿到raft.leader的序號並傳回Client, 但Client 並不能通過client.servers[raft.leader]來找到真正的Leader, 還是要隨機訪問另一個。所以說我們收到ErrWrongLeader時候,只要再隨機訪問下一個KVServer即可
ck.RecentLeaderId = GetRandomServer(len(ck.Servers))
server := ck.RecentLeaderId
for{
// RPC請求KVServer的Get方法, 成功則返回leaderId
ok := ck.Servers[server].Call("KVServer.Get", &args, &reply)
// 換下一個Server,重試,直到 OK or Error
if !ok || reply.Err == ErrWrongLeader {
// LeaderId
server = (server + 1) % len(ck.Servers)
continue
...
}
快照
Snapshot 快照其實就是Server 維護的KeyValue資料庫,可以看作是記憶體中一個map
對於Leader:
// 迴圈讀取Raft已經應用的日誌條目命令得到的迴應
func (kv *KVServer) ReadRaftApplyCommandLoop() {
for message := range kv.applyCh {
if message.CommandValid {
kv.GetCommandFromRaft(message)
}
if message.SnapshotValid {
kv.GetSnapshotFromRaft(message)
}
}
}
-
在leader應用日誌後,message.CommandValid 為true 說明RaftState[] 在遞增,即有日誌被應用並且命令是有效的。
-
那麼Leader會執行響應的Get或Put操作完成後,根據日誌條目閾值maxraftstate和當前日誌條目數量RaftStateSize判斷是否需要命令Raft進行Snapshot快照壓縮操作。
-
如果需要,則呼叫MakeSnapshot方法,將自身的KVDB,RequestID等資訊製作成snapshot, 並呼叫Raft庫的Snapshot介面。
-
Leader 安裝Snapshot , 這分為三部分,修剪log Entries [] , SnapShot 通過Persister進行持久化儲存, 之後在Appendentries中將本次的SnapShot資訊傳送給落後的Follower
-
最後返回執行結果給WaitChannel
對於Follower :
· if message.SnapshotValid {
kv.GetSnapshotFromRaft(message)
}
- 1 . Leader執行 InstallSnapshot 的RPC方法後,Raft 層會獲取 snapshot資料, 裁剪log, 通過ApplyCh上報給Server (此時SnapshotValid: true)
-
2 . Follower的Applyloop 收到請求,呼叫CondInstallSnapshot() 來詢問是否可以安裝snapshot
// 從Raft中獲取快照日誌 func (kv *KVServer) GetSnapshotFromRaft(message raft.ApplyMsg) { kv.mu.Lock() defer kv.mu.Unlock() if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) { // 追加快照日誌 kv.ReadSnapshotToInstall(message.Snapshot) kv.lastSSPointRaftLogIndex = message.SnapshotIndex } }
-
3 . CondInstallSnapshot() 判定snapshot安裝條件,持久化snapshot, 並通知Server可以InstallSnapshot
核心程式碼
KVServer資料結構
type KVServer struct {
mu sync.Mutex
me int
// 每個KVServer對應一個Raft
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
// 快照日誌中,最後日誌條目的State
maxraftstate int // snapshot if log grows this big
// Your definitions here.
// 儲存put的資料,key : value
kvDB map[string]string
// index(Raft pper) -> chan
waitApplyCh map[int]chan Op
// clientId : requestId
lastRequestId map[int64]int
// last Snapshot point & raftIndex
lastSSPointRaftLogIndex int
}
啟動KVServer
// 啟動KVServer
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
DPrintf("[InitKVServer---]Server %d", me)
// 註冊rpc伺服器
labgob.Register(Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
// You may need initialization code here.
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
// You may need initialization code here.
// kv初始化
kv.kvDB = make(map[string]string)
kv.waitApplyCh = make(map[int]chan Op)
kv.lastRequestId = make(map[int64]int)
// 快照
snapshot := persister.ReadSnapshot()
if len(snapshot) > 0 {
// 讀取快照日誌
kv.ReadSnapshotToInstall(snapshot)
}
// 迴圈讀取Raft已經應用的日誌條目命令
go kv.ReadRaftApplyCommandLoop()
return kv
}
Put和Get
// RPC方法
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
// Your code here.
if kv.killed() {
reply.Err = ErrWrongLeader
return
}
_, ifLeader := kv.rf.GetState()
// RaftServer必須是Leader
if !ifLeader {
reply.Err = ErrWrongLeader
return
}
op := Op{
Operation: "get",
Key: args.Key,
Value: "",
ClientId: args.ClientId,
RequestId: args.RequestId,
}
// 向Raft server 傳送命令
raftIndex, _, _ := kv.rf.Start(op)
DPrintf("[GET StartToRaft]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)
// waitForCh
kv.mu.Lock()
// chForRaftIndex為儲存Op的chan,raftIndex為Raft Server的LastLogIndex+1
// 用於實現RPC呼叫Raft.Start時,儲存RPC返回的Op,通過 Raft Server 的 lastLogIndex獲取
// 通過raft的lastLogIndex,就能得到該日誌條目儲存的value,並儲存到KVDB中
chForRaftIndex, exist := kv.waitApplyCh[raftIndex]
// Loop Apply ,技術上要求線性化
// 不存在該記錄,表明呼叫還未返回結果,則繼續等待呼叫返回
if !exist {
kv.waitApplyCh[raftIndex] = make(chan Op, 1)
chForRaftIndex = kv.waitApplyCh[raftIndex]
}
// RPC呼叫完成
kv.mu.Unlock()
// Timeout
select {
// 超過一致性要求的時間,則需要通過lastRequestId,從KVDB中獲取結果
case <-time.After(time.Millisecond * CONSENSUS_TIMEOUT):
DPrintf("[GET TIMEOUT!!!]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)
_, ifLeader := kv.rf.GetState()
// 該client的最新RequestId是否是newRequestId,不是,則返回最新RequestId
// 該步驟保證了client併發呼叫KVServer時,根據最新的RequestId,得到最新的結果
if kv.ifRequestDuplicate(op.ClientId, op.RequestId) && ifLeader {
// 根據命令獲取該client最新RequestId得到並儲存在KVDB的value
value, exist := kv.ExecuteGetOpOnKVDB(op)
if exist {
reply.Err = OK
reply.Value = value
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
} else {
reply.Err = ErrWrongLeader
}
// 在一致性的有效時間內:
case raftCommitOp := <-chForRaftIndex:
DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)
// 該已提交到Raft的RPC請求,是本次的Op命令
if raftCommitOp.ClientId == op.ClientId &&
raftCommitOp.RequestId == op.RequestId {
// 則從KVServer的Map直接獲取value
value, exist := kv.ExecuteGetOpOnKVDB(op)
if exist {
reply.Err = OK
reply.Value = value
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
} else {
reply.Err = ErrWrongLeader
}
}
kv.mu.Lock()
// Get結束後,刪除chan map中raftIndex對應的Op
delete(kv.waitApplyCh, raftIndex)
kv.mu.Unlock()
return
}
Put方法
// RPC方法
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
if kv.killed() {
reply.Err = ErrWrongLeader
return
}
_, ifLeader := kv.rf.GetState()
// RaftServer必須是Leader
if !ifLeader {
reply.Err = ErrWrongLeader
return
}
op := Op{
Operation: args.Op,
Key: args.Key,
Value: args.Value,
ClientId: args.ClientId,
RequestId: args.RequestId,
}
// 向Raft server 傳送命令
raftIndex, _, _ := kv.rf.Start(op)
DPrintf("[PUTAPPEND StartToRaft]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)
// waitForCh
kv.mu.Lock()
// chForRaftIndex為儲存Op的chan,raftIndex為Raft Server的LastLogIndex+1
// 用於實現RPC呼叫Raft.Start時,儲存RPC返回的Op,通過 Raft Server 的 lastLogIndex獲取
// 通過raft的lastLogIndex,就能得到該日誌條目儲存的value,並儲存到KVDB中
chForRaftIndex, exist := kv.waitApplyCh[raftIndex]
// Loop Apply ,技術上要求線性化
// 不存在該記錄,表明呼叫還未返回結果,則繼續等待呼叫返回
if !exist {
kv.waitApplyCh[raftIndex] = make(chan Op, 1)
chForRaftIndex = kv.waitApplyCh[raftIndex]
}
// RPC呼叫完成
kv.mu.Unlock()
// Timeout
select {
// 超過一致性要求的時間,則需要通過lastRequestId,從KVDB中獲取結果
case <-time.After(time.Millisecond * CONSENSUS_TIMEOUT):
DPrintf("[TIMEOUT PUTAPPEND !!!!]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)
// 該client的最新RequestId是否是newRequestId,不是,則返回最新RequestId
// 該步驟保證了client併發呼叫KVServer時,根據最新的RequestId,得到最新的結果
if kv.ifRequestDuplicate(op.ClientId, op.RequestId) {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
// 在一致性的有效時間內:
case raftCommitOp := <-chForRaftIndex:
DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)
// 該已提交到Raft的RPC請求,是本次的Op命令
if raftCommitOp.ClientId == op.ClientId &&
raftCommitOp.RequestId == op.RequestId {
reply.Err = OK
} else {
reply.Err = ErrWrongLeader
}
}
kv.mu.Lock()
delete(kv.waitApplyCh, raftIndex)
kv.mu.Unlock()
return
}