1. 程式人生 > >k8s informer

k8s informer

以nodecontroller為例:

一、先建立sharedInformers供所有controler使用,是一個工廠Informer.(sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()))。其成員有[]sharedIndexInformer切片。

二、每次建立一個controler如node controler、rccontroler等,都會生成一個相應的sharedIndexInformer(namespaceInformer、podInformer等),每種sharedindexInformer中有對應的listwatch函式。

如pod index informer:

func newPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
   sharedIndexInformer := cache.NewSharedIndexInformer(
      &cache.ListWatch{
         ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
            return client.CoreV1().Pods(meta_v1.NamespaceAll).List(options)
         },
         WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
            return client.CoreV1().Pods(meta_v1.NamespaceAll).Watch(options)
         },
      },
      &api_v1.Pod{},
      resyncPeriod,
      cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
   )
   return sharedIndexInformer
}

三、podInformer通過向nodecontroler的taintManager進行更新、新增、刪除。(所有的controler都差不多)

 

shareInformerFactory種類很多:大(組)類分為APP、autoScaling、core、Extensions、Storage等,每個組裡再細分。

分為{group,version,resource}.(一下部分{,,}代表g,v,r,其中*代表下面的資源)所有資源的IndexInformer中都有Lister()(不同版本的lister()函式不一樣,informer一樣)、Informer()函式。不同版本成員lister watcher函式呼叫的不一樣,主要是呼叫不同版本的函式。

lister()函式就是返回shareIndexInformer的indexer

informer()函式就是將自己將生成的podindexinformer加入到shareinformer的informer陣列中,並返回podindexinformer

(1)如{core,v1,*}組裡ComponentStatuses、ConfigMaps、Endpoints、Events、Namespaces、LimitRanges、Nodes、PersistentVolumes、PersistentVolumeClaims、Pods、PodTemplates、ReplicationControllers、ResourceQuotas、Secrets、Services、ServiceAccounts

(2){Extensions,v1beta1,*}組裡有PodSecurityPolicies、DaemonSets、Deployments、Ingresses、ReplicaSets、ThirdPartyResources

(3){Storage,v1,*}組裡有StorageClasses

         {Storage,V1beta1,*}組裡有StorageClasses

(4){Apps,V1beta1,*}裡面有Deployments、StatefulSets

(5){Autoscaling、V1、*}裡面有HorizontalPodAutoscalers

         {Autoscaling、V2alpha1、*}裡面有HorizontalPodAutoscalers

(6){Batch、V1、*}裡面有Jobs

          {Batch,V2alpha1,*}裡面有CronJobs

(7){Certificates、V1beta1、*}裡面有CertificateSigningRequests

(8){Policy、V1beta1、*}裡面有PodDisruptionBudgets

(9){Rbac、V1alpha1 | V1beta1、*}裡面有ClusterRoles、ClusterRoleBindings、Roles、RoleBindings

(10){Settings、V1alpha1、*}裡面有PodPresets

(11){Storage、V1|V1beta1、*}裡面有StorageClasses

程式碼如core,一下軍返回制定了group,version的resource indexInformer.

func (f *sharedInformerFactory) Apps() apps.Interface {
   return apps.New(f)
}
func (f *sharedInformerFactory) Autoscaling() autoscaling.Interface {
   return autoscaling.New(f)
}
func (f *sharedInformerFactory) Batch() batch.Interface {
   return batch.New(f)
}
func (f *sharedInformerFactory) Certificates() certificates.Interface {
   return certificates.New(f)
}
func (f *sharedInformerFactory) Core() core.Interface {
   return core.New(f)
}
func (f *sharedInformerFactory) Extensions() extensions.Interface {
   return extensions.New(f)
}
func (f *sharedInformerFactory) Policy() policy.Interface {
   return policy.New(f)
}
func (f *sharedInformerFactory) Rbac() rbac.Interface {
   return rbac.New(f)
}
func (f *sharedInformerFactory) Settings() settings.Interface {
   return settings.New(f)
}
func (f *sharedInformerFactory) Storage() storage.Interface {
   return storage.New(f)

 

podInformer怎麼工作的?(or 各種informer怎麼工作的):

    首先建立好podInformer後,呼叫podeindexinformer(shareindexinform)的addHandler後,函式裡建立listener,把listener放入sharedIndexInformer.processor的listener切片中,然後run。所以每新增一個handler就建立一個listener,然後run和pop函式。

    listener.run函式一個for{}迴圈監聽事件chan,新增就呼叫add函式,update就呼叫等。

    listener.pop函式一個for{}迴圈監聽pendingnotifications切片,pop出第一個傳送個chan,讓run去處理。

    listener中的noticications是distribute函式分發到所有listener中。

 

下文紅色部分為各類informer公共部分,都一樣,只有處理函式不一樣,需要我們建立controler自定義。

podInformer則config run建立reflector,將watch到的obj存入fifo佇列,然後processloop讀取佇列,呼叫HandleDeltas函式將obj儲存到podinformer的indexer和cacheMutationDetetor物件(用於deepcopy obj和cache中的obj是否相等檢測)中,然後儲存到呼叫上面distribute介面存入listener中。process run則呼叫所有的listener run和pop,讀取indexer中的值,處理儲存到tainmanager(podinformer.addeventhandler()中指定,初始化listener)中.。建立CacheMutationDetector物件用於判斷cache的obj和copy的obj(通過在新增obj時deepcopy函式copy的)是否一樣,他會一直到用deepEqual函式進行比較,不一樣要發報警資訊和pannic

indexer的處理:是一個threadSafeMap型別,items就是name:obj這樣儲存的,indexer存放的是key:func用於處理obj,存入indices中;indice會為每個func生成的值當做鍵,key當做值;舉例子,key是“namespace”,func是獲取pod的namespace的函式(pod可以在多個namespace下,所以返回值為多個),那麼indices就是indices["namespace"]={index},index = map[string]Index {"default":set("namespace"), "kube-system":set("namespace")}

indexer的作用是用來篩選,根據關心的關鍵字刪選item中的obj返回。比如:podinformer中的indexer中的item是全部的pod,我關心pod,而item中儲存的是全部的pod,我想要default namespace下的pod,則就可以通過ByIndex(“namespace”,“default”)函式返回相應的pod列表。

type threadSafeMap struct {

    lock sync.RWMutex

    items map[string]interface{}

    // indexers maps a name to an IndexFunc

    indexers Indexers

    // indices maps a name to an Index

    indices Indices

}

 

所以podinformer(各類informer)啟動了三個server,

一個controller,用於處理建立reflector,watch資源,向indexer中新增obj,併發送給各種listener,

一個是processor,用於各類listener監聽佇列,處理監聽到的obj,呼叫資源處理函式,如node controller的pod informer把pod新增到tainmanager.  此部分為我們建立自己controller要實現的

一個是cacheMutationDetector是用來是進行cache和copy的obj是否一樣,進行不停檢測,報警和pannic.

 

shareIndexInformer的run函式:

建立fifo用於儲存object,而其podeinformer 的indexer則就為fifo的knowedobject佇列。     

   啟動config,新建reflector,不停過一段時間就呼叫podInformer的listandwatch函式,將結果寫入fifo(threadSafeMap)佇列,存入item,key為func計算出來,value為obj,並把key寫入queue。啟動processLOOP函式不停地pop queue中的obj,呼叫HandleDeltas處理obj.

podInformer的indexer通過HandleDeltas函式進行新增,update,刪除等操作。

type threadSafeMap struct {
   lock  sync.RWMutex
   items map[string]interface{}

   // indexers maps a name to an IndexFunc
   indexers Indexers
   // indices maps a name to an Index
   indices Indices
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
   cfg := &Config{
      Queue:            fifo,
      ListerWatcher:    s.listerWatcher,
      ObjectType:       s.objectType,
      FullResyncPeriod: s.resyncCheckPeriod,
      RetryOnError:     false,
      ShouldResync:     s.processor.shouldResync,
      Process: s.HandleDeltas,
   }
   func() {
      s.startedLock.Lock()
      defer s.startedLock.Unlock()

      s.controller = New(cfg)
      s.controller.(*controller).clock = s.clock
      s.started = true
   }()
   s.stopCh = stopCh
   s.cacheMutationDetector.Run(stopCh)
   s.processor.run(stopCh)
   s.controller.Run(stopCh)
}