1. 程式人生 > 其它 >ETCD 原始碼學習--Raft log 的實現(十)

ETCD 原始碼學習--Raft log 的實現(十)

技術標籤:etcdetcdraftraft log

在 ETCD 原始碼學習過程,不會講解太多的原始碼知識,只講解相關的實現機制,需要關注原始碼細節的朋友可以自行根據文章中的提示,找到相關原始碼進行學習。

Raft log主要有兩部分組成,一是已提交但未被上層模組處理的 unstable訊息,另一部分是已被上層處理的 stable訊息。

主要檔案

/raft/log.go raft log對 stable和 unstable的封裝

/raft/log_unstable.log unstable儲存的實現

/raft/storage.log stable儲存的實現

log

log主要維護幾個主要狀態來維護節點的 log的資訊,committed (已提交的 Index)、applied(已被上層應用的 Index)、storage(已被上層模組處理的entries訊息)和 unstable(已提交未處理的訊息)

資料結構:

type raftLog struct {
	storage Storage //持久化儲存
	unstable unstable //未持久化儲存
	committed uint64 //當前節點已提交的最大Index
	applied uint64 //當前節點已應用的最大Index
	logger Logger
	maxNextEntsSize uint64
}

追加:

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) { //檢視 index 的 term 與 logTerm 是否匹配·
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents) //查詢 ents 中,index  與 term 衝突的位置。
		switch {
		case ci == 0: //沒有,全部追加完成
		case ci <= l.committed: //如果衝突的位置在已提交的位置之前,有問題
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			//在提交位置之後,將未衝突的追加
			offset := index + 1
			l.append(ents[ci-offset:]...)
		}
		l.commitTo(min(committed, lastnewi))
		return lastnewi, true
	}
	return 0, false
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) == 0 {
		return l.lastIndex()
	}
	if after := ents[0].Index - 1; after < l.committed {
		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
	}
	l.unstable.truncateAndAppend(ents)
	return l.lastIndex()
}

提交 log 資料

func (l *raftLog) commitTo(tocommit uint64) {
	// never decrease commit
	if l.committed < tocommit {
		if l.lastIndex() < tocommit {
			l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
		}
		l.committed = tocommit
	}
}
func (l *raftLog) appliedTo(i uint64) {
	if i == 0 {
		return
	}
	if l.committed < i || i < l.applied {
		l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
	}
	l.applied = i
}
func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }

func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

unstable

unstable的日誌追加主要在 ratf/raft.log中的appendEntry(Leader),以及handleAppendEntries(Follower/Candidate)。

unstable資料結構:

type unstable struct {
	snapshot *pb.Snapshot //快照
	entries []pb.Entry //訊息
	offset  uint64 // 已被上層模組處理的最大的 entry.Index + 1。
	logger Logger
}

截斷與追加entry:

func (u *unstable) truncateAndAppend(ents []pb.Entry) {
    //ents[0] 是這批 entries 的第一個訊息,所以它的 Index 最小
	after := ents[0].Index
	switch {
    //如圖1
	case after == u.offset+uint64(len(u.entries)):
		u.entries = append(u.entries, ents...)
    //如圖2
	case after <= u.offset:
		u.offset = after
		u.entries = ents
    //其他取並集
    default:
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}

func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
	u.mustCheckOutOfBounds(lo, hi)//判斷lo,hi 是否超過entries範圍
	return u.entries[lo-u.offset : hi-u.offset]
}

圖1:

圖2:

commit log:

//i entry.index, t entry.term
func (u *unstable) stableTo(i, t uint64) {
	gt, ok := u.maybeTerm(i) //嘗試獲取 i 的任期
	if !ok {
		return
	}
	if gt == t && i >= u.offset {
		u.entries = u.entries[i+1-u.offset:] //縮減陣列
		u.offset = i + 1  //更新已 stable 的最大index
		u.shrinkEntriesArray()
	}
}
//縮減entries
func (u *unstable) shrinkEntriesArray() {
	const lenMultiple = 2
	if len(u.entries) == 0 {
		u.entries = nil
	} else if len(u.entries)*lenMultiple < cap(u.entries) {
		newEntries := make([]pb.Entry, len(u.entries))
		copy(newEntries, u.entries)
		u.entries = newEntries
	}
}

stable

資料結構:

type MemoryStorage struct {
 sync.Mutex
 hardState pb.HardState // Term、Vote、commit 等節點資訊
 snapshot pb.Snapshot //快照
 ents []pb.Entry //entries
}

追加 log:

func (ms *MemoryStorage) Append(entries []pb.Entry) error {
    ...
	first := ms.firstIndex()
	last := entries[0].Index + uint64(len(entries)) - 1
	...
	if first > entries[0].Index {
		entries = entries[first-entries[0].Index:]
	}
	
	offset := entries[0].Index - ms.ents[0].Index
    //這三種情況與 unstable 相似,這裡不在做解釋
	switch {
	case uint64(len(ms.ents)) > offset:
		ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
		ms.ents = append(ms.ents, entries...)
	case uint64(len(ms.ents)) == offset:
		ms.ents = append(ms.ents, entries...)
	default:
	    ...
	}
	return nil
}

其他:

ApplySnapshot 申請快照

CreateSnapshot建立快照

Compact壓縮entries

總結

1.Raft中 log模板主要包括兩個部分的儲存,一是未持久化的 log,另一個是已持久化的 log,並通過 committed和 applied還維護節點 Index資訊。

2.未持久化的 log (unstable log)是客戶端進行的提案,當前節點接收到了訊息,只是暫時放到了一個 msgs佇列,但未進行處理,需要等待上層模組進行持久化和進行相應的處理。

3.持久化 log (stable)是當前節點的上層應用對 msgs佇列中的訊息進行了相關處理之後,儲存到 stable中,並且對 unstable log進行 commit。所以一個 log要麼儲存在 unstable中,要麼存在的 stable中。

PS:歡迎糾正