1. 程式人生 > >MIT6.824 Lab 3: Fault-tolerant Key/Value Service (1)

MIT6.824 Lab 3: Fault-tolerant Key/Value Service (1)

Introduction
  在這次實驗中,我們將使用Lab2的Raft庫來實現容錯的key-value儲存服務。
  儲存系統將由客戶端和key/value伺服器,每個key/value伺服器使用Raft節點。客戶端傳送Put(), Append()和Get()的RPCs 到key/value伺服器(kvraft),然後將這些RPC呼叫存入Raft的log中並按序執行。客戶端可以向任何kvraft伺服器傳送RPC請求,但是如果該伺服器不是Raft領導者或者請求失敗超時,則需要重新發送給另外一個。假如操作被committed到Raft的log中並應用到狀態機中,那麼它的結果需要反饋到客戶端。假如commit操作失敗(比如領導者被替換),客戶端必須重新發送請求。
  本次實驗分為2部分。在Part A,我們要實現key/value服務,無需考慮日誌長度。在Part B,我們需要考慮日誌長度,實現Snapshot。

Part A: Key/value service without log compaction
  該服務支援3種RPC呼叫:Put(key, value),Append(key, arg)和Get(key)。Put()函式替換key對應的value,Append(key, arg)函式將arg增加到key對應的value,Get()函式獲取key對應的value。Append值到不存在的key,就理解為Put函式。

具體實現
  與前面部落格一樣,我們先從測試程式碼出發,一步步實現具體函式。
  首先是TestBasic測試函式,該測試函式檢查在無Fault情況下單客戶端操作的正確性。其中涉及到了1個重要的測試函式GenericTest。

func GenericTest(t *testing.T, tag string, nclients int, unreliable bool, crash bool, partitions bool, maxraftstate int) {
    const nservers = 5
    cfg := make_config(t, tag, nservers, unreliable, maxraftstate)
    defer cfg.cleanup()

    ck := cfg.makeClient(cfg.All())

    done_partitioner := int32
(0) done_clients := int32(0) ch_partitioner := make(chan bool) clnts := make([]chan int, nclients) for i := 0; i < nclients; i++ { clnts[i] = make(chan int) } for i := 0; i < 3; i++ { // log.Printf("Iteration %v\n", i) atomic.StoreInt32(&done_clients, 0) atomic.StoreInt32(&done_partitioner, 0) go spawn_clients_and_wait(t, cfg, nclients, func(cli int, myck *Clerk, t *testing.T) { j := 0 defer func() { clnts[cli] <- j }() last := "" key := strconv.Itoa(cli) myck.Put(key, last) for atomic.LoadInt32(&done_clients) == 0 { if (rand.Int() % 1000) < 500 { nv := "x " + strconv.Itoa(cli) + " " + strconv.Itoa(j) + " y" // log.Printf("%d: client new append %v\n", cli, nv) myck.Append(key, nv) last = NextValue(last, nv) j++ } else { // log.Printf("%d: client new get %v\n", cli, key) v := myck.Get(key) if v != last { log.Fatalf("get wrong value, key %v, wanted:\n%v\n, got\n%v\n", key, last, v) } } } }) if partitions { // Allow the clients to perform some operations without interruption time.Sleep(1 * time.Second) go partitioner(t, cfg, ch_partitioner, &done_partitioner) } time.Sleep(5 * time.Second) atomic.StoreInt32(&done_clients, 1) // tell clients to quit atomic.StoreInt32(&done_partitioner, 1) // tell partitioner to quit if partitions { // log.Printf("wait for partitioner\n") <-ch_partitioner // reconnect network and submit a request. A client may // have submitted a request in a minority. That request // won't return until that server discovers a new term // has started. cfg.ConnectAll() // wait for a while so that we have a new term time.Sleep(electionTimeout) } if crash { // log.Printf("shutdown servers\n") for i := 0; i < nservers; i++ { cfg.ShutdownServer(i) } // Wait for a while for servers to shutdown, since // shutdown isn't a real crash and isn't instantaneous time.Sleep(electionTimeout) // log.Printf("restart servers\n") // crash and re-start all for i := 0; i < nservers; i++ { cfg.StartServer(i) } cfg.ConnectAll() } // log.Printf("wait for clients\n") for i := 0; i < nclients; i++ { // log.Printf("read from clients %d\n", i) j := <-clnts[i] if j < 10 { log.Printf("Warning: client %d managed to perform only %d put operations in 1 sec?\n", i, j) } key := strconv.Itoa(i) // log.Printf("Check %v for client %d\n", j, i) v := ck.Get(key) checkClntAppends(t, i, v, j) } if maxraftstate > 0 { // Check maximum after the servers have processed all client // requests and had time to checkpoint if cfg.LogSize() > 2*maxraftstate { t.Fatalf("logs were not trimmed (%v > 2*%v)", cfg.LogSize(), maxraftstate) } } } fmt.Printf(" ... Passed\n") }

  該函式接收7個引數,nclients表示併發客戶端個數,unreliable代表RPC呼叫的可靠性,crash表示是否發生節點down了的情況,partitions表示是否發生網路分割槽,maxraftstate代表log的最大長度。該函式中首選確定kvraft節點個數為5個,呼叫make_config函式來初始化kvraft系統。進入make_config函式可以發現與之前實驗中的一樣,最重要部分在於建立kvraft節點,即StartKVServer函式。

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *RaftKV {
    // call gob.Register on structures you want
    // Go's RPC library to marshall/unmarshall.
    gob.Register(Op{})

    kv := new(RaftKV)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.persister = persister
    // Your initialization code here.

    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)
    kv.data = make(map[string]string)
    kv.pendingOps = make(map[int][]*P_Op)
    kv.op_count = make(map[int64]int64)

    go func() {
        for msg := range kv.applyCh {
            kv.Apply(&msg)
        }
    }()

    return kv
}

  在StartKVServer函式中進行初始化1個kvraft節點,涉及的結構體為RaftKV結構體即描述kvraft節點。

type RaftKV struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg

    maxraftstate int // snapshot if log grows this big
    persister  *raft.Persister
    data       map[string]string
    pendingOps map[int][]*P_Op
    op_count   map[int64]int64
}

  在RaftKV結構體中有幾個重要的成員,maxraftstate用於表示log的最大長度,persister表示用於永久儲存,data表示儲存key/value,pendingOps用於記錄正在執行的操作,op_count用於記錄每個節點最後1個已經執行操作的Id。而用於每個客戶端操作的結構體如下所示:

type Op struct {
    Type   int
    Key    string
    Value  string
    Client int64
    Id     int64
}

type P_Op struct {
    flag chan bool
    op   *Op
}

  其中Op結構體為具體儲存到log中的內容,包括了操作型別(get、put、append),鍵值對(Key/Value),客戶端Id和該客戶端的操作Id(2者組合標示1個操作Id)。而P_op結構用於表示伺服器節點上正在執行的操作,使用chan來表示同步,因為客戶端需要等raft節點commit後才能返回。
  回到StartKeyServer函式中,當raft節點commit時我們需要進行具體處理即接收訊息來處理,這裡使用goroutine來接受applyCh中的訊息,呼叫Apply函式來處理。
  上面都是伺服器端的初始化工作,在GenericTest函式中初始化server後呼叫makeClient函式來進行初始化client。進入函式發現重要的部分是MakeClerk函式。

type Clerk struct {
    servers []*labrpc.ClientEnd
    // You will have to modify this struct.
    leader_id    int
    client_id    int64
    cur_op_count int64
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    // You'll have to add code here.
    ck.client_id = nrand()
    ck.leader_id = 0
    ck.cur_op_count = 0
    return ck
}

  首先是客戶端結構體Clerk的具體內容,leader_id用於記錄上一次成功操作通訊的服務節點即記住Leader節點。由於只有Leader節點才能接受操作請求,這裡記住Leader節點避免了每次請求都要遍歷服務叢集節點來找到Leader節點。client_id表示客戶端Id,在初始化函式中使用nrand()函式來生成隨機值。cur_op_count用於記錄最後一次發出的操作Id。
  再次回到GenericTest函式中,對伺服器和客戶端初始化後,進入for迴圈。在for迴圈中原子性地設定done_clients和done_partitioner變數的值為0,用於控制後面goroutine中的迴圈。goroutine執行spawn_clients_and_wait函式,建立指定個數的客戶端併發地發出操作請求。在run_client函式中呼叫makeClient函式建立客戶端,並執行fn函式。這裡使用chan來實現同步通訊,確保fn函式被執行。

func run_client(t *testing.T, cfg *config, me int, ca chan bool, fn func(me int, ck *Clerk, t *testing.T)) {
    ok := false
    defer func() { ca <- ok }()
    ck := cfg.makeClient(cfg.All())
    fn(me, ck, t)
    ok = true
    cfg.deleteClient(ck)
}

func spawn_clients_and_wait(t *testing.T, cfg *config, ncli int, fn func(me int, ck *Clerk, t *testing.T)) {
    ca := make([]chan bool, ncli)
    for cli := 0; cli < ncli; cli++ {
        ca[cli] = make(chan bool)
        go run_client(t, cfg, cli, ca[cli], fn)
    }
    // log.Printf("spawn_clients_and_wait: waiting for clients")
    for cli := 0; cli < ncli; cli++ {
        ok := <-ca[cli]
        // log.Printf("spawn_clients_and_wait: client %d is done\n", cli)
        if ok == false {
            t.Fatalf("failure")
        }
    }
}

  在GenericTest函式中,fn函式為匿名函式,其主要功能是先呼叫Put函式執行put操作,然後不斷地呼叫Append函式來執行append操作,在此過程中會呼叫Get函式來執行get操作檢查上一次操作的值是否正確被寫入。
  所以重點還是實現3個RPC操作。
  我們先來看看客戶端的呼叫函式。對於put和append操作,這2者的可以用1個flag來表示操作區別。主要實現的函式是PutAppend函式。

func (ck *Clerk) PutAppend(key string, value string, op string) {
    // You will have to modify this function.
    var args PutAppendArgs

    args.Key = key
    args.Value = value
    args.Op = op
    args.Client = ck.client_id
    args.Id = atomic.AddInt64(&ck.cur_op_count, 1)
    for {
        var reply PutAppendReply
        ck.servers[ck.leader_id].Call("RaftKV.PutAppend", &args, &reply)
        if reply.Err == OK && reply.WrongLeader == false {
            DPrintf("Call success\n")
            break
        } else {
            ck.leader_id = (ck.leader_id + 1) % len(ck.servers)
        }
    }

}

  該函式中我們需要構建rpc呼叫的請求引數和儲存結果的引數。

// Put or Append
type PutAppendArgs struct {
    Key   string
    Value string
    Op    string // "Put" or "Append"
    Client int64
    Id     int64
}

type PutAppendReply struct {
    WrongLeader bool
    Err         Err
}

  請求引數PutAppendArgs的內容為表示鍵值對的Key/Value,操作型別Op,客戶端Id的Client和操作Id。結果引數PutAppendReply的內容為表示該服務節點是否是Leader節點的WrongLeader以及錯誤原因Err。
  在PutAppend函式中先封裝這次操作請求的請求引數,然後迴圈呼叫Leader服務節點的RPC請求。如果返回結果沒問題,則跳出迴圈,否則(該節點不為Leader)將請求發給下一個服務節點。
  客戶端關於get請求的操作與上面類似,這裡不再詳述。
  接下來看一下,伺服器端接收到請求後的處理流程。

func (kv *RaftKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    // Your code here.
    var op Op

    if args.Op == "Put" {
        op.Type = OpPut
    } else {
        op.Type = OpAppend
    }
    op.Key = args.Key
    op.Value = args.Value
    op.Client = args.Client
    op.Id = args.Id
    reply.WrongLeader = kv.execOp(op)
    if reply.WrongLeader {
        reply.Err = ErrNotLeader
    } else {
        reply.Err = OK
    }
}

  由於3種請求為簡單操作,其具體實現類似,所以定義了1個通用的函式execOp來處理請求。在這之前將請求引數中內容提取封裝成Op結構體,因為具體儲存到log中的是Op結構體。

func (kv *RaftKV) execOp(op Op) bool {
    op_idx, _, is_leader := kv.rf.Start(op)
    if !is_leader {
        DPrintf("This server is not Leader\n")
        return STATUS_FOLLOWER
    }

    waiter := make(chan bool, 1)
    DPrintf("Append to pendingOps op_idx:%v  op:%v\n", op_idx, op)
    kv.mu.Lock()
    kv.pendingOps[op_idx] = append(kv.pendingOps[op_idx], &P_Op{flag: waiter, op: &op})
    kv.mu.Unlock()

    var ok bool
    timer := time.NewTimer(TIMEOUT)
    select {
    case ok = <-waiter:
    case <-timer.C:
        DPrintf("Wait operation apply to state machine exceeds timeout....\n")
        ok = false
    }
    delete(kv.pendingOps, op_idx)
    if !ok {
        DPrintf("Wrong leader\n")
        return STATUS_FOLLOWER
    }
    return STATUS_LEADER
}

  在execOp函式中,呼叫raft節點的Start函式來達成共識提交本次操作。然後根據本次操作新建1個P_Op結構體例項表示這次操作正在執行,新增到等待執行操作的佇列中。同時新建1個計時器來記錄等待時間。使用select實現超時機制,當waiter chan中收到訊息時表示本次操作有了具體結果。如果沒有超時並且本次操作正確執行了則返回成功資訊,否則返回錯誤資訊。
  而waiter chan的訊息主要是在前面提到的Apply函式中寫入的。

func (kv *RaftKV) Apply(msg *raft.ApplyMsg) {
    kv.mu.Lock()
    defer kv.mu.Unlock()

    var args Op
    args = msg.Command.(Op)
    if kv.op_count[args.Client] >= args.Id {
        DPrintf("Duplicate operation\n")
    } else {
        switch args.Type {
        case OpPut:
            DPrintf("Put Key/Value %v/%v\n", args.Key, args.Value)
            kv.data[args.Key] = args.Value
        case OpAppend:
            DPrintf("Append Key/Value %v/%v\n", args.Key, args.Value)
            kv.data[args.Key] = kv.data[args.Key] + args.Value
        default:
        }
        kv.op_count[args.Client] = args.Id
    }

    for _, i := range kv.pendingOps[msg.Index] {
        if i.op.Client == args.Client && i.op.Id == args.Id {
            DPrintf("Client:%v %v, Id:%v %v", i.op.Client, args.Client, i.op.Id, args.Id)
            i.flag <- true
        } else {
            DPrintf("Client:%v %v, Id:%v %v", i.op.Client, args.Client, i.op.Id, args.Id)
            i.flag <- false
        }
    }
}

  在Apply函式中節點收到了日誌提交的訊息。先將Log的Command轉換為Op結構體,判斷該提交的操作是否是冗餘的操作。這裡涉及到了RPC呼叫請求丟失時的處理,對於這種情況我們在這裡採取了簡單的處理方式,就是重新接受操作請求但是在提交日誌執行操作時判斷一下是否是冗餘的操作(通過Id標示)。如果是冗餘操作則忽視但是操作結果仍視為成功,如果不是則進行相應操作(Put或者Append)。當執行操作之後需要訊息反饋給等待的客戶端,可以通過前面使用的waiter chan,即往裡面寫入成功的訊息。至此整個操作流程就闡述完了。
  後面的一些測試函式其實檢查的是raft的正確性。