
MIT 6.824 (2020) Raft Lab 4A

Tags: 6.824

Preface

I have been working through the 2020 version of MIT 6.824 and have now finished Lab 4A, passing all of its tests. For the earlier Raft-based labs, see Lab 2A, Lab 2B, Lab 2C, Lab 3A and Lab 3B.

Lab 4A is about sharding the database: clients send requests to servers, and a server reads or updates data in different database shards depending on the request type. The focus of the lab is to implement the master's three operations on the DB server groups, Join/Leave/Move, as well as the Query operation that clients use to fetch configs from the server.

There are three points worth paying attention to in this lab:

  1. the load-balancing algorithm
  2. configs must be deep-copied, not shared by reference
  3. JoinArgs, LeaveArgs, MoveArgs and QueryArgs must be registered with labgob

1. Overview

1.1 Architecture diagram

The lab improves performance by sharding the database, and the master is responsible for assigning the shards. The rough architecture is shown below.
[architecture diagram]

1.2 Architecture details

First, the data is split into a fixed number of shards, 10, as the constant in the common file shows:

// The number of shards.
const NShards = 10

Second, the master itself runs on multiple servers, so it uses Raft to keep the shard configuration consistent. As the Config struct shows, there are three fields to maintain: Num is the config number, Shards is the shard assignment (initially [0,0,0,0,0,0,0,0,0,0], i.e. every shard unassigned), and Groups maps each group ID to the server information for that replica group, e.g. Group1 -> servers [a, b, c].

type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}

For example, with Shards initially [0,0,0,0,0,0,0,0,0,0], after Join(Group1 -> servers [a, b, c]) the system has one group and Shards becomes [1,1,1,1,1,1,1,1,1,1]. If Group2 -> servers [e, f, g] then joins, the system has two groups, and the 10 shards should be split as evenly as possible between them, e.g. [1,1,1,1,1,2,2,2,2,2].
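
To make the config numbering concrete, a hypothetical Query after these two Joins would return configs along these lines (server names abbreviated; the exact shard assignment depends on the balance algorithm):

// Config 0 is the initial empty configuration created by StartServer.
// Query(1), after Join(Group1 -> [a, b, c]):
//   Config{Num: 1, Shards: [10]int{1,1,1,1,1,1,1,1,1,1},
//          Groups: map[int][]string{1: {"a", "b", "c"}}}
// Query(2), after Join(Group2 -> [e, f, g]):
//   Config{Num: 2, Shards: [10]int{1,1,1,1,1,2,2,2,2,2},
//          Groups: map[int][]string{1: {"a", "b", "c"}, 2: {"e", "f", "g"}}}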

This is where the load-balancing algorithm comes in. My implementation follows an approach I found online and does not use any fancy data structure such as a heap.


2. Client

This is very similar to the kvraft client; most of it can be carried over as-is.

The Clerk struct and its constructor:

type Clerk struct {
	servers []*labrpc.ClientEnd

	mu         sync.Mutex
	leaderId   int
	clientId   int64
	sequenceId int64

	// Your data here.
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// Your code here.
	ck.clientId = nrand()
	return ck
}
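
nrand() is the random-client-ID helper from the lab's client skeleton; for reference it looks roughly like this (crypto/rand, so collisions between clients are practically impossible):

import (
	"crypto/rand"
	"math/big"
)

func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	return bigx.Int64()
}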

Some new fields are added to the RPC argument structs, mainly ClientId and SequenceId:

type JoinArgs struct {
	Servers    map[int][]string // new GID -> servers mappings
	ClientId   int64
	SequenceId int64
}

type LeaveArgs struct {
	GIDs       []int
	ClientId   int64
	SequenceId int64
}

type MoveArgs struct {
	Shard      int
	GID        int
	ClientId   int64
	SequenceId int64
}

type QueryArgs struct {
	Num        int // desired config number
	ClientId   int64
	SequenceId int64
}
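
For reference, the matching reply structs come from the lab skeleton's common.go and look roughly like this; only QueryReply carries a Config:

type JoinReply struct {
	WrongLeader bool
	Err         Err
}

type LeaveReply struct {
	WrongLeader bool
	Err         Err
}

type MoveReply struct {
	WrongLeader bool
	Err         Err
}

type QueryReply struct {
	WrongLeader bool
	Err         Err
	Config      Config
}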

The Join/Move/Leave/Query implementations; Join/Move/Leave are essentially identical:

func (ck *Clerk) Query(num int) Config {
	args := &QueryArgs{}
	// Your code here.
	args.Num = num
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)

	for {
		reply := QueryReply{}
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Query", args, &reply) && !reply.WrongLeader {
			return reply.Config
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}

func (ck *Clerk) Join(servers map[int][]string) {
	args := &JoinArgs{}
	args.Servers = servers
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)

	for {
		reply := JoinReply{}
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Join", args, &reply) && !reply.WrongLeader {
			return
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}
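
currentLeader() and changeLeader() are small helpers that are not shown above; a minimal sketch (assuming the leaderId field in the Clerk struct) just reads the cached leader and rotates to the next server after a failed call:

func (ck *Clerk) currentLeader() int {
	ck.mu.Lock()
	defer ck.mu.Unlock()
	return ck.leaderId
}

// changeLeader returns the next server to try; the caller stores it in ck.leaderId.
func (ck *Clerk) changeLeader() int {
	ck.mu.Lock()
	defer ck.mu.Unlock()
	return (ck.leaderId + 1) % len(ck.servers)
}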


3. Master server

3.1 Fields

This is also quite similar to the kvraft server. Two maps are needed: requestMapper (log index -> notification channel) and sequenceMapper (client ID -> last applied sequence ID).


type ShardMaster struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg

	// Your data here.
	dead    int32
	configs []Config // indexed by config num

	requestMapper  map[int]chan Op
	sequenceMapper map[int64]int64
}

type Op struct {
	// Your data here.
	SequenceId int64
	ClientId   int64
	OpType     string
	OpArgs     interface{}
	Index      int
	Term       int
}

type joinLeaveMoveReply struct {
	WrongLeader bool
	Err         Err
}

const (
	JOIN  string = "Join"
	LEAVE string = "Leave"
	MOVE  string = "Move"
	QUERY string = "Query"
)
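
The handlers below also log through DPrintf; if the shardmaster package does not already define it, the usual 6.824-style helper is:

import "log"

const Debug = 0

// DPrintf prints only when Debug is turned on.
func DPrintf(format string, a ...interface{}) (n int, err error) {
	if Debug > 0 {
		log.Printf(format, a...)
	}
	return
}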

3.2 Constructor

One very important point here: every custom struct that goes through Raft must be registered with labgob, otherwise decoding fails with an "interface{} is nil" error.

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster {
	sm := new(ShardMaster)
	sm.me = me

	sm.configs = make([]Config, 1)
	sm.configs[0].Groups = map[int][]string{}

	labgob.Register(Op{})
	labgob.Register(JoinArgs{})
	labgob.Register(LeaveArgs{})
	labgob.Register(MoveArgs{})
	labgob.Register(QueryArgs{})
	sm.applyCh = make(chan raft.ApplyMsg)
	sm.rf = raft.Make(servers, me, persister, sm.applyCh)

	// Your code here.
	sm.sequenceMapper = make(map[int64]int64)
	sm.requestMapper = make(map[int]chan Op)

	go sm.serverMonitor()

	return sm
}
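
The dead field and the sm.killed() check used later in serverMonitor assume the usual Kill/killed pair, the same pattern as in kvraft; a minimal sketch (using sync/atomic):

// Kill is called by the tester when a ShardMaster instance is no longer needed.
func (sm *ShardMaster) Kill() {
	atomic.StoreInt32(&sm.dead, 1)
	sm.rf.Kill()
}

func (sm *ShardMaster) killed() bool {
	return atomic.LoadInt32(&sm.dead) == 1
}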

3.3 Join/Move/Leave

Join/Move/Leave rely on Raft for consistency, and the three handlers are essentially the same:

  1. call rf.Start to submit the op to Raft
  2. wait for the applyMsg notification, compare the terms, and return success if they match
func (sm *ShardMaster) Join(args *JoinArgs, reply *JoinReply) {
	var isLeader bool
	clientOp := Op{OpType: JOIN, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Leave(args *LeaveArgs, reply *LeaveReply) {
	var isLeader bool
	clientOp := Op{OpType: LEAVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Move(args *MoveArgs, reply *MoveReply) {
	var isLeader bool
	clientOp := Op{OpType: MOVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) joinLeaveMove(clientOp Op) joinLeaveMoveReply {
	reply := joinLeaveMoveReply{}
	ch := sm.getChannel(clientOp.Index)
	defer func() {
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()

	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case op := <-ch:
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		if clientOp.Term != opTerm {
			reply.WrongLeader = true
		} else {
			reply.Err = OK
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
	return reply
}
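
getChannel is not shown above; a minimal sketch creates, and caches in requestMapper, one channel per log index. Making the channel buffered matters: serverMonitor sends on it unconditionally, so an unbuffered channel would block the apply loop whenever no handler is waiting (on followers, or after a handler has timed out):

func (sm *ShardMaster) getChannel(index int) chan Op {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	ch, ok := sm.requestMapper[index]
	if !ok {
		// Buffered so serverMonitor never blocks if nobody is listening.
		ch = make(chan Op, 1)
		sm.requestMapper[index] = ch
	}
	return ch
}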

3.4 Query

Query returns a bit more than the others, namely the Config. Per the requirements, if the requested number is -1 or larger than the largest known config number, the latest config is returned; otherwise the config with that number is returned.

func (sm *ShardMaster) Query(args *QueryArgs, reply *QueryReply) {
	// Your code here.
	DPrintf("[%v] Query is called", sm.me)
	var isLeader bool
	clientOp := Op{OpType: QUERY, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}

	DPrintf("Query [%v] leader is found", sm.me)

	ch := sm.getChannel(clientOp.Index)
	defer func() {
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()

	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case op := <-ch:
		DPrintf("[%v] QUERY receive op", sm.me)
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		DPrintf("[%v] QUERY clientOp.Term[%v] vs opTerm[%v]", sm.me, clientOp.Term, opTerm)
		if clientOp.Term != opTerm {
			reply.WrongLeader = true
		} else {
			DPrintf("[%v] QUERY args.Num=%v sm.config=%v", sm.me, args.Num, sm.configs)
			sm.mu.Lock()
			reply.Err = OK
			if args.Num >= 0 && args.Num < len(sm.configs) {
				reply.Config = sm.configs[args.Num]
			} else {
				reply.Config = sm.configs[len(sm.configs)-1]
			}
			sm.mu.Unlock()
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
}

3.5 serverMonitor

This goroutine watches Raft's applyCh. For Join and Leave, the assignment of shards to groups has to be rebalanced. addNewConfig is important here: it deep-copies the previous config so that the new config does not share the Groups map by reference (see the illustration after the code).

func (sm *ShardMaster) serverMonitor() {
	for {
		if sm.killed() {
			return
		}
		select {
		case msg := <-sm.applyCh:
			if msg.IsSnapshot || !msg.CommandValid {
				continue
			}

			index := msg.CommandIndex
			term := msg.CommandTerm
			op := msg.Command.(Op)
			sm.mu.Lock()
			sequenceInMapper, hasSequence := sm.sequenceMapper[op.ClientId]
			op.Term = term
			if !hasSequence || op.SequenceId > sequenceInMapper {
				switch op.OpType {
				case JOIN:
					newConfig := sm.addNewConfig()
					joinArgs := op.OpArgs.(JoinArgs)
					for i, servers := range joinArgs.Servers {
						newConfig.Groups[i] = servers
						sm.balanceLoad(&newConfig, i, JOIN)
					}
					sm.configs = append(sm.configs, newConfig)
				case LEAVE:
					newConfig := sm.addNewConfig()
					leaveArgs := op.OpArgs.(LeaveArgs)
					for _, gid := range leaveArgs.GIDs {
						delete(newConfig.Groups, gid)
						sm.balanceLoad(&newConfig, gid, LEAVE)
					}
					sm.configs = append(sm.configs, newConfig)
				case MOVE:
					newConfig := sm.addNewConfig()
					moveArgs := op.OpArgs.(MoveArgs)
					// Only apply the move if the target group exists. Do not
					// return here: returning would kill serverMonitor while
					// still holding sm.mu.
					if _, isExists := newConfig.Groups[moveArgs.GID]; isExists {
						newConfig.Shards[moveArgs.Shard] = moveArgs.GID
						sm.configs = append(sm.configs, newConfig)
					}
				}
				sm.sequenceMapper[op.ClientId] = op.SequenceId
			}
			sm.mu.Unlock()
			sm.getChannel(index) <- op
		}
	}
}

func (sm *ShardMaster) addNewConfig() Config {
	lastConfig := sm.configs[len(sm.configs)-1]
	nextConfig := Config{Num: lastConfig.Num + 1, Shards: lastConfig.Shards, Groups: make(map[int][]string)}
	for gid, servers := range lastConfig.Groups {
		nextConfig.Groups[gid] = append([]string{}, servers...)
	}
	return nextConfig
}
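
A minimal illustration of why the deep copy of Groups matters: assigning one Config to another copies the Shards array, but both values still point at the same underlying Groups map (hypothetical demo function, not part of the lab code):

func demoGroupsAliasing() {
	old := Config{Num: 1, Groups: map[int][]string{1: {"a", "b", "c"}}}
	next := old // next.Groups aliases old.Groups
	next.Groups[2] = []string{"e", "f", "g"}
	// old.Groups now also contains gid 2, so the previous config is corrupted.
	// addNewConfig avoids this by allocating a fresh map and copying every entry.
}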

3.6 Load balance

For Join, shards are taken from the currently most-loaded group and reassigned to the new group ID.
For example, starting from [1,1,1,1,1,2,2,2,2,2] with group 3 joining, the reassignment proceeds as
[1,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,3,2,2,2,2] ->
[3,3,1,1,1,3,2,2,2,2]

Leave is the reverse: the leaving group's shards are handed to the least-loaded remaining groups. For example, starting from [3,3,1,1,1,3,2,2,2,2] and removing group 3, the process is
[3,3,1,1,1,3,2,2,2,2] ->
[1,3,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,2,2,2,2,2]

func (sm *ShardMaster) balanceLoad(c *Config, gid int, request string) {
	shardsMap := groupByGid(c)
	switch request {
	case JOIN:
		totalGroups := len(c.Groups)
		newShardNum := NShards / totalGroups
		for i := 0; i < newShardNum; i++ {
			maxGid := getMaxShardGid(shardsMap)
			c.Shards[shardsMap[maxGid][0]] = gid
			shardsMap[maxGid] = shardsMap[maxGid][1:]
		}
	case LEAVE:
		shardsList, isExists := shardsMap[gid]
		if !isExists {
			return
		}
		delete(shardsMap, gid)
		if len(c.Groups) == 0 {
			c.Shards = [NShards]int{}
			return
		}
		for _, value := range shardsList {
			minGid := getMinShardGid(shardsMap)
			c.Shards[value] = minGid
			shardsMap[minGid] = append(shardsMap[minGid], value)
		}
	}
}

Helper functions: groupByGid groups the shards by gid, producing map[gid] -> []shard. For example, [3,3,1,1,1,3,2,2,2,2] becomes
map[1] -> [2,3,4]
map[2] -> [6,7,8,9]
map[3] -> [0,1,5]

func groupByGid(c *Config) map[int][]int {
	shardsMap := map[int][]int{}
	for k := range c.Groups {
		shardsMap[k] = []int{}
	}
	for index, gid := range c.Shards {
		shardsMap[gid] = append(shardsMap[gid], index)
	}

	return shardsMap
}

func getMaxShardGid(shardsMap map[int][]int) int {
	max := -1
	gid := -1
	for index, shards := range shardsMap {
		if max < len(shards) {
			max = len(shards)
			gid = index
		}
	}
	return gid
}

func getMinShardGid(shardsMap map[int][]int) int {
	min := NShards
	gid := -1
	for index, shards := range shardsMap {
		if min > len(shards) {
			min = len(shards)
			gid = index
		}
	}
	return gid
}
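
A quick sanity check of these helpers on the example above (hypothetical snippet; ties in getMaxShardGid/getMinShardGid are broken by Go's random map iteration order):

func exampleGroupByGid() {
	c := &Config{
		Shards: [NShards]int{3, 3, 1, 1, 1, 3, 2, 2, 2, 2},
		Groups: map[int][]string{1: {"a"}, 2: {"e"}, 3: {"x"}},
	}
	m := groupByGid(c)
	// m == map[1:[2 3 4] 2:[6 7 8 9] 3:[0 1 5]]
	maxGid := getMaxShardGid(m) // 2, since it owns 4 shards
	minGid := getMinShardGid(m) // 1 or 3, both own 3 shards
	_, _ = maxGid, minGid
}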

4. Summary

The hard part of this lab is the load-balancing algorithm. Using a heap is worth considering, since it should give the lowest complexity; a rough sketch of that variant follows.
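
A minimal sketch (not the implementation above) of what the heap-based variant could look like: keep (gid, shard count) pairs in a container/heap so the least-loaded group can be popped in O(log n), with gid as a deterministic tie-breaker:

import "container/heap"

type groupLoad struct {
	gid    int
	shards int
}

// loadHeap is a min-heap ordered by shard count, then gid.
type loadHeap []groupLoad

func (h loadHeap) Len() int      { return len(h) }
func (h loadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h loadHeap) Less(i, j int) bool {
	if h[i].shards != h[j].shards {
		return h[i].shards < h[j].shards
	}
	return h[i].gid < h[j].gid
}
func (h *loadHeap) Push(x interface{}) { *h = append(*h, x.(groupLoad)) }
func (h *loadHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// popLeastLoaded removes and returns the gid with the fewest shards.
// Call heap.Init(h) once after building the slice.
func popLeastLoaded(h *loadHeap) int {
	return heap.Pop(h).(groupLoad).gid
}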