1. 程式人生 > >MIT6.824 Lab2 Raft(3)

MIT6.824 Lab2 Raft(3)

  最後一部分介紹一下測試程式碼中的測試函式。
  緊接著上次,現在是TestFailNoAgree函式測試在節點Fail太多的情況下無法達成一致性。

func TestFailNoAgree(t *testing.T) {
    servers := 5
    cfg := make_config(t, servers, false)
    defer cfg.cleanup()

    fmt.Printf("Test: no agreement if too many followers fail ...\n")

    cfg.one(10, servers)

    // 3 of 5 followers disconnect
leader := cfg.checkOneLeader() cfg.disconnect((leader + 1) % servers) cfg.disconnect((leader + 2) % servers) cfg.disconnect((leader + 3) % servers) index, _, ok := cfg.rafts[leader].Start(20) if ok != true { t.Fatalf("leader rejected Start()") } if index != 2
{ t.Fatalf("expected index 2, got %v", index) } time.Sleep(2 * RaftElectionTimeout) n, _ := cfg.nCommitted(index) if n > 0 { t.Fatalf("%v committed but no majority", n) } // repair failures cfg.connect((leader + 1) % servers) cfg.connect((leader + 2
) % servers) cfg.connect((leader + 3) % servers) // the disconnected majority may have chosen a leader from // among their own ranks, forgetting index 2. // or perhaps leader2 := cfg.checkOneLeader() index2, _, ok2 := cfg.rafts[leader2].Start(30) if ok2 == false { t.Fatalf("leader2 rejected Start()") } if index2 < 2 || index2 > 3 { t.Fatalf("unexpected index %v", index2) } cfg.one(1000, servers) fmt.Printf(" ... Passed\n") }

  通過one函式中列印函式,我們可以檢視不同情況下當前系統的Leader(Leader=1)。在TestFailNoAgree函式中,先呼叫one函式檢查一致性(index=1),然後斷開3個Follwer節點,此時客戶端發出的請求(index=2),raft系統中只剩下2個節點,所以Append Entry同意的節點數少於majority,該序號日誌項應該無法committed。緊接著重新將斷開連線的3個節點加入網路,由於斷開節點數超過majority,所以此時系統可能重新選舉Leader,呼叫checkOneLeader函式發現Leader=2,所以原先index=2的請求就會被拋棄。然後客戶端發出請求(index=2),返回的序號應該為2。
  TestConcurrentStarts函式測試在一個Term中併發請求的一致性。

func TestConcurrentStarts(t *testing.T) {
    servers := 3
    cfg := make_config(t, servers, false)
    defer cfg.cleanup()

    fmt.Printf("Test: concurrent Start()s ...\n")

    var success bool
loop:
    for try := 0; try < 5; try++ {
        if try > 0 {
            // give solution some time to settle
            time.Sleep(3 * time.Second)
        }

        leader := cfg.checkOneLeader()
        _, term, ok := cfg.rafts[leader].Start(1)
        if !ok {
            // leader moved on really quickly
            continue
        }

        iters := 5
        var wg sync.WaitGroup
        is := make(chan int, iters)
        for ii := 0; ii < iters; ii++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                i, term1, ok := cfg.rafts[leader].Start(100 + i)
                if term1 != term {
                    return
                }
                if ok != true {
                    return
                }
                is <- i
            }(ii)
        }

        wg.Wait()
        close(is)

        for j := 0; j < servers; j++ {
            if t, _ := cfg.rafts[j].GetState(); t != term {
                // term changed -- can't expect low RPC counts
                continue loop
            }
        }

        failed := false
        cmds := []int{}
        for index := range is {
            cmd := cfg.wait(index, servers, term)
            if ix, ok := cmd.(int); ok {
                if ix == -1 {
                    // peers have moved on to later terms
                    // so we can't expect all Start()s to
                    // have succeeded
                    failed = true
                    break
                }
                cmds = append(cmds, ix)
            } else {
                t.Fatalf("value %v is not an int", cmd)
            }
        }

        if failed {
            // avoid leaking goroutines
            go func() {
                for range is {
                }
            }()
            continue
        }

        for ii := 0; ii < iters; ii++ {
            x := 100 + ii
            ok := false
            for j := 0; j < len(cmds); j++ {
                if cmds[j] == x {
                    ok = true
                }
            }
            if ok == false {
                t.Fatalf("cmd %v missing in %v", x, cmds)
            }
        }

        success = true
        break
    }

    if !success {
        t.Fatalf("term changed too often")
    }

    fmt.Printf("  ... Passed\n")
}

  先新建1個有3個節點的raft系統,呼叫checkOneLeader函式來獲得raft系統當前的leader序號,呼叫Start函式給該節點發送命令(Index=1),然後新建1個可容納5個元素的Channel,建立5個goroutine去併發地傳送命令,這裡使用sync包中的WaitGroup來保證同步。在每個goroutine傳送命令完後檢查是否在先前的Term中,如果命令成功則寫入序號(這裡Index為2-6)。等5個goroutine都結束後判斷先前的操作是否在同一Term中,如果不是則重頭再來,主要是保證高併發RPC。然後從先前的Channel中取出序號,這裡通過wait函式來等待相應序號的命令日誌被raft系統所有節點提交。如果Term改變了比如系統出現錯誤,那麼清除Channel中的命令。最後比價從Channel中獲得的命令是否與先前傳送請求的命令相同,即命令是否被有序的提交了。
  TestRejoin函式測試當出現網路分割槽時的一致性。

func TestRejoin(t *testing.T) {
    servers := 3
    cfg := make_config(t, servers, false)
    defer cfg.cleanup()

    fmt.Printf("Test: rejoin of partitioned leader ...\n")

    cfg.one(101, servers)

    // leader network failure
    leader1 := cfg.checkOneLeader()
    cfg.disconnect(leader1)

    // make old leader try to agree on some entries
    cfg.rafts[leader1].Start(102)
    cfg.rafts[leader1].Start(103)
    cfg.rafts[leader1].Start(104)

    // new leader commits, also for index=2
    cfg.one(103, 2)

    // new leader network failure
    leader2 := cfg.checkOneLeader()
    cfg.disconnect(leader2)

    // old leader connected again
    cfg.connect(leader1)

    cfg.one(104, 2)

    // all together now
    cfg.connect(leader2)

    cfg.one(105, servers)

    fmt.Printf("  ... Passed\n")
}

  先新建1個有3個節點的raft系統,呼叫one函式來發送命令請求(Index=1,cmd=101,Term=1)並完成提交。然後斷開leader節點與網路的連線,此時形成2個網路分割槽,即Leader節(n1)點和2個Follwer節點(n2,n3)。向n1節點發送3個命令(Term=1),但是無法獲得提交,因為該網路分割槽的節點數目少於Majority,那麼此時n1節點的Index=4,Term=1。原先的2個Follwer節點重新選舉,向新Leader節點(n2)傳送命令請求(Index=2,cmd=103,Term=2)並提交,然後斷開n2節點與網路的連線,將n1節點重新納入網路中。由於n1節點的Term比n3節點的Term小,所以n3節點會成為Leader,n1節點上不同的Log會被拋棄。然後呼叫one函式再發送命令請求(Index=3,cmd=104,Term=3)並完成提交。最後將n2節點重新加入網路中,n3節點仍然是Leader節點,會追加日誌到n2節點。最後呼叫one函式來發送命令請求(Index=4,cmd=105,Term=3)。
  具體變化過程是(Term值不一定與實際符合):
  在呼叫one(101)後Leader為N1
  N1:Term=1,log為{101}
  N2:Term=1,log為{101}
  N3:Term=1,log為{101}
  當斷開N1並呼叫start(102)、start(103)、start(104)後
  N1:Term=1,log為{101、102、103、104}
  N2:Term=1,log為{101}
  N3:Term=1,log為{101}
  當呼叫one(103)後Leader為N2
  N1:Term=1,log為{101、102、103、104}
  N2:Term=2,log為{101、103}
  N3:Term=2,log為{101、103}
  當斷開N2並連線N1並呼叫one(104)後Leader為N3,這裡Term值可能為3或者4,因為當斷開N2連線N1之前,N3可能超時
  N1:Term=3,log為{101、103、104}
  N2:Term=3,log為{101、103、104}
  N3:Term=2,log為{101、103}
  當連線N2並呼叫one(105)後Leader仍為N3
  N1:Term=3,log為{101、103、104、105}
  N2:Term=3,log為{101、103、104、105}
  N3:Term=3,log為{101、103、104、105}
  
  之後的測試分析根據上面流程即可,等以後有空再補上。。。