1. 程式人生 > 其它 >Mit 6.824 Lab3 KV Raft實現

Mit 6.824 Lab3 KV Raft實現

paper地址:http://nil.csail.mit.edu/6.824/2021/labs/lab-kvraft.html

前言

建議在實現 Lab3 之前,結合 Lab2 的實現以及 Raft 論文進行實現 Lab3,即基於 Lab2 實現的 Raft 庫構建容錯鍵/值儲存服務。

開始

整體架構

  1. 首先,結合 Lab3 的 paper 中給出的架構圖進行理解本架構,下面會給出個人理解的通俗版本的圖,來幫助理解。

  2. 其次,如果讀過paper和Raft論文,應該會清楚一個要點:每個KVServer(raftServerId) 對應paper架構圖裡的 State Machine,也就是狀態機,而每個 KVServer 對應 Lab2 實現的 Raft peer,並且 KVServer 之間是藉助 Raft Service 來實現共識性,不直接互動的。

  3. 根據 paper 裡對 Lab3 要求的描述,可以清楚 KVServer 通過ClientId可以知道 Client 的請求來自具體哪個客戶端,同時儲存每個客戶端的請求資訊和狀態,所以每個客戶端請求過來時,都賦予了一個剛生成的唯一ID,並且同一個請求對應唯一的序列號(ClientId),這兩個 ID 就可以確定唯一性請求。這些在 client.go 和 server.go 就有具體程式碼和註釋說明。

  4. 客戶端的Id用 nrand() 隨機生成唯一ID,經過測試最多有7個客戶端ID且不會重複,每個 Client 維護一個 lastRequestId,通過mathrand(len(KVServer))生成,表示每一次請求的 Seq 序列號 clientId。

  5. KVServer 通過維護了 lastRequestId,使得 Client 併發呼叫時,能通過最新的 RequestId, 得到最新的結果,保證應用程式的強一致性,這個強一致性通過定時器實現一段時間內(500ms)的分散式資料強一致性。

請求和響應流程:

請求響應流程,以Put/Get為例子:

  1. KVServer 收到 Client 的Request請求後,通過raft.Start() 提交Op給raft 庫, 然後通過Chan機制,等待Raft 返回結果到 waitApplyCh, 也就是等待Raft應用日誌到狀態機後,才通過給chan緩衝區放入響應資料來響應給KVServer。
  2. 在Raft的所有peer 進行 apply當前請求的命令Op後, 每個Server會在單獨的執行緒ApplyLoop 中等待不斷到來的ApplyCh中的Op,直到 ApplyCh 緩衝區得到 Raft 的響應。
  3. Raft庫 執行這個Op(GET全部執行,重複的PUT, APPEND不執行)
  4. Leader 等待Apply Loop 完成,之後根據Op中的資訊將Raft庫中的執行結果返回給Wait Channel , 中才有Wait Channel 在等結果
  5. 最後Leader將執行結果封裝後返回給Client

具體請求流程

  1. go kv.ReadRaftApplyCommandLoop()這個Loop裡,監聽讀取KVServer的applyCh
  2. 通過在KVServer裡的 applyCh chan raft.ApplyMsg ,藉助管道chan的機制,實現寫阻塞,也就是實現了在請求響應過程中,節點 peers監聽一個chan 管道,管道接收到,才能觸發接下來的操作。
  3. 只有Leader負責接受client的請求,才能觸發以上條件,follower不主動觸發,只能等待被leader同步
  4. 接受到請求後,Leader判定條件是否準確,正確則交由Raft#Start(Op)方法,接下來就阻塞等待方法的回撥,等待結果返回。
  5. 根據需要設定 WaitChan 等待結果,同時設定Timeout,用來判定是否響應超時
  6. 之後Raft會將Leader的 Op 執行結果同步給所有Follower,ApplyEntry 同步到上層的每個KVServer
  7. 根據管道的返回時間是否超時,判定是否在強一致性的時間內能否得到響應
    1. 如果超時:
      1. 會先進行ifRequestDuplicate() 判斷RequestId是否過時,依然是最新的RequestId 則從Leader的狀態機中執行 Op 命令,返回本地日誌執行命令後得到的結果
    2. 不超時,表示在一致性的有效時間內,只需要判斷Raft響應的clientId和RequestId是否相同,即是否是最新的請求,是則表明KVServer的KVDB中有Op.key的最新資料,保證了資料的強一致性。
  8. 執行完Get或Append後,最後要刪除管道的raftIndex對應的Op

請求阻塞問題

通過KVServer中的time.After 實現阻塞超時、重發。

因為無論是waitChan 還是labrpc中的Call方法,都沒有“回撥超時”的概念,會阻塞在哪裡。

所以需要在Server端(或client端)實現計時器超時機制,避免無限阻塞。

重複請求問題

  • Lab3A核心功能之一就是處理請求重複問題,即Duplicate Request ,實現保證一個重複的請求不會在同一個狀態機上被執行兩次,每個請求對應唯一的ClientID:RequestID
  • 對於KVServer收到Client端的請求,無論是否重複,我們都提交給Raft作為它的log. 而KVServer 通過 kv.ifRequestDuplicate方法 負責在接受apply log時判定這個log代指的Request Op是不是重複的,是重複的,我們就不在狀態機上執行,直接返回OK即可,只需要在Put方法裡考慮該問題就行,讀操作不影響狀態機的資料。

路由-負載均衡問題

Clinet . servers[] 的序號和KVServers.me 不是一 一對應的,而是隨機shuffle過的。 但是KVServer.me 和 Raft.me 是對應的。 這就導致了Client 傳送請求到一個不是LEADER的KVServer, 這個KVServer可以拿到raft.leader的序號並傳回Client, 但Client 並不能通過client.servers[raft.leader]來找到真正的Leader, 還是要隨機訪問另一個。所以說我們收到ErrWrongLeader時候,只要再隨機訪問下一個KVServer即可

ck.RecentLeaderId = GetRandomServer(len(ck.Servers))
server := ck.RecentLeaderId
for{
   // RPC請求KVServer的Get方法, 成功則返回leaderId
    ok := ck.Servers[server].Call("KVServer.Get", &args, &reply)
    // 換下一個Server,重試,直到 OK or Error
    if !ok || reply.Err == ErrWrongLeader {
    // LeaderId
    server = (server + 1) % len(ck.Servers)
    continue
	...
}

快照

Snapshot 快照其實就是Server 維護的KeyValue資料庫,可以看作是記憶體中一個map

對於Leader:

// 迴圈讀取Raft已經應用的日誌條目命令得到的迴應
func (kv *KVServer) ReadRaftApplyCommandLoop() {
	for message := range kv.applyCh {
		if message.CommandValid {
			kv.GetCommandFromRaft(message)
		}
		if message.SnapshotValid {
			kv.GetSnapshotFromRaft(message)
		}
	}
}
  • 在leader應用日誌後,message.CommandValid 為true 說明RaftState[] 在遞增,即有日誌被應用並且命令是有效的。

  • 那麼Leader會執行響應的Get或Put操作完成後,根據日誌條目閾值maxraftstate和當前日誌條目數量RaftStateSize判斷是否需要命令Raft進行Snapshot快照壓縮操作。

  • 如果需要,則呼叫MakeSnapshot方法,將自身的KVDB,RequestID等資訊製作成snapshot, 並呼叫Raft庫的Snapshot介面。

  • Leader 安裝Snapshot , 這分為三部分,修剪log Entries [] , SnapShot 通過Persister進行持久化儲存, 之後在Appendentries中將本次的SnapShot資訊傳送給落後的Follower

  • 最後返回執行結果給WaitChannel

對於Follower :

·		if message.SnapshotValid {
			kv.GetSnapshotFromRaft(message)
		}
  • 1 . Leader執行 InstallSnapshot 的RPC方法後,Raft 層會獲取 snapshot資料, 裁剪log, 通過ApplyCh上報給Server (此時SnapshotValid: true)
  • 2 . Follower的Applyloop 收到請求,呼叫CondInstallSnapshot() 來詢問是否可以安裝snapshot

    // 從Raft中獲取快照日誌
    func (kv *KVServer) GetSnapshotFromRaft(message raft.ApplyMsg) {
    	kv.mu.Lock()
    	defer kv.mu.Unlock()
    	if kv.rf.CondInstallSnapshot(message.SnapshotTerm, message.SnapshotIndex, message.Snapshot) {
    		// 追加快照日誌
    		kv.ReadSnapshotToInstall(message.Snapshot)
    		kv.lastSSPointRaftLogIndex = message.SnapshotIndex
    	}
    }
    
  • 3 . CondInstallSnapshot() 判定snapshot安裝條件,持久化snapshot, 並通知Server可以InstallSnapshot

核心程式碼

KVServer資料結構
type KVServer struct {
   mu sync.Mutex
   me int
   // 每個KVServer對應一個Raft
   rf      *raft.Raft
   applyCh chan raft.ApplyMsg
   dead    int32 // set by Kill()

   // 快照日誌中,最後日誌條目的State
   maxraftstate int // snapshot if log grows this big
   // Your definitions here.
   // 儲存put的資料,key : value
   kvDB map[string]string
   // index(Raft pper) -> chan
   waitApplyCh map[int]chan Op
   // clientId : requestId
   lastRequestId map[int64]int

   // last Snapshot point & raftIndex
   lastSSPointRaftLogIndex int
}

啟動KVServer

// 啟動KVServer
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	DPrintf("[InitKVServer---]Server %d", me)
	// 註冊rpc伺服器
	labgob.Register(Op{})

	kv := new(KVServer)
	kv.me = me
	kv.maxraftstate = maxraftstate

	// You may need initialization code here.

	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	// You may need initialization code here.
	// kv初始化
	kv.kvDB = make(map[string]string)
	kv.waitApplyCh = make(map[int]chan Op)
	kv.lastRequestId = make(map[int64]int)

	// 快照
	snapshot := persister.ReadSnapshot()
	if len(snapshot) > 0 {
		// 讀取快照日誌
		kv.ReadSnapshotToInstall(snapshot)
	}
	// 迴圈讀取Raft已經應用的日誌條目命令
	go kv.ReadRaftApplyCommandLoop()
	return kv
}

Put和Get

// RPC方法
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	if kv.killed() {
		reply.Err = ErrWrongLeader
		return
	}

	_, ifLeader := kv.rf.GetState()
	// RaftServer必須是Leader
	if !ifLeader {
		reply.Err = ErrWrongLeader
		return
	}

	op := Op{
		Operation: "get",
		Key:       args.Key,
		Value:     "",
		ClientId:  args.ClientId,
		RequestId: args.RequestId,
	}

	// 向Raft server 傳送命令
	raftIndex, _, _ := kv.rf.Start(op)
	DPrintf("[GET StartToRaft]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)

	// waitForCh
	kv.mu.Lock()
	// chForRaftIndex為儲存Op的chan,raftIndex為Raft Server的LastLogIndex+1
	// 用於實現RPC呼叫Raft.Start時,儲存RPC返回的Op,通過 Raft Server 的 lastLogIndex獲取
	// 通過raft的lastLogIndex,就能得到該日誌條目儲存的value,並儲存到KVDB中
	chForRaftIndex, exist := kv.waitApplyCh[raftIndex]
	// Loop Apply ,技術上要求線性化
	// 不存在該記錄,表明呼叫還未返回結果,則繼續等待呼叫返回
	if !exist {
		kv.waitApplyCh[raftIndex] = make(chan Op, 1)
		chForRaftIndex = kv.waitApplyCh[raftIndex]
	}
	// RPC呼叫完成
	kv.mu.Unlock()

	// Timeout
	select {
	// 超過一致性要求的時間,則需要通過lastRequestId,從KVDB中獲取結果
	case <-time.After(time.Millisecond * CONSENSUS_TIMEOUT):
		DPrintf("[GET TIMEOUT!!!]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)
		_, ifLeader := kv.rf.GetState()

		// 該client的最新RequestId是否是newRequestId,不是,則返回最新RequestId
		// 該步驟保證了client併發呼叫KVServer時,根據最新的RequestId,得到最新的結果
		if kv.ifRequestDuplicate(op.ClientId, op.RequestId) && ifLeader {
			// 根據命令獲取該client最新RequestId得到並儲存在KVDB的value
			value, exist := kv.ExecuteGetOpOnKVDB(op)
			if exist {
				reply.Err = OK
				reply.Value = value
			} else {
				reply.Err = ErrNoKey
				reply.Value = ""
			}
		} else {
			reply.Err = ErrWrongLeader
		}

	// 在一致性的有效時間內:
	case raftCommitOp := <-chForRaftIndex:
		DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)
		// 該已提交到Raft的RPC請求,是本次的Op命令
		if raftCommitOp.ClientId == op.ClientId &&
			raftCommitOp.RequestId == op.RequestId {
			// 則從KVServer的Map直接獲取value
			value, exist := kv.ExecuteGetOpOnKVDB(op)
			if exist {
				reply.Err = OK
				reply.Value = value
			} else {
				reply.Err = ErrNoKey
				reply.Value = ""
			}
		} else {
			reply.Err = ErrWrongLeader
		}
	}

	kv.mu.Lock()
	// Get結束後,刪除chan map中raftIndex對應的Op
	delete(kv.waitApplyCh, raftIndex)
	kv.mu.Unlock()
	return
}

Put方法

// RPC方法
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	if kv.killed() {
		reply.Err = ErrWrongLeader
		return
	}

	_, ifLeader := kv.rf.GetState()
	// RaftServer必須是Leader
	if !ifLeader {
		reply.Err = ErrWrongLeader
		return
	}

	op := Op{
		Operation: args.Op,
		Key:       args.Key,
		Value:     args.Value,
		ClientId:  args.ClientId,
		RequestId: args.RequestId,
	}

	// 向Raft server 傳送命令
	raftIndex, _, _ := kv.rf.Start(op)
	DPrintf("[PUTAPPEND StartToRaft]From Client %d (Request %d) To Server %d, key %v, raftIndex %d", args.ClientId, args.RequestId, kv.me, op.Key, raftIndex)

	// waitForCh
	kv.mu.Lock()
	// chForRaftIndex為儲存Op的chan,raftIndex為Raft Server的LastLogIndex+1
	// 用於實現RPC呼叫Raft.Start時,儲存RPC返回的Op,通過 Raft Server 的 lastLogIndex獲取
	// 通過raft的lastLogIndex,就能得到該日誌條目儲存的value,並儲存到KVDB中
	chForRaftIndex, exist := kv.waitApplyCh[raftIndex]
	// Loop Apply ,技術上要求線性化
	// 不存在該記錄,表明呼叫還未返回結果,則繼續等待呼叫返回
	if !exist {
		kv.waitApplyCh[raftIndex] = make(chan Op, 1)
		chForRaftIndex = kv.waitApplyCh[raftIndex]
	}
	// RPC呼叫完成
	kv.mu.Unlock()

	// Timeout
	select {
	// 超過一致性要求的時間,則需要通過lastRequestId,從KVDB中獲取結果
	case <-time.After(time.Millisecond * CONSENSUS_TIMEOUT):
		DPrintf("[TIMEOUT PUTAPPEND !!!!]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)

		// 該client的最新RequestId是否是newRequestId,不是,則返回最新RequestId
		// 該步驟保證了client併發呼叫KVServer時,根據最新的RequestId,得到最新的結果
		if kv.ifRequestDuplicate(op.ClientId, op.RequestId) {
			reply.Err = OK
		} else {
			reply.Err = ErrWrongLeader
		}

	// 在一致性的有效時間內:
	case raftCommitOp := <-chForRaftIndex:
		DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)

		// 該已提交到Raft的RPC請求,是本次的Op命令
		if raftCommitOp.ClientId == op.ClientId &&
			raftCommitOp.RequestId == op.RequestId {
			reply.Err = OK
		} else {
			reply.Err = ErrWrongLeader
		}
	}

	kv.mu.Lock()
	delete(kv.waitApplyCh, raftIndex)
	kv.mu.Unlock()
	return
}