ETCD 原始碼學習--Raft log 的實現(十)
在 ETCD 原始碼學習過程,不會講解太多的原始碼知識,只講解相關的實現機制,需要關注原始碼細節的朋友可以自行根據文章中的提示,找到相關原始碼進行學習。
Raft log主要有兩部分組成,一是已提交但未被上層模組處理的 unstable訊息,另一部分是已被上層處理的 stable訊息。
主要檔案
/raft/log.go raft log對 stable和 unstable的封裝
/raft/log_unstable.log unstable儲存的實現
/raft/storage.log stable儲存的實現
log
log主要維護幾個主要狀態來維護節點的 log的資訊,committed (已提交的 Index)、applied(已被上層應用的 Index)、storage(已被上層模組處理的entries訊息)和 unstable(已提交未處理的訊息)
資料結構:
type raftLog struct {
storage Storage //持久化儲存
unstable unstable //未持久化儲存
committed uint64 //當前節點已提交的最大Index
applied uint64 //當前節點已應用的最大Index
logger Logger
maxNextEntsSize uint64
}
追加:
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { if l.matchTerm(index, logTerm) { //檢視 index 的 term 與 logTerm 是否匹配· lastnewi = index + uint64(len(ents)) ci := l.findConflict(ents) //查詢 ents 中,index 與 term 衝突的位置。 switch { case ci == 0: //沒有,全部追加完成 case ci <= l.committed: //如果衝突的位置在已提交的位置之前,有問題 l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: //在提交位置之後,將未衝突的追加 offset := index + 1 l.append(ents[ci-offset:]...) } l.commitTo(min(committed, lastnewi)) return lastnewi, true } return 0, false } func (l *raftLog) append(ents ...pb.Entry) uint64 { if len(ents) == 0 { return l.lastIndex() } if after := ents[0].Index - 1; after < l.committed { l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } l.unstable.truncateAndAppend(ents) return l.lastIndex() }
提交 log 資料
func (l *raftLog) commitTo(tocommit uint64) { // never decrease commit if l.committed < tocommit { if l.lastIndex() < tocommit { l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) } l.committed = tocommit } } func (l *raftLog) appliedTo(i uint64) { if i == 0 { return } if l.committed < i || i < l.applied { l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) } l.applied = i } func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
unstable
unstable的日誌追加主要在 ratf/raft.log中的appendEntry(Leader),以及handleAppendEntries(Follower/Candidate)。
unstable資料結構:
type unstable struct {
snapshot *pb.Snapshot //快照
entries []pb.Entry //訊息
offset uint64 // 已被上層模組處理的最大的 entry.Index + 1。
logger Logger
}
截斷與追加entry:
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
//ents[0] 是這批 entries 的第一個訊息,所以它的 Index 最小
after := ents[0].Index
switch {
//如圖1
case after == u.offset+uint64(len(u.entries)):
u.entries = append(u.entries, ents...)
//如圖2
case after <= u.offset:
u.offset = after
u.entries = ents
//其他取並集
default:
u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
u.entries = append(u.entries, ents...)
}
}
func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
u.mustCheckOutOfBounds(lo, hi)//判斷lo,hi 是否超過entries範圍
return u.entries[lo-u.offset : hi-u.offset]
}
圖1:
圖2:
commit log:
//i entry.index, t entry.term
func (u *unstable) stableTo(i, t uint64) {
gt, ok := u.maybeTerm(i) //嘗試獲取 i 的任期
if !ok {
return
}
if gt == t && i >= u.offset {
u.entries = u.entries[i+1-u.offset:] //縮減陣列
u.offset = i + 1 //更新已 stable 的最大index
u.shrinkEntriesArray()
}
}
//縮減entries
func (u *unstable) shrinkEntriesArray() {
const lenMultiple = 2
if len(u.entries) == 0 {
u.entries = nil
} else if len(u.entries)*lenMultiple < cap(u.entries) {
newEntries := make([]pb.Entry, len(u.entries))
copy(newEntries, u.entries)
u.entries = newEntries
}
}
stable
資料結構:
type MemoryStorage struct {
sync.Mutex
hardState pb.HardState // Term、Vote、commit 等節點資訊
snapshot pb.Snapshot //快照
ents []pb.Entry //entries
}
追加 log:
func (ms *MemoryStorage) Append(entries []pb.Entry) error {
...
first := ms.firstIndex()
last := entries[0].Index + uint64(len(entries)) - 1
...
if first > entries[0].Index {
entries = entries[first-entries[0].Index:]
}
offset := entries[0].Index - ms.ents[0].Index
//這三種情況與 unstable 相似,這裡不在做解釋
switch {
case uint64(len(ms.ents)) > offset:
ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
ms.ents = append(ms.ents, entries...)
case uint64(len(ms.ents)) == offset:
ms.ents = append(ms.ents, entries...)
default:
...
}
return nil
}
其他:
ApplySnapshot 申請快照
CreateSnapshot建立快照
Compact壓縮entries
總結
1.Raft中 log模板主要包括兩個部分的儲存,一是未持久化的 log,另一個是已持久化的 log,並通過 committed和 applied還維護節點 Index資訊。
2.未持久化的 log (unstable log)是客戶端進行的提案,當前節點接收到了訊息,只是暫時放到了一個 msgs佇列,但未進行處理,需要等待上層模組進行持久化和進行相應的處理。
3.持久化 log (stable)是當前節點的上層應用對 msgs佇列中的訊息進行了相關處理之後,儲存到 stable中,並且對 unstable log進行 commit。所以一個 log要麼儲存在 unstable中,要麼存在的 stable中。
PS:歡迎糾正