深入淺出kubernetes之client-go的DeltaFIFO
記得大學剛畢業那年看了侯俊傑的《深入淺出MFC》,就對深入淺出這四個字特別偏好,並且成為了自己對技術的要求標準——對於技術的理解要足夠的深刻以至於可以用很淺顯的道理給別人講明白。以下內容為個人見解,如有雷同,純屬巧合,如有錯誤,煩請指正。
本文基於kubernetes1.11版本,後續會根據kubernetes版本更新及時更新文件,所有程式碼引用為了簡潔都去掉了日誌列印相關的程式碼,儘量只保留有價值的內容。
在開始本文內容前,請先閱讀《深入淺出kubernetes之client-go的indexer》。
目錄
DeltaFIFO簡介
Informer是client-go的重要組成部分,在瞭解client-go之前,瞭解一下Informer的實現是很有必要的,下面引用了官方的圖,可以看到Informer在client-go中的位置。
由於Informer比較龐大,所以我們把它拆解成接獨立的模組分析,本文分析的就是DeltaFIFO模組。在理解DeltaFIFO前,我們需要知道什麼是Delta。學過微積分的同學肯定都比較好理解,可以簡單的理解為變化。那我們看看client-go是如何定義Delta的:
// 程式碼源自client-go/tools/cache/delta_fifo.go,下面型別出現順序是為了方便讀者理解 type Delta struct { Type DeltaType // Delta型別,比如增、減,後面有詳細說明 Object interface{} // 物件,Delta的粒度是一個物件 } type DeltaType string // Delta的型別用字串表達 const ( Added DeltaType = "Added" // 增加 Updated DeltaType = "Updated" // 更新 Deleted DeltaType = "Deleted" // 刪除 Sync DeltaType = "Sync" // 同步 ) type Deltas []Delta // Delta陣列
Delta其實就是kubernetes系統中物件的變化(增、刪、改、同步),FIFO比較好理解,是一個先入先出的佇列,那麼DeltaFIFO就是一個按序的(先入先出)kubernetes物件變化的佇列,這就非常符合上面圖中DeltaFIFO所在位置的功能了。
既然說到了DeltaFIFO,我們再說一說如下幾個型別,因為他們定義在DeltaFIFO的檔案中,而且在很多地方應用:
// 程式碼源自client-go/tools/cache/delta_fifo.go
// 這是一個非常通用的介面型別,只定義了一個介面函式,就是返回所有的keys。
type KeyLister interface {
ListKeys() []string
}
// 這也是一個非常通用的介面型別,只定義了一個介面函式,就是通過key獲取物件
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
}
// 這個介面型別就是上面兩個介面型別的組合了
type KeyListerGetter interface {
KeyLister
KeyGetter
}
為什麼要提這幾個型別的,首先是後面的章節會用到, 同時也是對《深入淺出kubernetes之client-go的indexer》的補充。有沒有發現上面兩個介面在client-go.tools.cache.Store這個介面型別中也存在,也就是說實現了Store介面的型別同時也實現了上面三個介面,golang這種沒有顯式的多繼承一時半會兒好難接受。上面三個介面基本上就是kv的標準介面,但凡是通過kv方式訪問的物件(儲存、佇列、索引等)多半具備以上介面。肯定有人會問直接使用具體的型別不就完了麼,定義這些有什麼用?答案很簡單,當你需要對kv的物件只讀但是不關心具體實現時就用上了~
接下來再來認識一個型別:
// 程式碼源自client-go/tools/cache/fifo.go
// 這個才是FIFO的抽象,DeltaFIFO只是FIFO的一種實現。
type Queue interface {
Store // 實現了儲存介面,這個很好理解,FIFO也是一種儲存
Pop(PopProcessFunc) (interface{}, error) // 在儲存的基礎上增加了Pop介面,用於彈出物件
AddIfNotPresent(interface{}) error // 物件如果不在佇列中就新增
HasSynced() bool // 通過Replace()放入第一批物件到佇列中並且已經被Pop()全部取走
Close() // 關閉佇列
}
《深入淺出kubernetes之client-go的indexer》已經對Store做了詳細說明,讀者可以先進行了解再繼續本文的內容。Queue是在Store基礎上擴充套件了Pop介面可以讓物件有序的彈出,Indexer是在Store基礎上建立了索引,可以快速檢索物件。
DeltaFIFO實現
我們先來看看DeltaFIFO的型別定義:
// 程式碼源自client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
lock sync.RWMutex // 讀寫鎖,因為涉及到同時讀寫,讀寫鎖效能要高
cond sync.Cond // 給Pop()介面使用,在沒有物件的時候可以阻塞,內部鎖複用讀寫鎖
items map[string]Deltas // 這個應該是Store的本質了,按照kv的方式儲存物件,但是儲存的是物件的Deltas陣列
queue []string // 這個是為先入先出實現的,儲存的就是物件的鍵
populated bool // 通過Replace()介面將第一批物件放入佇列,或者第一次呼叫增、刪、改介面時標記為true
initialPopulationCount int // 通過Replace()介面將第一批物件放入佇列的物件數量
keyFunc KeyFunc // 物件鍵計算函式,在Indexer那篇文章介紹過
knownObjects KeyListerGetter // 前面介紹就是為了這是用,該物件指向的就是Indexer,
closed bool // 是否已經關閉的標記
closedLock sync.Mutex // 專為關閉設計的所,為什麼不復用讀寫鎖?
}
看過《深入淺出kubernetes之client-go的indexer》再看上面的定義就比較理解,DeltaFIFO的計算物件鍵的函式略有不同,即便建立DeltaFIFO需要給計算物件鍵的函式:
// 程式碼源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
// 先用Deltas做一次強行轉換
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
// 只用最新版本的物件就可以了
obj = d.Newest().Object
}
// 後面的我們在《深入淺出kubernetes之client-go的indexer》介紹了,此處不贅述
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return f.keyFunc(obj)
}
DeltaFIFO的計算物件鍵的方式為什麼要先做一次Deltas的型別轉換呢?原因很簡單,那就是從DeltaFIFO.Pop()出去的物件很可能還要再新增進來(比如處理失敗需要再放進來),此時新增的物件就是已經封裝好的Deltas。
既然DeltaFIFO是Store的一種實現,簡單過一過DeltaFIFO相應的函式實現(簡單的函式放在一起介紹,重點函式專門介紹):
// 程式碼源自client-go/tools/cache/delta_fifo.go
// 假設讀者已經度過《深入淺出kubernetes之client-go的indexer》,註釋變得清爽一點
// 新增物件介面
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true // 佇列第一次寫入操作都要設定標記
return f.queueActionLocked(Added, obj)
}
// 更新物件介面
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true // 佇列第一次寫入操作都要設定標記
return f.queueActionLocked(Updated, obj)
}
// 刪除物件介面,這個函式貌似有點大,就註釋多點吧
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true // 佇列第一次寫入操作都要設定標記
// 此處是需要注意的,knownObjects就是Indexer,裡面存有已知全部的物件
if f.knownObjects == nil {
// 在沒有Indexer的條件下只能通過自己儲存的物件查一下
if _, exists := f.items[id]; !exists {
return nil
}
} else {
// 自己和Indexer裡面有任何一個有這個物件多算存在
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
return nil
}
}
return f.queueActionLocked(Deleted, obj)
}
// 列舉物件鍵介面
func (f *DeltaFIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.items))
for key := range f.items {
list = append(list, key)
}
return list
}
// 列舉物件介面
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
// 列舉物件的具體實現
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
item = copyDeltas(item)
list = append(list, item.Newest().Object)
}
return list
}
// 獲取物件介面,這個有意思哈,用物件獲取物件?如果說用Service物件獲取Pod物件是不是就能接受了?
// 因為他們的物件鍵是相同的
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// 通過物件鍵獲取物件
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
if exists {
d = copyDeltas(d)
}
return d, exists, nil
}
// 判斷是否關閉
func (f *DeltaFIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.closed {
return true
}
return false
}
上面的實現因為比較簡單,而且大部分函式都用到了queueActionLocked()函式,所以我要對這個函式做比較細緻的說明:
// 程式碼源自client-go/tools/cache/delta_fifo.go
// 從函式名稱來看把“動作”放入佇列中,這個動作就是DeltaType,而且已經加鎖了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 前面提到的計算物件鍵的函式
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 如果是同步,並且物件未來會被刪除,那麼就直接返回,沒必要記錄這個動作了
// 肯定有人會問為什麼Add/Delete/Update這些動作可以,因為同步對於已經刪除的物件是沒有意義的
// 已經刪除的物件後續跟新增、更新有可能,因為同名的物件又被添加了,刪除也是有可能
// 刪除有些複雜,後面會有說明
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
// 同一個物件的多次操作,所以要追加到Deltas陣列中
newDeltas := append(f.items[id], Delta{actionType, obj})
// 合併操作,去掉冗餘的delta
newDeltas = dedupDeltas(newDeltas)
// 判斷物件是否已經存在
_, exists := f.items[id]
// 合併後操作有可能變成沒有Delta麼?後面的程式碼分析來看應該不會,所以暫時不知道這個判斷目的
if len(newDeltas) > 0 {
// 如果物件沒有存在過,那就放入佇列中,如果存在說明已經在queue中了,也就沒必要再添加了
if !exists {
f.queue = append(f.queue, id)
}
// 更新Deltas陣列,通知所有呼叫Pop()的人
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
// 直接把物件刪除,這段程式碼我不知道什麼條件會進來,因為dedupDeltas()肯定有返回結果的
// 後面會有dedupDeltas()詳細說明
delete(f.items, id)
}
return nil
}
首先我們想想為什麼每個物件一個Deltas而不是Delta?對一個物件的多個操作,什麼操作可以合併?
-
DeltaFIFO生產者和消費者是非同步的,如果同一個目標的頻繁操作,前面操作還快取在佇列中的時候,那麼佇列就要緩衝物件的所有操作,那可以將多個操作合併麼?這是下面討論的了;
-
對於更新這種型別的操作在沒有全量基礎的情況下是沒法合併的,同時我們還不知道具體是什麼型別的物件,所以能合併的也就是有新增/刪除,兩個新增/刪除操作其實可以視為一個;
那我們就開始看看合併操作的具體實現:
// 程式碼源自client-go/tools/cache/delta_fifo.go
func dedupDeltas(deltas Deltas) Deltas {
// 小於2個delta,那就是1個唄,沒啥好合並的
n := len(deltas)
if n < 2 {
return deltas
}
// 取出最後兩個
a := &deltas[n-1]
b := &deltas[n-2]
// 判斷如果是重複的,那就刪除這兩個delta把合併後的追加到Deltas陣列尾部
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
// 判斷兩個Delta是否是重複的
func isDup(a, b *Delta) *Delta {
// 只有一個判斷,只能判斷是否為刪除類操作,和我們上面的判斷相同
// 這個函式的本意應該還可以判斷多種型別的重複,當前來看只能有刪除這一種能夠合併
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
// 判斷是否為刪除類的重複
func isDeletionDup(a, b *Delta) *Delta {
// 二者都是刪除那肯定有一個是重複的
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// 理論上返回最後一個比較好,但是物件已經不再系統監控範圍,前一個刪除狀態是好的
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
因為系統對於刪除的物件有DeletedFinalStateUnknown這個狀態,所以會存在兩次刪除的情況,但是兩次新增同一個物件由於apiserver可以保證物件的唯一性,所以處理中就沒有考慮合併兩次新增操作。
接下來我們來看看Replace()函式的實現,這個也是Store定義的介面:
// 程式碼源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 遍歷所有的輸入目標
for _, item := range list {
// 計算目標鍵
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
// 記錄處理過的目標鍵,採用set儲存,是為了後續快速查詢
keys.Insert(key)
// 因為輸入是目標全量,所以每個目標相當於重新同步了一次
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 如果沒有儲存的話,自己儲存的就是所有的老物件,目的要看看那些老物件不在全量集合中,那麼就是刪除的物件了
if f.knownObjects == nil {
// 遍歷所有的元素
for k, oldItem := range f.items {
// 這個目標在輸入的物件中存在就可以忽略
if keys.Has(k) {
continue
}
// 輸入物件中沒有,說明物件已經被刪除了。
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
// 終於看到哪裡用到DeletedFinalStateUnknown了,佇列中儲存物件的Deltas陣列中
// 可能已經存在Delete了,避免重複,採用DeletedFinalStateUnknown這種型別
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 如果populated還沒有設定,說明是第一次並且還沒有任何修改操作執行過
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) // 記錄第一次通過來的物件數量
}
return nil
}
// 下面處理的就是檢測某些目標刪除但是Delta沒有在佇列中
// 從儲存中獲取所有物件鍵
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
// 物件還存在那就忽略
if keys.Has(k) {
continue
}
// 獲取物件
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
// 累積刪除的物件數量
queuedDeletions++
// 把物件刪除的Delta放入佇列
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 和上面的程式碼差不多,只是計算initialPopulationCount值的時候增加了刪除物件的數量
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
從Replace()的實現來看,主要用於實現物件的全量更新。這個可以理解為DeltaFIFO在必要的時刻做一次全量更新,這個時刻可以是定期的,也可以是事件觸發的。由於DeltaFIFO對外輸出的就是所有目標的增量變化,所以每次全量更新都要判斷物件是否已經刪除,因為在全量更新前可能沒有收到目標刪除的請求。這一點與cache不同,cache的Replace()相當於重建,因為cache就是物件全量的一種記憶體對映,所以Replace()就等於重建。
那我來問題一個非常有水平的問題,為什麼knownObjects為nil時需要對比佇列和物件全量來判斷物件是否刪除,而knownObjects不為空的時候就不需要了?如果讀者想判斷自己是否已經全部理解可以不看下面自己想想。
我們前面說過,knownObjects就是Indexer(具體實現是cache),而開篇的那副圖已經非常明確的描述了二者以及使用之間的關係。也就是說knownObjects有的物件就是使用者知道的所有物件,此時即便佇列(DeltaFIFO)中有相應的物件,在更新的全量物件中又被刪除了,那就沒必要通知使用者物件刪除了,這種情況可以假想為系統短時間新增並刪除了物件,對使用者來說等同於沒有這個物件。
現在,我們來看看Queue相對於Stored擴充套件的Pop介面:
// 程式碼源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 佇列中有資料麼?
for len(f.queue) == 0 {
// 看來是先判斷的是否有資料,後判斷是否關閉,這個和chan像
if f.IsClosed() {
return nil, FIFOClosedError
}
// 沒資料那就等待把
f.cond.Wait()
}
// 取出第一個物件
id := f.queue[0]
// 陣列縮小,相當於把陣列中的第一個元素彈出去了,這個不多解釋哈
f.queue = f.queue[1:]
// 取出物件,因為queue中存的是物件鍵
item, ok := f.items[id]
// 同步物件計數減一,當減到0就說明外部已經全部同步完畢了
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// 物件不存在,這個是什麼情況?貌似我們在合併物件的時候程式碼上有這個邏輯,估計永遠不會執行
if !ok {
continue
}
// 把物件刪除
delete(f.items, id)
// Pop()需要傳入一個回撥函式,用於處理物件
err := process(item)
// 如果需要重新入佇列,那就重新入佇列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
上面分析的函式基本上就算是把DeltaFIFO核心邏輯分析完畢了,下面我們就把其他的介面函式簡單過一下結束本文章內容:
// 程式碼源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
// 這裡就比較明白了,一次同步全量物件後,並且全部Pop()出去才能算是同步完成
// 其實這裡所謂的同步就是全量內容已經進入Indexer,Indexer已經是系統中物件的全量快照了
return f.populated && f.initialPopulationCount == 0
}
// 新增不存在的物件
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
// 這個要求放入的必須是Deltas陣列,就是通過Pop()彈出的物件
deltas, ok := obj.(Deltas)
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
// 多個Delta都是一個物件,所以用最新的就可以了
id, err := f.KeyOf(deltas.Newest().Object)
if err != nil {
return KeyError{obj, err}
}
// 後面有實現
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, deltas)
return nil
}
// 這個是新增不存在物件的實現
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
// 這裡判斷的物件是否存在
if _, exists := f.items[id]; exists {
return
}
// 放入佇列中
f.queue = append(f.queue, id)
f.items[id] = deltas
f.cond.Broadcast()
}
// 重新同步,這個在cache實現是空的,這裡面有具體實現
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
// 如果沒有Indexer那麼重新同步是沒有意義的,因為連同步了哪些物件都不知道
if f.knownObjects == nil {
return nil
}
// 列舉Indexer裡面所有的物件鍵
keys := f.knownObjects.ListKeys()
// 遍歷物件鍵,為每個物件產生一個同步的Delta
for _, k := range keys {
// 具體實現後面有介紹
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
// 具體物件同步實現介面
func (f *DeltaFIFO) syncKeyLocked(key string) error {
// 獲取物件
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
// 計算物件的鍵值,有人會問物件鍵不是已經傳入了麼?那個是存在Indexer裡面的物件鍵,可能與這裡的計算方式不同
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 物件已經在存在,說明後續會通知物件的新變化,所以再加更新也沒意義
if len(f.items[id]) > 0 {
return nil
}
// 新增物件同步的這個Delta
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
總結
分析完程式碼後我我有如下幾個設問:
- 判斷是否已同步populated和initialPopulationCount這兩個變數存在的目的是什麼?我的理解是否已同步指的是第一次從apiserver獲取全量物件是否已經全部通知到外部,也就是通過Pop()被取走。所謂的同步就是指apiserver的狀態已經同步到快取中了,也就是Indexer中;
- 介面AddIfNotPresent()存在的目的是什麼,只有在Pop()函式中使用了一次,但是在呼叫這個介面的時候已經從map中刪除了,所以肯定不存在。這個介面在我看來主要用來保險的,因為Pop()本身就存在重入佇列的可能,外部如果判斷返回錯誤重入佇列就可能會重複;
最後,我們還是用一幅圖來總結一下: