ETCD 原始碼學習--Raft 中 progress 的 inFlight 實現(九)
首先需要搞清什麼是 inFlight,inFlight在 Raft中儲存的是已傳送給 Follower的 MsgApp訊息,但沒有收到 MsgAppResp的訊息 Index 值。簡單的說就是 Leader 傳送一個訊息給 Follower,Leader在對應的 Follower狀態維護結構(progress)中,將這個訊息的 ID記錄在 inFlight中,當 Follower收到訊息之後,告知 Leader收到了這個 ID的訊息,Leader將從 inFlight中刪除,表示 Follower已經接收,否則如果 Follower在指定時間內沒有響應,Leader會根據一定策略進行重發。
今天主要講的是 inFlight的設計思路,個人覺得挺有意思。Raft中的 inFlight是通過一個具有最大長度(size)的陣列([]uint64)構造成一個環形陣列。
主要檔案
/raft/tracker/inflights.go inflight的實現
原始碼
type Inflights struct {
start int //記錄最舊的那個未被響應的訊息,在buffer中的位置
count int //已傳送,但未響應的訊息總數
size int //buffer的最大長度
buffer []uint64 //儲存ID值
}
1.inFlight通過 start、count、size使 buffer形成一個環形陣列,流程如下:
(1)初始狀態
(2)新增幾個資料,在沒有達到陣列最大長度時,count++,陣列索引位置為 next = start+count
(3)收到 ID為 14的訊息響應,刪除 11-14的訊息, count減少對應的個數, start由 0變成 4
(4)繼續新增資料,直到 count+start等於陣列的長度
(5)繼續新增一個數據
2.元素追加
//判斷buffer是否已滿,如果滿了,不能繼續新增 func (in *Inflights) Full() bool { return in.count == in.size } //buffer 自動增長 func (in *Inflights) grow() { newSize := len(in.buffer) * 2 if newSize == 0 { newSize = 1 } else if newSize > in.size { newSize = in.size } newBuffer := make([]uint64, newSize) copy(newBuffer, in.buffer) in.buffer = newBuffer } //追加元素 func (in *Inflights) Add(inflight uint64) { if in.Full() { //判斷是否已滿 panic("cannot add into a Full inflights") } next := in.start + in.count //下一個buffer位置 size := in.size if next >= size { next -= size } if next >= len(in.buffer) { in.grow() } in.buffer[next] = inflight in.count++ }
3.釋放元素
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start] {
return
}
idx := in.start
var i int
//從 start 開始,直到找到最大且小於 to 的元素位置
for i = 0; i < in.count; i++ {
if to < in.buffer[idx] {
break
}
size := in.size
if idx++; idx >= size {
idx -= size
}
}
//重新計算 count 和 start
in.count -= i
in.start = idx
if in.count == 0 {
in.start = 0
}
}
總結
1.inFlight通過 size、count和 start使 buffer 形成一個環形陣列。
2.size是 buffer 最大長度,start 表示最舊的元素所在的位置,count表示 buffer中未得到響應的元素數量。
3.next = start+count,當 next>size時,next位置為 next-size。
PS:歡迎糾正