1. 程式人生 > >深入淺出kubernetes之client-go的SharedInformer

深入淺出kubernetes之client-go的SharedInformer

記得大學剛畢業那年看了侯俊傑的《深入淺出MFC》,就對深入淺出這四個字特別偏好,並且成為了自己對技術的要求標準——對於技術的理解要足夠的深刻以至於可以用很淺顯的道理給別人講明白。以下內容為個人見解,如有雷同,純屬巧合,如有錯誤,煩請指正。

本文基於kubernetes1.11版本,後續會根據kubernetes版本更新及時更新文件,所有程式碼引用為了簡潔都去掉了日誌列印相關的程式碼,儘量只保留有價值的內容。

在開始本文內容前,請先閱讀《深入淺出kubernetes之client-go的indexer》《深入淺出kubernetes之client-go的DeltaFIFO》


目錄

SharedInformer概述

ListerWatcher

Reflector實現

Controller實現

SharedInformer分析

SharedInformer的定義

CacheMutationDetector

sharedProcessor分析

processorListener分析

sharedProcessor管理processorListener

SharedInformer實現

總結


SharedInformer概述

Informer(就是SharedInformer)是client-go的重要組成部分,在瞭解client-go之前,瞭解一下Informer的實現是很有必要的,下面引用了官方的圖,可以看到Informer在client-go中的位置。

                             

前期鋪墊了Indexer和DeltaFIFO,為的就是方便本文的理解,也是時候來一個長篇大論了。SharedInformer總名字上直譯就是資訊提供者,至於Shared是什麼意思,我們來看看官方註釋:

// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler

不難看出Shared指的是多個listeners共享同一個cache,而且資源的變化會同時通知到cache和listeners。這個解釋和上面圖所展示的內容的是一致的,cache我們在Indexer的介紹中已經分析過了,listerners指的就是OnAdd、OnUpdate、OnDelete這些回撥函式背後的物件,本文就要對Informer進行系統性的分析。我們先對上面的圖做一些初步的認識:

  1. List/Watch:List是列舉apiserver中物件的介面,Watch是監控apiserver資源變化的介面;
  2. Reflector:我習慣成稱之為反射器,實現對apiserver指定型別物件的監控,其中反射實現的就是把監控的結果例項化成具體的物件;
  3. DeltaIFIFO:將Reflector監控的變化的物件形成一個FIFO佇列,此處的Delta就是變化,DeltaFIFO我們已經有文章詳細介紹了;
  4. LocalStore:指的就是Indexer的實現cache,這裡面快取的就是apiserver中的物件(其中有一部分可能還在DeltaFIFO中),此時使用者再查詢物件的時候就直接從cache中查詢,減少了apiserver的壓力;
  5. Callbacks:通知回撥函式,Infomer感知的所有物件變化都是通過回撥函式通知使用者(Listener);

ListerWatcher

ListerWatcher是一個interface型別,定義如下:

// 程式碼源自client-go/tools/cache/listwatch.go
// 其中metav1.ListOptions,runtime.Object,watch.Interface都定義在apimachinery這個包中
type ListerWatcher interface {
    // 根據選項列舉物件
    List(options metav1.ListOptions) (runtime.Object, error)
    // 根據選項監控物件變化
    Watch(options metav1.ListOptions) (watch.Interface, error)
}

 這裡面我們不會無限的展開下去,只要知道ListerWatcher是通過apiserver的API來列舉和監控的就行了,具體是如何實現的其實當前來看並不重要。需要注意一點:ListerWatcher是針對某一類物件的,比如Pod,不是所有物件的,這個在構造ListerWatcher物件的時候由apiserver的client型別決定了。

Reflector實現

為了方便理解,本章節引入資源的概念,其實對於kubernetes資源和物件是同一個東西,只是我更喜歡稱之為物件。但是下面需要引入同類物件這個概念,所以採用資源代表同類物件的集合(例如Pod集合)。按照慣例,我們先從型別定義入手:

// 程式碼源自client-go/tools/cache/reflector.go
type Reflector struct {
    name string                                 // 名字
    metrics *reflectorMetrics                   // 但凡遇到metrics多半是用於做監控的,可以忽略
    expectedType reflect.Type                   // 反射的型別,也就是要監控的物件型別,比如Pod
    store Store                                 // 儲存,就是DeltaFIFO,為什麼,後面會有程式碼證明
    listerWatcher ListerWatcher                 // 這個是用來從apiserver獲取資源用的
    period       time.Duration                  // 反射器在List和Watch的時候理論上是死迴圈,只有出現錯誤才會退出
                                                // 這個變數用在出錯後多長時間再執行List和Watch,預設值是1秒鐘
    resyncPeriod time.Duration                  // 重新同步的週期,很多人肯定認為這個同步週期指的是從apiserver的同步週期
                                                // 其實這裡面同步指的是shared_informer使用者需要定期同步全量物件
    ShouldResync func() bool                    // 如果需要同步,呼叫這個函式問一下,當然前提是該函式指標不為空
    clock clock.Clock                           // 時鐘
    lastSyncResourceVersion string              // 最後一次同步的資源版本
    lastSyncResourceVersionMutex sync.RWMutex   // 還專門為最後一次同步的資源版本弄了個鎖
}

根據上面定義的成員變數,我們可以推匯出:

  1. listerWatcher用於獲取和監控資源,lister可以獲取物件的全量,watcher可以獲取物件的增量(變化);
  2. 系統會週期性的執行list-watch的流程,一旦過程中失敗就要重新執行流程,這個重新執行的週期就是period指定的;
  3. expectedType規定了監控物件的型別,非此型別的物件將會被忽略;
  4. 例項化後的expectedType型別的物件會被新增到store中;
  5. kubernetes資源在apiserver中都是有版本的,物件的任何除了修改(新增、刪除、更新)都會造成資源版本更新,所以lastSyncResourceVersion就是指的這個版本;
  6. 如果使用者需要定期同步全量物件,那麼Reflector就會定期產生全量物件的同步事件給DeltaFIFO;

按照上面的推導,基本每個成員變數都涉及到了,彷彿我們已經知道Reflector的工作原理了,下面我們就要通過原始碼逐一驗證上面的推導。Reflector有一個Run()函式,這個是Reflector的核心功能流程,我們可以沿著這個流程分析:

// 程式碼源自client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
    // func Until(f func(), period time.Duration, stopCh <-chan struct{})是下面函式的宣告
    // 這裡面我們不用關心wait.Until是如何實現的,只要知道他呼叫函式f會被每period週期執行一次
    // 意思就是f()函式執行完畢再等period時間後在執行一次,也就是r.ListAndWatch()會被週期性的呼叫
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

從這裡看,程式碼的實現符合推導2,我們繼續看ListAndWatch()函式實現:

// 程式碼源自client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    var resourceVersion string
    // 很多儲存類的系統都是這樣設計的,資料採用版本的方式記錄,資料每變化(新增、刪除、更新)都會觸發版本更新,
    // 這樣的做法可以避免全量資料訪問。以apiserver資源監控為例,只要監控比快取中資源版本大的物件就可以了,
    // 把變化的部分更新到快取中就可以達到與apiserver一致的效果,一般資源的初始版本為0,從0版本開始列舉就是全量的物件了
    options := metav1.ListOptions{ResourceVersion: "0"}
    // 與監控相關的內容不多解釋
    r.metrics.numberOfLists.Inc()
    start := r.clock.Now()
    // 列舉資源,這部分是apimachery相關的內容,讀者感興趣可以自己瞭解
    list, err := r.listerWatcher.List(options)
    if err != nil {
        return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
    }
    // 還是監控相關的
    r.metrics.listDuration.Observe(time.Since(start).Seconds())
    // 下面的程式碼主要是利用apimachinery相關的函式實現,就是把列舉返回的結果轉換為物件陣列
    // 下面的程式碼大部分來自apimachinery,此處不做過多說明,讀者只要知道實現什麼功能就行了
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    }
    resourceVersion = listMetaInterface.GetResourceVersion()
    
    items, err := meta.ExtractList(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
    }
    // 和監控相關的內容
    r.metrics.numberOfItemsInList.Observe(float64(len(items)))
    // 以上部分都是物件例項化的過程,可以稱之為反射,也是Reflector這個名字的主要來源,本文不是講解反射原理的,
    // 而是作為SharedInformer的前端,所以我們重點介紹的是物件在SharedInformer中流轉過程,所以反射原理部分不做為重點講解
    // 這可是真正從apiserver同步過來的全量物件,所以要同步到DeltaFIFO中
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    // 設定最新的同步的物件版本
    r.setLastSyncResourceVersion(resourceVersion)
    // 下面要啟動一個後臺協程實現定期的同步操作,這個同步就是將SharedInformer裡面的物件全量以同步事件的方式通知使用者
    // 我們暫且稱之為“後臺同步協程”,Run()函式退出需要後臺同步協程退出,所以下面的cancelCh就是幹這個用的
    // 利用defer close(cancelCh)實現的,而resyncerrc是後臺同步協程反向通知Run()函式的報錯通道
    // 當後臺同步協程出錯,Run()函式接收到訊號就可以退出了
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    // 下面這個匿名函式就是後臺同步協程的函數了
    go func() {
        // resyncCh返回的就是一個定時器,如果resyncPeriod這個為0那麼就會返回一個永久定時器,cleanup函式是用來清理定時器的
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() 
        }()
        // 死迴圈等待各種訊號
        for {
            // 只有定時器有訊號才繼續處理,其他的都會退出
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            // ShouldResync是個函式地址,建立反射器物件的時候傳入,即便時間到了,也要通過函式問問是否需要同步
            if r.ShouldResync == nil || r.ShouldResync() {
                // 我們知道這個store是DeltaFIFO,DeltaFIFO.Resync()做了什麼,讀者自行溫習相關的文章~
                // 就在這裡實現了我們前面提到的同步,從這裡看所謂的同步就是以全量物件同步事件的方式通知使用者
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            // 清理掉當前的計時器,獲取下一個同步時間定時器
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    // 前面已經列舉了全量物件,接下來就是watch的邏輯了
    for {
        // 如果有退出訊號就立刻返回,否則就會往下走,因為有default.
        select {
        case <-stopCh:
            return nil
        default:
        }

        // 計算watch的超時時間
        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        // 設定watch的選項,因為前期列舉了全量物件,從這裡只要監聽最新版本以後的資源就可以了
        // 如果沒有資源變化總不能一直掛著吧?也不知道是卡死了還是怎麼了,所以有一個超時會好一點
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            TimeoutSeconds: &timeoutSeconds,
        }
        // 監控相關
        r.metrics.numberOfWatches.Inc()
        // 開始監控物件
        w, err := r.listerWatcher.Watch(options)
        // watch產生錯誤了,大部分錯誤就要退出函式然後再重新來一遍流程
        if err != nil {
            switch err {
            case io.EOF:
            case io.ErrUnexpectedEOF:
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // 類似於網路拒絕連線的錯誤要等一會兒再試,因為可能網路繁忙
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }

        // watch返回是流,apiserver會將變化的資源通過這個流傳送出來,client-go最終通過chan實現的
        // 所以watchHandler()是一個需要持續從chan讀取資料的流程,所以需要傳入resyncerrc和stopCh
        // 用於非同步通知退出或者後臺同步協程錯誤
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }
}

上面的函式中,呼叫了兩個私有函式,分別為syncWith()和watchHandler()。syncWith()用於實現一次從apiserver全量物件的同步,這裡的同步和我們上面提到的同步不是一回事,這裡指的是從apiserver的同步。watchHandler是實現監控apiserver資源變化的處理過程,主要就是把apiserver的資源變化轉換為DeltaFIFO呼叫。我們接下來就看這兩個函式的具體實現

接下來我們就要看看watchHandler做了什麼?

// 程式碼源自client-go/tools/cache/reflector.go
// 實現apiserver全量物件的同步
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    // 做一次slice型別轉換
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    // 直接呼叫了DeltaFIFO的Replace()介面,這個介面就是用於同步全量物件的
    return r.store.Replace(found, resourceVersion)
}
// 實現從watch返回的chan中持續讀取變化的資源,並轉換為DeltaFIFO相應的呼叫
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0
    // 監控相關
    defer func() {
        r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
        r.metrics.watchDuration.Observe(time.Since(start).Seconds())
    }()

    // 這裡就開始無限迴圈的從chan中讀取資源的變化,也可以理解為資源的增量變化,同時還要監控各種訊號
loop:
    for {
        select {
        // 系統退出訊號
        case <-stopCh:
            return errorStopRequested
        // 後臺同步協程出錯訊號
        case err := <-errc:
            return err
        // watch函式返回的是一個chan,通過這個chan持續的讀取物件
        case event, ok := <-w.ResultChan():
            // 如果不OK,說明chan關閉了,就要重新獲取,這裡面我們可以推測這個chan可能會執行過程中重新建立
            // 否則就應該退出而不是繼續迴圈
            if !ok {
                break loop
            }
            // 看來event可以作為錯誤的返回值,挺有意思,而不是通過關閉chan,這種方式可以傳遞錯誤資訊,關閉chan做不到
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            // 這裡面就是利用反射例項化物件了,而且判斷了物件型別是我們設定的型別
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            // 和list操作相似,也要獲取物件的版本,要更新快取中的版本,下次watch就可以忽略這些資源了
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            // 根據事件的型別做不同的DeltaFIFO的操作
            switch event.Type {
            // 向DeltaFIFO新增一個新增的Delta
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            // 更新物件,向DeltaFIFO新增一個更新的Delta
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
            }
            // 刪除物件,向DeltaFIFO新增一個刪除的Delta
            case watch.Deleted:
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
            }
            // 其他型別就不知道幹什麼了,只能報錯
            default:
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            // 更新最新資源版本
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }
    // watch返回時間非常短而且沒有任何事件要處理,這個屬於異常現象,因為我們watch是設定了超時的
    watchDuration := r.clock.Now().Sub(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        r.metrics.numberOfShortWatches.Inc()
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }

    return nil
}

 至此,Reflector的核心功能就算分析完了,我們再把其他的周邊函式簡單的過一下:

// 程式碼源自client-go/tools/cache/reflector.go
func (r *Reflector) setLastSyncResourceVersion(v string) {
    // 設定已經獲取到資源的最新版本
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v

    rv, err := strconv.Atoi(v)
    if err == nil {
        r.metrics.lastResourceVersion.Set(float64(rv))
    }
}

// 獲取resync定時器,叫定時器比較好理解,叫chan很難和定時關聯起來
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
    // 如果resyncPeriod說明就不用定時同步,返回的是永久超時的定時器
    if r.resyncPeriod == 0 {
        return neverExitWatch, func() bool { return false }
    }
    // 構建定時起
    t := r.clock.NewTimer(r.resyncPeriod)
    return t.C(), t.Stop
}

再次,我對Reflector做一下總結,其中內容和上面的推導很相似:

  1. Reflector利用apiserver的client列舉全量物件(版本為0以後的物件全部列舉出來)
  2. 將全量物件採用Replace()介面同步到DeltaFIFO中,並且更新資源的版本號,這個版本號後續會用到;
  3. 開啟一個協程定時執行resync,如果沒有設定定時同步則不會執行,同步就是把全量物件以同步事件的方式通知出去;
  4. 通過apiserver的client監控(watch)資源,監控的當前資源版本號以後的物件,因為之前的都已經獲取到了;
  5. 一旦有物件發生變化,那麼就會根據變化的型別(新增、更新、刪除)呼叫DeltaFIFO的相應介面,產生一個相應的物件Delta,同時更新當前資源的版本;

Controller實現

此controller非我們比較熟悉的controller-manager管理的各種各樣的controller,kubernetes裡面controller簡直是氾濫啊。這裡的controller定義在client-go/tools/cache/controller.go中,目的是用來把Reflector、DeltaFIFO組合起來形成一個相對固定的、標準的處理流程。理解了Controller,基本就算把SharedInfomer差不多搞懂了。話不多說,先上程式碼:

// 程式碼源自client-go/tools/cache/controller.go
// 這是一個Controller的抽象
type Controller interface {
    Run(stopCh <-chan struct{})      // 核心流程函式
    HasSynced() bool                 // apiserver中的物件是否已經同步到了Store中
    LastSyncResourceVersion() string // 最新的資源版本號
}

從上面的定義來看,HasSynced()可呼叫DeltaFIFO. HasSynced()實現,LastSyncResourceVersion()可以通過Reflector實現。因為Controller把多個模組整合起來實現了一套業務邏輯,所以在建立Controller需要提供一些配置:

// 程式碼源自client-go/tools/cache/controller.go
type Config struct {
    Queue                          // SharedInformer使用DeltaFIFO
    ListerWatcher                  // 這個用來構造Reflector
    Process ProcessFunc            // 這個在呼叫DeltaFIFO.Pop()使用,彈出物件要如何處理
    ObjectType runtime.Object      // 物件型別,這個肯定是Reflector使用
    FullResyncPeriod time.Duration // 全量同步週期,這個在Reflector使用
    ShouldResync ShouldResyncFunc  // Reflector在全量更新的時候會呼叫該函式詢問
    RetryOnError bool              // 錯誤是否需要嘗試
}

從上面兩個型別的定義我們可以猜測:Controller自己構造Reflector獲取物件,Reflector作為DeltaFIFO生產者持續監控apiserver的資源變化並推送到佇列中。Controller的Run()應該是佇列的消費者,從佇列中彈出物件並呼叫Process()處理。所以Controller相比於Reflector因為佇列的加持表現為每次有資源變化就會呼叫一次使用者定義的處理函式。

我們順著上面的推測看看程式碼的具體實現:

// 程式碼源自client-go/tools/cache/controller.go
// controller是Controller的實現型別
type controller struct {
    config         Config       // 配置,上面有講解
    reflector      *Reflector   // 反射器
    reflectorMutex sync.RWMutex // 反射器的鎖
    clock          clock.Clock  // 時鐘
}
// 核心業務邏輯實現
func (c *controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    // 建立一個協程,如果收到系統退出的訊號就關閉佇列,相當於在這裡析構的佇列
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    // 建立Reflector,傳入的引數都是我們上一個章節解釋過的,這裡不贅述
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    // r.ShouldResync的存在就是為了以後使用少些一點程式碼?否則直接使用c.config.ShouldResync不就完了麼?不明白用意
    r.ShouldResync = c.config.ShouldResync
    r.clock = c.clock
    // 記錄反射器
    c.reflectorMutex.Lock()
    c.reflector = r
    c.reflectorMutex.Unlock()
    // wait.Group不是本章的講解內容,只要把它理解為類似barrier就行了
    // 被他管理的所有的協程都退出後呼叫Wait()才會退出,否則就會被阻塞
    var wg wait.Group
    defer wg.Wait()
    // StartWithChannel()會啟動協程執行Reflector.Run(),同時接收到stopCh訊號就會退出協程
    wg.StartWithChannel(stopCh, r.Run)
    // wait.Until()在前面的章節講過了,週期性的呼叫c.processLoop(),這裡來看是1秒
    // 不用擔心呼叫頻率太高,正常情況下c.processLoop是不會返回的,除非遇到了解決不了的錯誤,因為他是個迴圈
    wait.Until(c.processLoop, time.Second, stopCh)
}

從上面程式碼上看,私有函式processLoop()才是核心邏輯的實現:

// 程式碼源自client-go/tools/cache/controller.go
func (c *controller) processLoop() {
    for {
        // 從佇列中彈出一個物件,然後處理它,這才是最主要的部分,這個c.config.Process是構造Controller的時候通過Config傳進來的
        // 所以這個讀者要特別注意了,這個函式其實是ShareInformer傳進來的,所以在分析SharedInformer的時候要重點分析的
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            // 如果FIFO關閉了那就退出
            if err == FIFOClosedError {
                return
            }
            // 如果錯誤可以再試試
            if c.config.RetryOnError {
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}

上面的程式碼是不是很簡單?這就對了,因為核心處理邏輯實現在了Process函式中了。這時候是不是很想看看這個Process()函式到底幹了什麼?後面SharedInformer的實現章節會有詳細介紹,我們還得把周邊函式過一遍:

// 程式碼源自client-go/tools/cache/controller.go
// HasSynced() 呼叫的就是DeltaFIFO.HasSynced()實現的
func (c *controller) HasSynced() bool {
    return c.config.Queue.HasSynced()
}
// LastSyncResourceVersion() 是利用Reflector實現的
func (c *controller) LastSyncResourceVersion() string {
    if c.reflector == nil {
        return ""
    }
    return c.reflector.LastSyncResourceVersion()
}

SharedInformer分析

SharedInformer的定義

本文之所以長是因為需要前期介紹很多內容,現在才剛剛開始本文的正題,我們來看看原始碼是怎麼定義SharedInformer:

// 程式碼源自client-go/tools/cache/shared_informer.go
type SharedInformer interface {
    // 新增資源事件處理器,關於ResourceEventHandler的定義在下面
    // 相當於註冊回撥函式,當有資源變化就會通過回撥通知使用者,是不是能和上面介紹的Controller可以聯絡上了?
    // 為什麼是Add不是Reg,說明可以支援多個handler
    AddEventHandler(handler ResourceEventHandler)
    // 上面新增的是不需要週期同步的處理器,下面的介面新增的是需要週期同步的處理器,週期同步上面提了好多遍了,不贅述
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // Store這個有專門的文章介紹,這個函式就是獲取Store的介面,說明SharedInformer內有Store物件
    GetStore() Store
    // Controller在上面的章節介紹了,說明SharedInformer內有Controller物件
    GetController() Controller
    // 這個應該是SharedInformer的核心邏輯實現的地方
    Run(stopCh <-chan struct{})
    // 因為有Store,這個函式就是告知使用者Store裡面是否已經同步了apiserver的資源,這個介面很有用
    // 當建立完SharedInformer後,通過Reflector從apiserver同步全量物件,然後在通過DeltaFIFO一個一個的同志到cache
    // 這個介面就是告知使用者,全量的物件是不是已經同步到了cache,這樣就可以從cache列舉或者查詢了
    HasSynced() bool
    // 最新同步資源的版本,這個就不多說了,通過Controller(Controller通過Reflector)實現
    LastSyncResourceVersion() string
}
// 擴充套件了SharedInformer型別,從型別名字上看共享的是Indexer,Indexer也是一種Store的實現
type SharedIndexInformer interface {
    // 繼承了SharedInformer
    SharedInformer
    // 擴充套件了Indexer相關的介面
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer
}
// 程式碼源自client-go/tools/cache/controller.go,SharedInformer使用者如果需要處理資源的事件
// 那麼就要自己實現相應的回撥函式
type ResourceEventHandler interface {
    // 新增物件回撥函式
    OnAdd(obj interface{})
    // 更新物件回撥函式
    OnUpdate(oldObj, newObj interface{})
    // 刪除物件回撥函式
    OnDelete(obj interface{})
}

相信對Reflector、Controller、Indexer有深入瞭解的人基本已經把ShareInformer的實現基本猜個差不多了,那麼我們來看看SharedInformer的實現類是如何定義的:

// 程式碼源自client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
    // Indexer也是一種Store,這個我們知道的,Controller負責把Reflector和FIFO邏輯串聯起來
    // 所以這兩個變數就涵蓋了開篇那張圖裡面的Reflector、DeltaFIFO和LocalStore(cache)
    indexer    Indexer
    controller Controller
    // sharedIndexInformer把上面提到的ResourceEventHandler進行了在層封裝,並統一由sharedProcessor管理,後面章節專門介紹
    processor             *sharedProcessor
    // CacheMutationDetector其實沒啥用,我理解是開發者自己實現的一個除錯工具,用來發現物件突變的
    // 實現方法也比較簡單,DeltaFIFO彈出的物件在處理前先備份(深度拷貝)一份,然後定期比對兩個物件是否相同
    // 如果不同那就報警,說明處理過程中有人修改過物件,這個功能預設是關閉,所以我說沒啥用
    cacheMutationDetector CacheMutationDetector
    // 這兩個變數是給Reflector用的,我們知道Reflector是在Controller建立的
    listerWatcher ListerWatcher
    objectType    runtime.Object
    // 定期同步的週期,因為可能存在多個ResourceEventHandler,就有可能存在多個同步週期,sharedIndexInformer採用最小的週期
    // 這個週期值就儲存在resyncCheckPeriod中,通過AddEventHandler()新增的處理器都採用defaultEventHandlerResyncPeriod
    resyncCheckPeriod time.Duration
    defaultEventHandlerResyncPeriod time.Duration
    // 時鐘
    clock clock.Clock
    // 啟動、停止標記,肯定有人會問為啥用兩個變數,一個變數不就可以實現啟動和停止了麼?
    // 其實此處是三個狀態,啟動前,已啟動和已停止,start表示了兩個狀態,而且為啟動標記專門做了個鎖
    // 說明啟動前和啟動後有互斥的資源操作
    started, stopped bool
    startedLock      sync.Mutex

    // 這個名字起的也是夠了,因為DeltaFIFO每次Pop()的時候需要傳入一個函式用來處理Deltas
    // 處理Deltas也就意味著要把訊息通知給處理器,如果此時呼叫了AddEventHandler()
    // 就會存在崩潰的問題,所以要有這個鎖,阻塞Deltas....細想名字也沒毛病~
    blockDeltas sync.Mutex
}

上面程式碼註釋中簡單介紹了sharedProcessor和CacheMutationDetector,我們下面就要對他們兩個型別做詳細解析。 

CacheMutationDetector

CacheMutationDetector這個就是檢測物件在過程中突變的,何所謂突變呢?突變就是莫名其妙的修改了,如何實現突變檢測,也是比較簡單的。CacheMutationDetector對所有的物件做了一次深度拷貝(DeepCopy),然後定期比較兩個物件是否一致,當發現有不同時說明物件突變了,然後就panic。我認為CacheMutationDetector是用來除錯的,因為程式碼預設是關閉的:

// 程式碼源自client-go/tools/cache/mutation_detector.go
// 預設關閉突變檢測
var mutationDetectionEnabled = false
// 但是可以通過環境變數的KUBE_CACHE_MUTATION_DETECTOR開啟
func init() {
    mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
}

// 這個是突變檢測的型別抽象
type CacheMutationDetector interface {
    AddObject(obj interface{})  // 用於記錄所有的物件
    Run(stopCh <-chan struct{}) // 開啟協程定期比對
}
// 建立CacheMutationDetector物件
func NewCacheMutationDetector(name string) CacheMutationDetector {
    // 如果沒有開啟選項就構造一個什麼都不做的物件
    if !mutationDetectionEnabled {
        return dummyMutationDetector{}
    }
    // 如果開啟了選項,那麼就構造一個預設的突變檢測器
    glog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
    return &defaultCacheMutationDetector{name: name, period: 1 * time.Second}
}
// 這就是什麼都不做的突變檢測器
type dummyMutationDetector struct{}
func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
}
func (dummyMutationDetector) AddObject(obj interface{}) {
}

預設的突變檢測器讀者自行分析把,因為比較簡單並且預設還不開啟,我就省點筆墨了~ 

sharedProcessor分析

有沒有感覺shared這個詞被kubernetes玩兒壞了(繼controller之後有一個背玩兒壞的單詞),sharedProcessor這又shared啥了?首先需要知道Processor的定義,這裡定義的Processor就是處理事件的東西。什麼事件,就是SharedInformer向外部通知的事件。因為官方程式碼沒有註釋,我猜是shared是同一個SharedInformer,有沒有很繞嘴?還有更繞的在後面呢,我們還要了解一個新的型別,那就是processorListener,processor剛說完,又來了個Listener!

通過SharedInformer.AddEventHandler()新增的處理器最終就會封裝成processorListener,然後通過sharedProcessor管理起來,通過processorListener的封裝就可以達到所謂的有事處理,沒事掛起。

processorListener分析

processorListener可以理解為兩個核心功能,一個是processor,一個是listener,用一句話概括,有事做事沒事掛起。先看看processorListener的定義:

// 程式碼源自clien-go/tools/cache/shared_informer.go
type processorListener struct {
    // nextCh、addCh、handler、pendingNotifications的用法請參看我的《golang的chan有趣用法》裡面有相關的例子
    // 總結這四個變數實現了事件的輸入、緩衝、處理,事件就是apiserver資源的變化
    nextCh chan interface{}
    addCh  chan interface{}
    handler ResourceEventHandler
    pendingNotifications buffer.RingGrowing
    // 下面四個變數就是跟定時同步相關的了,requestedResyncPeriod是處理器設定的定時同步週期
    // resyncPeriod是跟sharedIndexInformer對齊的同步時間,因為sharedIndexInformer管理了多個處理器
    // 最終所有的處理器都會對齊到一個週期上,nextResync就是下一次同步的時間點
    requestedResyncPeriod time.Duration
    resyncPeriod time.Duration
    nextResync time.Time
    resyncLock sync.Mutex
}

我們需要知道就是processor如何接收事件(此處事件就是apiserver的資源變化,也就是DeltaFIFO輸出的Deltas)?如何通知事件處理器?如何緩衝處理器?如何阻塞處理器進而形成listener的?一系列的問題我們需要沿著處理邏輯的流程逐一解釋。第一個問題,事件是如何傳入的:

// 程式碼源自client-go/tools/cache/shared_informer.go
// 對,就這麼簡單,通過addCh傳入,這裡面的notification就是我們所謂的事件
func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

因為addCh是無緩衝chan,呼叫add()函式的人是事件分發器。意思就是從DeltaFIFO彈出的Deltas要要逐一送到多個處理器,此時如果處理器沒有及時處理會造成addCh把分發器阻塞,那別的處理器也就同樣無法收到新的事件了。這一點,processorListener利用一個後臺協程處理這個問題(相應的原理參看《golang的chan有趣用法》):

// 程式碼源自client-go/tools/cache/shared_informer.go
// 這個函式是通過sharedProcessor利用wait.Group啟動的,讀者可以自行檢視wait.Group
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    // nextCh是在這裡,函式退出前析構的
    defer close(p.nextCh)
    // 臨時變數,下面會用到
    var nextCh chan<- interface{}
    var notification interface{}
    // 進入死迴圈啦
    for {
        select {
        // 有兩種情況,nextCh還沒有初始化,這個語句就會被阻塞,這個我在《深入淺出golang之chan》說過
        // nextChan後面會賦值為p.nextCh,因為p.nextCh也是無緩衝的chan,資料不傳送成功就阻塞                        
        case nextCh <- notification:
            // 如果傳送成功了,那就從緩衝中再取一個事件出來
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok {
                // 如果沒有事件,那就把nextCh再次設定為nil,接下來對於nextCh操作還會被阻塞
                nextCh = nil
            }
        // 從p.addCh讀取一個事件出來,這回看到消費p.addCh的地方了
        case notificationToAdd, ok := <-p.addCh:
            // 說明p.addCh關閉了,只能退出
            if !ok {
                return
            }
            // notification為空說明當前還沒傳送任何事件給處理器
            if notification == nil {
                // 那就把剛剛獲取的事件通過p.nextCh傳送個處理器
                notification = notificationToAdd
                nextCh = p.nextCh
            } else {
                // 上一個事件還沒有傳送成功,那就先放到快取中
                // pendingNotifications可以想象為一個slice,這樣方便理解,是一個動態的快取,
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

pop()函式實現的非常巧妙,利用一個協程就把接收、緩衝、傳送全部解決了。它充分的利用了golang的select可以同時操作多個chan的特性,同時從addChd讀取資料從nextCh傳送資料,這兩個chan任何一個完成都可以啟用協程。對於C/C++程式猿理解起來有點費勁,但這就是GO的魅力所在。接下來,我們看看從nextCh讀取事件後是如何處理的:

// 程式碼源自client-go/tools/cache/shared_informer.go
// 這個也是sharedProcessor通過wait.Group啟動的
func (p *processorListener) run() {
    // 因為wait.Until需要傳入退出訊號的chan
    stopCh := make(chan struct{})
    // wait.Until不多說了,我在前期不點的文章中說過了,只要沒有收到退出訊號就會週期的執行傳入的函式
    wait.Until(func() {
        // wait.ExponentialBackoff()和wait.Until()類似,wait.Until()是無限迴圈
        // wait.ExponentialBackoff()是嘗試幾次,每次等待時間會以指數上漲
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            // 這也是chan的range用法,可以參看我的《深入淺出golang的chan》瞭解細節
            for next := range p.nextCh {
                // 判斷事件型別,這裡面的handler就是呼叫SharedInfomer.AddEventHandler()傳入的
                // 理論上處理的不是Deltas麼?怎麼變成了其他型別,這是SharedInformer做的二次封裝,後面會看到
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }

            return true, nil
        })

        // 執行到這裡只能是nextCh已經被關閉了,所以關閉stopCh,通知wait.Until()退出
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

因為processorListener其他函式沒啥大用,上面兩個函式就就已經把核心功能都實現了。processorListener就是實現了事件的緩衝和處理,此處的處理就是使用者傳入的函式。在沒有事件的時候可以阻塞處理器,當事件較多是可以把事件緩衝起來,實現了事件分發器與處理器的非同步處理。

processorListener的run()和pop()函式是sharedProcessor啟動的協程呼叫的,所以下面就要對sharedProcessor進行分析了。

sharedProcessor管理processorListener

sharedProcessor的定義如下:

// client-go/tools/cache/shared_informer.go
// sharedProcessor是通過陣列組織處理器的,只是分了需要定時同步和不需要要同步兩類
type sharedProcessor struct {
    listenersStarted bool                 // 所有處理器是否已經啟動的標識
    listenersLock    sync.RWMutex         // 讀寫鎖
    listeners        []*processorListener // 通用的處理器
    syncingListeners []*processorListener // 需要定時同步的處理器
    clock            clock.Clock          // 時鐘
    wg               wait.Group           // 前面講過了processorListener每個需要兩個協程,
                                          // 用wait.Group來管理所有處理器的攜程,保證他們都能退出
}

在sharedProcessor裡面用的都是listener(監聽器),我更傾向於叫處理器,因為我更看重他是處理事件的,所以後面見到listener我叫處理器的時候不要奇怪。我們來看看新增一個處理器是如何實現的:

// 程式碼源自client-go/tools/cache/shared_informer.go
// 新增處理器,sharedIndexInformer.AddEventHandler()就會呼叫這個函式實現處理器的新增
func (p *sharedProcessor) addListener(listener *processorListener) {
    // 加鎖,這個很好理解
    p.listenersLock.Lock()
    defer p.listenersLock.Unlock()
    // 把處理器新增到陣列中
    p.addListenerLocked(listener)
    // 通過wait.Group啟動兩個協程,做的事情我們在processorListener說過了,這裡就是我們上面提到的啟動兩個協程的地方
    // 這個地方判斷了listenersStarted,這說明sharedProcessor在啟動前、後都可以新增處理器
    if p.listenersStarted {
        p.wg.Start(listener.run)
        p.wg.Start(listener.pop)
    }
}
// 把處理器新增到陣列中
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
    // 兩類(定時同步和不同步)的處理器陣列都添加了,這是因為沒有定時同步的也會用預設的時間,後面我們會看到
    // 那麼問題來了,那還用兩個陣列幹什麼呢?
    p.listeners = append(p.listeners, listener)
    p.syncingListeners = append(p.syncingListeners, listener)
}

在SharedInformer的介面中有一個與之對應的介面,就是SharedInformer.AddEventHandler()。因為SharedInformer沒有刪除處理器的藉口,sharedProcessor也沒有相應藉口。接下來就是sharedProcessor的分發事件的介面:

// 程式碼源自client-go/tools/cache/shared_informer.go
// 通過函式名稱也能感覺到分發的感覺~sync表示obj物件是否為同步事件物件
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    // 加鎖沒毛病
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    // 無論是否為sync,新增處理器的程式碼中我們知道兩個陣列都會被新增,所以判斷不判斷沒啥區別~
    // 所以我的猜測是程式碼以前實現的是明顯區分兩類的,但隨著程式碼的更新二者的界限已經沒那麼明顯了
    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}

sharedProcessor執行起來後,唯一需要做的就是等待退出訊號然後關閉所有的處理器,來看看具體實現程式碼:

// 程式碼源自client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    // 啟動前、後對於新增處理器的邏輯是不同,啟動前的處理器是不會立刻啟動連個協程執行處理器的pop()和run()函式的
    // 而是在這裡統一的啟動
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        // 遍歷所有的處理器,然後為處理器啟動兩個後臺協程
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
	}()
    // 等待退出訊號
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    // 關閉addCh,processorListener.pop()這個協程就會退出,不明白的可以再次回顧程式碼
    // 因為processorListener.pop()會關閉processorListener.nextCh,processorListener.run()就會退出
    // 所以這裡只要關閉processorListener.addCh就可以自動實現兩個協程的退出,不得不說設計的還是挺巧妙的
    for _, listener := range p.listeners {
        close(listener.addCh) 
    }
    // 等待所有的協程退出,這裡指的所有協程就是所有處理器的那兩個協程
    p.wg.Wait()
}

SharedInformer實現

既然已經定性為長篇大論了,那就從建立SharedInformer物件開始一擼到底。client-go實現了兩個建立SharedInformer的介面,如下所示:

// 程式碼源自client-go/tools/cache/shared_informer.go
// lw:這個是apiserver客戶端相關的,用於Reflector從apiserver獲取資源,所以需要外部提供
// objType:這個SharedInformer監控的物件型別
// resyncPeriod:同步週期,SharedInformer需要多長時間給使用者傳送一次全量物件的同步時間
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
    // 還是用SharedIndexInformer實現的
    return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}
// 建立SharedIndexInformer物件,其中大部分引數再上面的函式已經介紹了
// indexers:需要外部提供計算物件索引鍵的函式,也就是這裡面的物件需要通過什麼方式建立索引
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        // 管理所有處理器用的,這個上面的章節解釋了
        processor:                       &sharedProcessor{clock: realClock},
        // 其實就是在構造cache,讀者可以自行檢視NewIndexer()的實現,
        // 在cache中的物件用DeletionHandlingMetaNamespaceKeyFunc計算物件鍵,用indexers計算索引鍵
        // 可以想象成每個物件鍵是Namespace/Name,每個索引鍵是Namespace,即按照Namesapce分類
        // 因為objType決定了只有一種型別物件,所以Namesapce是最大的分類
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        // 下面這兩主要就是給Controller用,確切的說是給Reflector用的
        listerWatcher:                   lw,
        objectType:                      objType,
        // 無論是否需要定時同步,SharedInformer都提供了一個預設的同步時間,當然這個是外部設定的
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        // 預設沒有開啟的物件突變檢測器,沒啥用,也不多介紹
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
        clock: realClock,
    }
    return sharedIndexInformer
}

建立完ShareInformer物件,就要新增事件處理器了:

// 程式碼源自client-go/tools/cache/shared_informer.go
// 新增沒有指定同步週期的事件處理器
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
    // defaultEventHandlerResyncPeriod是預設的同步週期,在建立SharedInformer的時候設定的
    s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
// 新增需要定期同步的事件處理器
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    // 因為是否已經開始對於新增事件處理器的方式不同,後面會有介紹,所以此處加了鎖
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    // 如果已經結束了,那就可以直接返回了
    if s.stopped {
        return
    }
    // 如果有同步週期,==0就是永遠不用同步
    if resyncPeriod > 0 {
        // 同步週期不能太短,太短對於系統來說反而是個負擔,大量的無效計算浪費在這上面
        if resyncPeriod < minimumResyncPeriod {
            resyncPeriod = minimumResyncPeriod
        }
        // SharedInformer管理了很多處理器,每個處理器都有自己的同步週期,所以此處要統一成一個,稱之為對齊
        // SharedInformer會選擇所有處理器中最小的那個作為所有處理器的同步週期,稱為對齊後的同步週期
        // 此處就要判斷是不是比當前對齊後的同步週期還要小
        if resyncPeriod < s.resyncCheckPeriod {
            // 如果已經啟動了,那麼只能用和大家一樣的週期
            if s.started {
                resyncPeriod = s.resyncCheckPeriod
            // 如果沒啟動,那就讓大家都用最新的對齊同步週期
            } else {
                s.resyncCheckPeriod = resyncPeriod
                s.processor.resyncCheckPeriodChanged(resyncPeriod)
            }
        }
    }
    // 建立處理器,程式碼一直用listener,可能想強調沒事件就掛起把,我反而想用處理器這個名詞
    // determineResyncPeriod()這個函式讀者自己分析把,非常簡單,這裡面只要知道建立了處理器就行了
    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
    // 如果沒有啟動,那麼直接新增處理器就可以了
    if !s.started {
        s.processor.addListener(listener)
        return
    }

    // 這個鎖就是暫停再想所有的處理器分發事件用的,因為這樣會遍歷所有的處理器,此時新增會有風險
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    // 新增處理器
    s.processor.addListener(listener)
    // 這裡有意思啦,遍歷緩衝中的所有物件,通知處理器,因為SharedInformer已經啟動了,可能很多物件已經讓其他的處理器處理過了,
    // 所以這些物件就不會再通知新新增的處理器,此處就是解決這個問題的
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}

事件處理器新增完了,就要看SharedInformer如何把事件分發給每個處理器的了:

// 程式碼源自client-go/tools/cache/shared_informer.go
// sharedIndexInformer的核心邏輯函式
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    // 在此處構造的DeltaFIFO
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    // 這裡的Config是我們介紹Reflector時介紹的那個Config
    cfg := &Config{
        // 我前面一直在說Reflector輸入到DeltaFIFO,這裡算是直接證明了
        Queue:            fifo,            
        // 下面這些變數我們在Reflector都說了,這裡贅述
        ListerWatcher:    s.listerWatcher, 
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        // 這個才是重點,Controller呼叫DeltaFIFO.Pop()介面傳入的就是這個回撥函式,也是我們後續重點介紹的
        Process: s.HandleDeltas,
    }
    // 建立Controller,這個不用多說了
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    // 這個processorStopCh 是給sharedProcessor和cacheMutationDetector傳遞退出訊號的
    // 因為這裡要建立兩個協程執行sharedProcessor和cacheMutationDetector的核心函式
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    // Run()函式都退出了,也就應該設定結束的標識了
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true 
    }()
    // 啟動Controller,Controller一旦執行,整個流程就開始啟動了,所以叫Controller也不為過
    // 畢竟Controller是SharedInformer的發動機嘛
    s.controller.Run(stopCh)
}

sharedIndexInformer通過Run()函式啟動了Controller和sharedProcess(),Controller通過DeltaFIFO.Pop()函式彈出Deltas,並呼叫函式處理,這個處理函式就是sharedIndexInformer.HandleDeltas(),這個函式是銜接Controller和sharedProcess的關鍵點,他把Deltas轉換為sharedProcess需要的各種Notification型別。下面我們就對這個函式進行程式碼分析:

// 程式碼源自client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    // 看到這裡就知道為啥起名為blockDeltas了,這是阻塞處理器Deltas啊~因為分發事件到處理器,所以要加鎖
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    // Deltas裡面包含了一個物件的多個增量操作,所以要從最老的Delta到最先的Delta遍歷處理
    for _, d := range obj.(Deltas) {
        // 根據不同的Delta做不同的操作,但是大致分為物件新增、刪除兩大類操作
        // 所有的操作都要先同步到cache在通知處理器,這樣保持處理器和cache的狀態是一致的
        switch d.Type {
        // 同步、新增、更新都是物件新增類的造作,至於是否是更新還要看cache是否有這個物件
        case Sync, Added, Updated:
            // 看看物件是不是有定時同步產生的事件
            isSync := d.Type == Sync
            // 檢測突變,沒啥用
            s.cacheMutationDetector.AddObject(d.Object)
            // 如果cache中有的物件,一律看做是更新事件
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                // 把物件更新到cache中
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                // 通知處理器處理事件
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            // cache中沒有的物件,一律看做是新增事件
            } else {
                // 把物件新增到cache中
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                // 通知處理器處理器事件
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        // 物件被刪除
        case Deleted:
            // 從cache中刪除物件
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            // 通知所有的處理器物件被刪除了
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

至此,我們算是把SharedInformer的核心功能全部擼了一遍,以前對他感覺非常複雜,先來來看並不複雜,甚至非常簡單。

總結

我們對SharedInformer做一下總結:

  1. 利用apiserver的api實現資源的列舉和監控(Reflector實現);

  2. 利用cache儲存apiserver中的部分物件,通過物件型別進行制定,並在cache中採用Namespace做物件的索引

  3. 先通過apiserver的api將物件的全量列舉出來儲存在cache中,然後再watch資源,一旦有變化就更新cache中;

  4. 更新到cache中的過程通過DeltaFIFO實現的有順序的更新,因為資源狀態是通過全量+增量方式實現同步的,所以順序錯誤會造成狀態不一致;

  5. 使用者可以註冊回撥函式(類似掛鉤子),在更新到cache的同時通知使用者處理,為了保證回撥處理不被某一個處理器阻塞,SharedInformer實現了processorListener非同步緩衝處理;

  6. 真個過程是Controller是發動機,驅動整個流程運轉;

最後我們還是用一幅圖來總結SharedInformer,絕對的乾貨(其中Reflector.resync()因為是個匿名函式,所以用斜體,其實是不存在這個函式的)~