1. 程式人生 > >16.深入k8s:Informer使用及其原始碼分析

16.深入k8s:Informer使用及其原始碼分析

![63831060_p0_master1200](https://img.luozhiyun.com/20201018000039.jpg) > 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 由於這部分的程式碼是在client-go 中,所以使用的原始碼版本是client-go 1.19 這次講解我用了很一些圖,儘可能的把這個模組給描述清楚,如果感覺對你有所幫助不妨發一封郵件激勵一下我~ ## Informer機制 ### 機制設計 Informer主要有兩個作用: 1. 通過一種叫作 ListAndWatch 的方法,把 APIServer 中的 API 物件快取在了本地,並負責更新和維護這個快取。ListAndWatch通過 APIServer 的 LIST API“獲取”所有最新版本的 API 物件;然後,再通過 WATCH API 來“監聽”所有這些 API 物件的變化; 2. 註冊相應的事件,之後如果監聽到的事件變化就會呼叫事件對應的EventHandler,實現回撥。 Informer執行原理如下: ![image-20201017000845410](https://img.luozhiyun.com/20201018000043.png) 根據流程圖來解釋一下Informer中幾個元件的作用: * Reflector:用於監控指定的k8s資源,當資源發生變化時,觸發相應的變更事件,如Added事件、Updated事件、Deleted事件,並將器資源物件放到本地DeltaFIFO Queue中; * DeltaFIFO:DeltaFIFO是一個先進先出的佇列,可以儲存資源物件的操作型別; * Indexer:用來儲存資源物件並自帶索引功能的本地儲存,Reflector從DeltaFIFO中將消費出來的資源物件儲存至Indexer; Reflector 包會和 apiServer 建立長連線,並使用 ListAndWatch 方法獲取並監聽某一個資源的變化。List 方法將會獲取某個資源的所有例項,Watch 方法則監聽資源物件的建立、更新以及刪除事件,然後將事件放入到DeltaFIFO Queue中; 然後Informer會不斷的從 Delta FIFO Queue 中 pop 增量事件,並根據事件的型別來決定新增、更新或者是刪除本地快取;接著Informer 根據事件型別來觸發事先註冊好的 Event Handler觸發回撥函式,然後然後將該事件丟到 Work Queue 這個工作佇列中。 ### 例項 將到了go-client部分的程式碼,我們可以直接通過例項來進行上手跑動,Informers Example程式碼示例如下: ```go package main import ( "flag" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "log" "path/filepath" "time" ) func main() { var kubeconfig *string //如果是windows,那麼會讀取C:\Users\xxx\.kube\config 下面的配置檔案 //如果是linux,那麼會讀取~/.kube/config下面的配置檔案 if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } stopCh := make(chan struct{}) defer close(stopCh) //表示每分鐘進行一次resync,resync會週期性地執行List操作 sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute) informer := sharedInformers.Core().V1().Pods().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("New Pod Added to Store: %s", mObj.GetName()) }, UpdateFunc: func(oldObj, newObj interface{}) { oObj := oldObj.(v1.Object) nObj := newObj.(v1.Object) log.Printf("%s Pod Updated to %s", oObj.GetName(),nObj.GetName()) }, DeleteFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("Pod Deleted from Store: %s", mObj.GetName()) }, }) informer.Run(stopCh) } ``` 要執行這段程式碼,需要我們將k8s伺服器上的~/.kube程式碼拷貝到本地,我是win10的機器所以拷貝到`C:\Users\xxx\.kube`中。 informers.NewSharedInformerFactory會傳入兩個引數,第1個引數clientset是用於與k8s apiserver互動的客戶端,第2個引數是代表每分鐘會執行一次resync,resync會週期性執行List將所有資源存放再Informer Store中,如果該引數是0,則禁用resync功能。 通過informer.AddEventHandler函式可以為pod資源新增資源事件回撥方法,支援3種資源事件回撥方法: * AddFunc * UpdateFunc * DeleteFunc 通過名稱我們就可以知道是新增、更新、刪除時會回撥這些方法。 在我們初次執行run方法的時候,可以會將監控的k8s上pod存放到本地,並回調AddFunc方法,如下日誌: ``` 2020/10/17 15:13:10 New Pod Added to Store: dns-test 2020/10/17 15:13:10 New Pod Added to Store: web-1 2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph 2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2 2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k 2020/10/17 15:13:10 New Pod Added to Store: mongodb 2020/10/17 15:13:10 New Pod Added to Store: web-0 .... ``` ## 原始碼解析 ### 初始化 #### shared Informer初始化 shared Informer初始化的時候會呼叫到informers.NewSharedInformerFactory進行初始化。 檔案位置:informers/factory.go ```go func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync) } func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory } ``` NewSharedInformerFactory方法最終會呼叫到NewSharedInformerFactoryWithOptions初始化一個sharedInformerFactory,在初始化的時候會初始化一個informers,用來快取不同型別的informer。 #### informer 初始化 informer初始化會呼叫sharedInformerFactory的方法進行初始化,並且可以呼叫不同資源的Informer。 ```go podInformer := sharedInformers.Core().V1().Pods().Informer() nodeInformer := sharedInformers.Node().V1beta1().RuntimeClasses().Informer() ``` 定義不同資源的Informer可以用來監控node或pod。 通過呼叫Informer方法會根據型別來建立Informer,同一類資源會共享同一個informer。 檔案路徑:informers/factory.go ```go func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { //建立informer return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func (f *podInformer) Informer() cache.SharedIndexInformer { //傳入上面定義的defaultInformer方法,用於建立informer return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() //獲取informer型別 informerType := reflect.TypeOf(obj) //查詢map快取,如果存在,那麼直接返回 informer, exists := f.informers[informerType] if exists { return informer } //根據型別查詢resync的週期 resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } //呼叫defaultInformer方法建立informer informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer } ``` 呼叫InformerFor方法的時候會傳入defaultInformer方法用於建立informer。 InformerFor方法裡面首先會去sharedInformerFactory的map快取中根據型別查詢對應的informer,如果存在那麼直接返回,如果不存在,那麼則會呼叫newFunc方法建立informer,然後設定到informers快取中。 下面我們看一下NewFilteredPodInformer是如何建立Informer的: 檔案位置:informers/core/v1/pod.go ```go func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } //呼叫apiserver獲取pod列表 return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } //呼叫apiserver監控pod列表 return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) } ``` 這裡是真正的建立一個informer,並註冊了List&Watch的回撥函式,list回撥函式的api類似下面這樣: ```go result = &v1.PodList{} err = c.client.Get(). Namespace(c.ns). Resource("pods"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(ctx). Into(result) ``` 構造Informer通過NewSharedIndexInformer完成: ```go func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer } ``` sharedIndexInformer裡面會建立sharedProcessor,設定List&Watch的回撥函式,建立了一個indexer,我們這裡看一下NewIndexer是怎麼建立indexer的: ```go func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } } ``` NewIndexer方法建立了一個cache,它的keyFunc是DeletionHandlingMetaNamespaceKeyFunc,即接受一個object,生成它的**namepace/name**的字串。cache裡面的資料會存放到cacheStorage中,它是一個threadSafeMap用來儲存資源物件並自帶索引功能的本地儲存。 ### 註冊EventHandler事件 EventHandler事件的註冊是通過informer的AddEventHandler方法進行的。在呼叫AddEventHandler方法的時候,傳入一個cache.ResourceEventHandlerFuncs結構體: 檔案位置:tools/cache/shared_informer.go ```go func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { s.startedLock.Lock() defer s.startedLock.Unlock() ... //初始化監聽器 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) //如果informer還沒啟動,那麼直接將監聽器加入到processor監聽器列表中 if !s.started { s.processor.addListener(listener) return } //如果informer已經啟動,那麼需要加鎖 s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.addListener(listener) //然後將indexer中快取的資料寫入到listener中 for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } } ``` AddEventHandler方法會呼叫到AddEventHandlerWithResyncPeriod方法中,然後呼叫newProcessListener初始化listener。 接著會校驗informer是否已經啟動,如果沒有啟動,那麼直接將監聽器加入到processor監聽器列表中並返回;如果informer已經啟動,那麼需要加鎖將監聽器加入到processor監聽器列表中,然後將indexer中快取的資料寫入到listener中。 需要注意的是listener.add方法會呼叫processorListener的add方法,這個方法會將資料寫入到addCh管道中: ```go func (p *processorListener) add(notification interface{}) { p.addCh <- notification } ``` addCh管道里面數據是用來處理事件回撥的,後面我會說到。 大致的流程如下: ![image-20201017213620364](https://img.luozhiyun.com/20201018000210.png) ### 啟動Informer模組 最後我們在上面的demo中會使用sharedIndexInformer的Run方法來啟動Informer模組。 檔案位置:tools/cache/shared_informer.go ```go func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() //初始化DeltaFIFO佇列 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) cfg := &Config{ //設定Queue為DeltaFIFO佇列 Queue: fifo, //設定List&Watch的回撥函式 ListerWatcher: s.listerWatcher, ObjectType: s.objectType, //設定Resync週期 FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, //判斷有哪些監聽器到期需要被Resync ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() //非同步建立controller s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) //呼叫run方法啟動processor wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true }() //啟動controller s.controller.Run(stopCh) } ``` 這段程式碼主要做了以下幾件事: 1. 呼叫NewDeltaFIFOWithOptions方法初始化DeltaFIFO佇列; 2. 初始化Config結果體,作為建立controller的引數; 3. 非同步建立controller; 4. 呼叫run方法啟動processor; 5. 呼叫run方法啟動controller; 下面我們看看sharedProcessor的run方法做了什麼: ```go func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { ... //遍歷監聽器 for _, listener := range p.listeners { //下面兩個方法是核心的事件call back的方法 p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() ... } ``` run方法會呼叫processorListener的run方法和pop方法,這兩個方法合在一起完成了事件回撥。 ```go func (p *processorListener) add(notification interface{}) { p.addCh <- notification } func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { notification = notificationToAdd nextCh = p.nextCh } else { p.pendingNotifications.WriteOne(notificationToAdd) } } } } ``` 這段程式碼,我把add方法也貼到這裡了,是因為監聽的事件都是從這個方法傳入的,然後寫入到addCh管道中。 pop方法在select程式碼塊中會獲取addCh管道中的資料,第一個迴圈的時候notification是nil,所以會將nextCh設定為p.nextCh;第二個迴圈的時候會將資料寫入到nextCh中。 當notification不為空的時候是直接將資料存入pendingNotifications快取中的,取也是從pendingNotifications中讀取。 下面我們看看run方法: ```go func (p *processorListener) run() { stopCh := make(chan struct{}) wait.Until(func() { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed close(stopCh) }, 1*time.Second, stopCh) } ``` run每秒遍歷一次nextCh中的資料,然後根據不同的notification型別執行不同的回撥方法,這裡會回撥到我們在main方法中註冊的eventHandler。 下面我們再回到sharedIndexInformer的Run方法中往下走,會執行controller的Run方法。 檔案位置:tools/cache/controller.go ```go func (c *controller) Run(stopCh <-chan struct{}) { ... //建立Reflector r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) ... //啟動Reflector wg.StartWithChannel(stopCh, r.Run) //每秒中迴圈呼叫DeltaFIFO佇列的pop方法, wait.Until(c.processLoop, time.Second, stopCh) wg.Wait() } ``` 這裡對應Informer執行原理裡面Informer上部分建立Reflector並進行監聽,和下部分迴圈呼叫DeltaFIFO佇列的pop方法進行分發。 ### 啟動Reflector進行監聽 Reflector的Run方法最後會呼叫到Reflector的ListAndWatch方法進行監聽獲取資源。ListAndWatch程式碼會分為兩部分,一部分是List,一部分是Watch。 我們先看List部分程式碼: 程式碼位置:tools/cache/reflector.go ```go func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ... if err := func() error { ... go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { //根據引數獲取pod 列表 return r.listerWatcher.List(opts) })) ... list, paginatedResult, err = pager.List(context.Background(), options) ... close(listCh) }() ... //獲取資源版本號 resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") //將資源資料轉換成資源物件列表 items, err := meta.ExtractList(list) ... //將資源物件列表中的資源物件和資源版本號儲存至DeltaFIFO佇列中 if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("unable to sync list result: %v", err) } ... r.setLastSyncResourceVersion(resourceVersion) return nil }(); err != nil { return err } ... } ``` 這部分的程式碼會分為如下幾個部分: 1. 呼叫listerWatcher.List方法,獲取資源下的所有物件的資料,這個方法會通過api呼叫到apiServer獲取資源列表,程式碼我在上面已經貼出來了; 2. 呼叫listMetaInterface.GetResourceVersion獲取資源版本號; 3. 呼叫meta.ExtractList方法將資源資料轉換成資源物件列表; 4. 將資源物件列表中的資源物件和資源版本號儲存至DeltaFIFO佇列中; 5. 最後呼叫setLastSyncResourceVersion方法更新資源版本號; 下面看看Watch部分的程式碼: ```go func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ... for { ... //呼叫clientset客戶端api與apiServer建立長連線,監控指定資源的變更 w, err := r.listerWatcher.Watch(options) ... //處理資源的變更事件 if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { ... return nil } } } ``` 這裡會迴圈呼叫clientset客戶端api與apiServer建立長連線,監控指定資源的變更,如果監控到有資源變更,那麼會呼叫watchHandler處理資源的變更事件。 ```go func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { ... loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): ... // 獲取資源版本號 newResourceVersion := meta.GetResourceVersion() switch event.Type { //將新增資源事件新增到DeltaFIFO佇列中 case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } //將更新資源事件新增到DeltaFIFO佇列中 case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } //將刪除資源事件新增到DeltaFIFO佇列中 case watch.Deleted: err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } ... *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } ... } ``` watchHandler方法會根據傳入的資源型別呼叫不同的方法轉換成不同的Delta然後存入到DeltaFIFO佇列中。 ### processLoop分發DeltaFIFO佇列中任務 processLoop方法,以1s為週期,週期性的執行。 檔案位置:tools/cache/controller.go ```go func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } } ``` 這裡會迴圈將DeltaFIFO佇列中資料pop出隊,然後交給Process方法進行處理,Process方法是在上面呼叫sharedIndexInformer的Run方法的資料設定,設定的方法是sharedIndexInformer的HandleDeltas方法。 ```go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest //根據obj的Type型別進行分發 for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) //如果快取中存在該物件 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { //更新indexr if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // Sync events are only propagated to listeners that requested resync isSync = true case d.Type == Replaced: //新老物件獲取版本號進行比較 if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { // Replaced events that didn't change resourceVersion are treated as resync events // and only propagated to listeners that requested resync isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) // 如果快取中不存在該物件 } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } ``` HandleDeltas會與indexer快取互動更新我們從Delta FIFO中取到的內容,之後通過`s.processor.distribute()`進行訊息的分發。 在distribute中,sharedProcesser通過`listener.add(obj)`向每個listener分發該object。而該函式中又執行了`p.addCh <- notification`。 ```go func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } ``` 這裡可以結合上面的`p.wg.Start(listener.run)`和`p.wg.Start(listener.pop)`方法來進行理解,這裡將notification傳入到addCh管道之後會觸發EventHandler事件。 這裡我用一張圖總結一下informer的Run方法流程: ![image-20201017233145402](https://img.luozhiyun.com/20201018000217.png) 至此,我們分析完了informer的所有機制。 ## 總結 通過上面分析,我們全面熟悉了k8s是如何通過Informer機制實現ListAndWatch獲取並監視 API 物件變化。 熟悉了Informer與Reflector是如何協同進行資料的傳遞,但是我這裡有點遺憾的是限於篇幅,沒有去詳細的講解DeltaFIFO佇列裡面是如何進行資料的儲存與獲取,實際上這個佇列的實現也是非常的有意思的。 對於Indexer來說,我在文章裡面也只說到了獲取DeltaFIFO佇列的資料後更新到Indexer的ThreadSafeMap中,但是並沒有講ThreadSafeMap這個儲存是如何做的,裡面的索引又是如何建立的,這些各位同學感興趣的也可以去研究一下。 ## Reference https://www.kubernetes.org.cn/2693.html https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/ https://mp.weixin.qq.com/s?__biz=MzU1OTAzNzc5MQ==&mid=2247484052&idx=1&sn=cec9f4a1ee0d21c5b2c51bd147b8af59&chksm=fc1c2ea4cb6ba7b283eef5ac4a45985437c648361831bc3e6dd5f38053be1968b3389386e415&scene=21#wechat_r