MIT 6.824 (2020): Raft Lab 4A
Tags: 6.824
Preface
I'm working through MIT 6.824 (2020) and have finished Lab 4A, passing all the tests. For the earlier Raft labs, see Lab 2A, Lab 2B, Lab 2C, Lab 3A, and Lab 3B.
Lab 4A is about sharding the database: clients send requests to the server, and the server routes each request to the right shard to read or modify data. The core of the lab is implementing the master's three operations on replica groups, Join/Leave/Move, plus the client's Query operation for fetching configs.
Three points in this lab need special attention:
- the load-balancing algorithm
- configs must be deep-copied, not shared by reference
- JoinArgs, LeaveArgs, MoveArgs, and QueryArgs must be registered with labgob
1. Overview
1.1 Architecture
The lab improves performance by sharding the database, with the master responsible for assigning shards; the architecture looks roughly like this.
1.2 Architecture details
First, there are a fixed NShards = 10 shards, so at most 10 groups can own a shard at any time; this is defined in the common file:
// The number of shards.
const NShards = 10
Second, the master itself is replicated across multiple servers, so Raft is needed to keep the shard configuration consistent. As the Config struct shows, there are three fields to maintain: Num is the config number, Shards is the shard assignment (initially [0,0,0,0,0,0,0,0,0,0]), and Groups maps each group id to that group's Raft servers, e.g. Group 1 -> servers [a, b, c].
type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}
For example, Shards starts as [0,0,0,0,0,0,0,0,0,0]. After a Join of Group 1 -> servers [a, b, c], the system has one group and Shards becomes [1,1,1,1,1,1,1,1,1,1]. After another Join of Group 2 -> servers [e, f, g], the system has two groups, and the 10 shards must be split as evenly as possible between them: [1,1,1,1,1,2,2,2,2,2].
This is where the load-balancing algorithm comes in. My implementation follows an approach I found online; it doesn't use any fancy data structure such as a heap.
2. Client
This is very similar to the client in kvraft; you can mostly carry the code over.
The Clerk struct and its constructor:
type Clerk struct {
	servers    []*labrpc.ClientEnd
	mu         sync.Mutex
	leaderId   int
	clientId   int64
	sequenceId int64
	// Your data here.
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// Your code here.
	ck.clientId = nrand()
	return ck
}
The args structs gain some new fields, chiefly ClientId and SequenceId:
type JoinArgs struct {
	Servers    map[int][]string // new GID -> servers mappings
	ClientId   int64
	SequenceId int64
}

type LeaveArgs struct {
	GIDs       []int
	ClientId   int64
	SequenceId int64
}

type MoveArgs struct {
	Shard      int
	GID        int
	ClientId   int64
	SequenceId int64
}

type QueryArgs struct {
	Num        int // desired config number
	ClientId   int64
	SequenceId int64
}
Now the Join/Move/Leave/Query implementations; Join, Move, and Leave are essentially identical:
func (ck *Clerk) Query(num int) Config {
	args := &QueryArgs{}
	// Your code here.
	args.Num = num
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)
	for {
		reply := QueryReply{}
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Query", args, &reply) && !reply.WrongLeader {
			return reply.Config
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}

func (ck *Clerk) Join(servers map[int][]string) {
	args := &JoinArgs{}
	args.Servers = servers
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)
	for {
		reply := JoinReply{}
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Join", args, &reply) && !reply.WrongLeader {
			return
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}
3. Master server
3.1 Fields
This again closely mirrors the kvraft server: we need two maps, requestMapper and sequenceMapper.
type ShardMaster struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	// Your data here.
	dead           int32
	configs        []Config // indexed by config num
	requestMapper  map[int]chan Op
	sequenceMapper map[int64]int64
}

type Op struct {
	// Your data here.
	SequenceId int64
	ClientId   int64
	OpType     string
	OpArgs     interface{}
	Index      int
	Term       int
}

type joinLeaveMoveReply struct {
	WrongLeader bool
	Err         Err
}

const (
	JOIN  string = "Join"
	LEAVE string = "Leave"
	MOVE  string = "Move"
	QUERY string = "Query"
)
3.2 Constructor
One crucial point here: every custom struct must be registered with labgob, otherwise decoding will fail with an "interface{} is nil"-style error.
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster {
	sm := new(ShardMaster)
	sm.me = me
	sm.configs = make([]Config, 1)
	sm.configs[0].Groups = map[int][]string{}
	labgob.Register(Op{})
	labgob.Register(JoinArgs{})
	labgob.Register(LeaveArgs{})
	labgob.Register(MoveArgs{})
	labgob.Register(QueryArgs{})
	sm.applyCh = make(chan raft.ApplyMsg)
	sm.rf = raft.Make(servers, me, persister, sm.applyCh)
	// Your code here.
	sm.sequenceMapper = make(map[int64]int64)
	sm.requestMapper = make(map[int]chan Op)
	go sm.serverMonitor()
	return sm
}
3.3 Join/Move/Leave
Join/Move/Leave go through Raft for consistency, and their implementations are essentially the same:
- call rf.Start to hand the op to Raft
- wait for the applyMsg notification, compare terms, and return success if they match
func (sm *ShardMaster) Join(args *JoinArgs, reply *JoinReply) {
	var isLeader bool
	clientOp := Op{OpType: JOIN, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Leave(args *LeaveArgs, reply *LeaveReply) {
	var isLeader bool
	clientOp := Op{OpType: LEAVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Move(args *MoveArgs, reply *MoveReply) {
	var isLeader bool
	clientOp := Op{OpType: MOVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) joinLeaveMove(clientOp Op) joinLeaveMoveReply {
	reply := joinLeaveMoveReply{}
	ch := sm.getChannel(clientOp.Index)
	defer func() {
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()
	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case op := <-ch:
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		if clientOp.Term != opTerm {
			reply.WrongLeader = true
		} else {
			reply.Err = OK
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
	return reply
}
3.4 Query
Query returns a bit more: the config itself. Per the spec, if the requested num is -1 (or otherwise out of range, i.e. at least len(configs)), return the latest config; otherwise return the config with that num.
func (sm *ShardMaster) Query(args *QueryArgs, reply *QueryReply) {
	// Your code here.
	DPrintf("[%v] Query is called", sm.me)
	var isLeader bool
	clientOp := Op{OpType: QUERY, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	DPrintf("Query [%v] leader is found", sm.me)
	ch := sm.getChannel(clientOp.Index)
	defer func() {
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()
	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case op := <-ch:
		DPrintf("[%v] QUERY receive op", sm.me)
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		DPrintf("[%v] QUERY clientOp.Term[%v] vs opTerm[%v]", sm.me, clientOp.Term, opTerm)
		if clientOp.Term != opTerm {
			reply.WrongLeader = true
		} else {
			sm.mu.Lock()
			// Read sm.configs only while holding the lock.
			DPrintf("[%v] QUERY args.Num=%v sm.config=%v", sm.me, args.Num, sm.configs)
			reply.Err = OK
			if args.Num >= 0 && args.Num < len(sm.configs) {
				reply.Config = sm.configs[args.Num]
			} else {
				reply.Config = sm.configs[len(sm.configs)-1]
			}
			sm.mu.Unlock()
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
}
3.5 serverMonitor
This function watches Raft's applyCh. For Join and Leave, the shard-to-group assignment must be rebalanced. addNewConfig is important: it deep-copies the previous config so that map aliasing doesn't corrupt earlier configs.
func (sm *ShardMaster) serverMonitor() {
	for {
		if sm.killed() {
			return
		}
		select {
		case msg := <-sm.applyCh:
			if msg.IsSnapshot || !msg.CommandValid {
				continue
			}
			index := msg.CommandIndex
			term := msg.CommandTerm
			op := msg.Command.(Op)
			sm.mu.Lock()
			sequenceInMapper, hasSequence := sm.sequenceMapper[op.ClientId]
			op.Term = term
			if !hasSequence || op.SequenceId > sequenceInMapper {
				switch op.OpType {
				case JOIN:
					newConfig := sm.addNewConfig()
					joinArgs := op.OpArgs.(JoinArgs)
					for i, servers := range joinArgs.Servers {
						newConfig.Groups[i] = servers
						sm.balanceLoad(&newConfig, i, JOIN)
					}
					sm.configs = append(sm.configs, newConfig)
				case LEAVE:
					newConfig := sm.addNewConfig()
					leaveArgs := op.OpArgs.(LeaveArgs)
					for _, gid := range leaveArgs.GIDs {
						delete(newConfig.Groups, gid)
						sm.balanceLoad(&newConfig, gid, LEAVE)
					}
					sm.configs = append(sm.configs, newConfig)
				case MOVE:
					newConfig := sm.addNewConfig()
					moveArgs := op.OpArgs.(MoveArgs)
					// Only apply the move if the target group exists.
					// Returning here instead would exit serverMonitor
					// entirely while still holding sm.mu.
					if _, isExists := newConfig.Groups[moveArgs.GID]; isExists {
						newConfig.Shards[moveArgs.Shard] = moveArgs.GID
						sm.configs = append(sm.configs, newConfig)
					}
				}
				sm.sequenceMapper[op.ClientId] = op.SequenceId
			}
			sm.mu.Unlock()
			sm.getChannel(index) <- op
		}
	}
}
func (sm *ShardMaster) addNewConfig() Config {
	lastConfig := sm.configs[len(sm.configs)-1]
	nextConfig := Config{Num: lastConfig.Num + 1, Shards: lastConfig.Shards, Groups: make(map[int][]string)}
	for gid, servers := range lastConfig.Groups {
		nextConfig.Groups[gid] = append([]string{}, servers...)
	}
	return nextConfig
}
3.6 Load balance
For Join, shards are repeatedly taken from the most-loaded group and handed to the new group.
For example, starting from [1,1,1,1,1,2,2,2,2,2], when group 3 joins, the handover proceeds step by step:
[1,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,3,2,2,2,2] ->
[3,3,1,1,1,3,2,2,2,2]
Leave is the reverse. For example, starting from [3,3,1,1,1,3,2,2,2,2], removing group 3 proceeds as:
[3,3,1,1,1,3,2,2,2,2] ->
[1,3,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,2,2,2,2,2]
func (sm *ShardMaster) balanceLoad(c *Config, gid int, request string) {
	shardsMap := groupByGid(c)
	switch request {
	case JOIN:
		totalGroups := len(c.Groups)
		newShardNum := NShards / totalGroups
		for i := 0; i < newShardNum; i++ {
			maxGid := getMaxShardGid(shardsMap)
			c.Shards[shardsMap[maxGid][0]] = gid
			shardsMap[maxGid] = shardsMap[maxGid][1:]
		}
	case LEAVE:
		shardsList, isExists := shardsMap[gid]
		if !isExists {
			return
		}
		delete(shardsMap, gid)
		if len(c.Groups) == 0 {
			c.Shards = [NShards]int{}
			return
		}
		for _, value := range shardsList {
			minGid := getMinShardGid(shardsMap)
			c.Shards[value] = minGid
			shardsMap[minGid] = append(shardsMap[minGid], value)
		}
	}
}
Helper functions: groupByGid groups the shards by gid, producing map[gid] -> []shard.
For example, [3,3,1,1,1,3,2,2,2,2] becomes
map[1] -> [2,3,4]
map[2] -> [6,7,8,9]
map[3] -> [0,1,5]
func groupByGid(c *Config) map[int][]int {
	shardsMap := map[int][]int{}
	for k := range c.Groups {
		shardsMap[k] = []int{}
	}
	for index, gid := range c.Shards {
		shardsMap[gid] = append(shardsMap[gid], index)
	}
	return shardsMap
}

func getMaxShardGid(shardsMap map[int][]int) int {
	max := -1
	gid := -1
	for index, shards := range shardsMap {
		if max < len(shards) {
			max = len(shards)
			gid = index
		}
	}
	return gid
}

func getMinShardGid(shardsMap map[int][]int) int {
	min := NShards
	gid := -1
	for index, shards := range shardsMap {
		if min > len(shards) {
			min = len(shards)
			gid = index
		}
	}
	return gid
}
4. Summary
The hard part of this lab is the load-balancing algorithm. I'd recommend a heap, which should give the lowest complexity, though the simple linear scans above are enough to pass the tests.