k8s informer
以nodecontroller為例:
一、先建立sharedInformers供所有controler使用,是一個工廠Informer.(sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()))。其成員有[]sharedIndexInformer切片。
二、每次建立一個controler如node controler、rccontroler等,都會生成一個相應的sharedIndexInformer(namespaceInformer、podInformer等),每種sharedindexInformer中有對應的listwatch函式。
如pod index informer:
func newPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { sharedIndexInformer := cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) { return client.CoreV1().Pods(meta_v1.NamespaceAll).List(options) }, WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { return client.CoreV1().Pods(meta_v1.NamespaceAll).Watch(options) }, }, &api_v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) return sharedIndexInformer }
三、podInformer通過向nodecontroler的taintManager進行更新、新增、刪除。(所有的controler都差不多)
shareInformerFactory種類很多:大(組)類分為APP、autoScaling、core、Extensions、Storage等,每個組裡再細分。
分為{group,version,resource}.(一下部分{,,}代表g,v,r,其中*代表下面的資源)所有資源的IndexInformer中都有Lister()(不同版本的lister()函式不一樣,informer一樣)、Informer()函式。不同版本成員lister watcher函式呼叫的不一樣,主要是呼叫不同版本的函式。
lister()函式就是返回shareIndexInformer的indexer
informer()函式就是將自己將生成的podindexinformer加入到shareinformer的informer陣列中,並返回podindexinformer
(1)如{core,v1,*}組裡ComponentStatuses、ConfigMaps、Endpoints、Events、Namespaces、LimitRanges、Nodes、PersistentVolumes、PersistentVolumeClaims、Pods、PodTemplates、ReplicationControllers、ResourceQuotas、Secrets、Services、ServiceAccounts
(2){Extensions,v1beta1,*}組裡有PodSecurityPolicies、DaemonSets、Deployments、Ingresses、ReplicaSets、ThirdPartyResources
(3){Storage,v1,*}組裡有StorageClasses
{Storage,V1beta1,*}組裡有StorageClasses
(4){Apps,V1beta1,*}裡面有Deployments、StatefulSets
(5){Autoscaling、V1、*}裡面有HorizontalPodAutoscalers
{Autoscaling、V2alpha1、*}裡面有HorizontalPodAutoscalers
(6){Batch、V1、*}裡面有Jobs
{Batch,V2alpha1,*}裡面有CronJobs
(7){Certificates、V1beta1、*}裡面有CertificateSigningRequests
(8){Policy、V1beta1、*}裡面有PodDisruptionBudgets
(9){Rbac、V1alpha1 | V1beta1、*}裡面有ClusterRoles、ClusterRoleBindings、Roles、RoleBindings
(10){Settings、V1alpha1、*}裡面有PodPresets
(11){Storage、V1|V1beta1、*}裡面有StorageClasses
程式碼如core,一下軍返回制定了group,version的resource indexInformer.
func (f *sharedInformerFactory) Apps() apps.Interface {
return apps.New(f)
}
func (f *sharedInformerFactory) Autoscaling() autoscaling.Interface {
return autoscaling.New(f)
}
func (f *sharedInformerFactory) Batch() batch.Interface {
return batch.New(f)
}
func (f *sharedInformerFactory) Certificates() certificates.Interface {
return certificates.New(f)
}
func (f *sharedInformerFactory) Core() core.Interface {
return core.New(f)
}
func (f *sharedInformerFactory) Extensions() extensions.Interface {
return extensions.New(f)
}
func (f *sharedInformerFactory) Policy() policy.Interface {
return policy.New(f)
}
func (f *sharedInformerFactory) Rbac() rbac.Interface {
return rbac.New(f)
}
func (f *sharedInformerFactory) Settings() settings.Interface {
return settings.New(f)
}
func (f *sharedInformerFactory) Storage() storage.Interface {
return storage.New(f)
podInformer怎麼工作的?(or 各種informer怎麼工作的):
首先建立好podInformer後,呼叫podeindexinformer(shareindexinform)的addHandler後,函式裡建立listener,把listener放入sharedIndexInformer.processor的listener切片中,然後run。所以每新增一個handler就建立一個listener,然後run和pop函式。
listener.run函式一個for{}迴圈監聽事件chan,新增就呼叫add函式,update就呼叫等。
listener.pop函式一個for{}迴圈監聽pendingnotifications切片,pop出第一個傳送個chan,讓run去處理。
listener中的noticications是distribute函式分發到所有listener中。
下文紅色部分為各類informer公共部分,都一樣,只有處理函式不一樣,需要我們建立controler自定義。
podInformer則config run建立reflector,將watch到的obj存入fifo佇列,然後processloop讀取佇列,呼叫HandleDeltas函式將obj儲存到podinformer的indexer和cacheMutationDetetor物件(用於deepcopy obj和cache中的obj是否相等檢測)中,然後儲存到呼叫上面distribute介面存入listener中。process run則呼叫所有的listener run和pop,讀取indexer中的值,處理儲存到tainmanager(podinformer.addeventhandler()中指定,初始化listener)中.。建立CacheMutationDetector物件用於判斷cache的obj和copy的obj(通過在新增obj時deepcopy函式copy的)是否一樣,他會一直到用deepEqual函式進行比較,不一樣要發報警資訊和pannic
indexer的處理:是一個threadSafeMap型別,items就是name:obj這樣儲存的,indexer存放的是key:func用於處理obj,存入indices中;indice會為每個func生成的值當做鍵,key當做值;舉例子,key是“namespace”,func是獲取pod的namespace的函式(pod可以在多個namespace下,所以返回值為多個),那麼indices就是indices["namespace"]={index},index = map[string]Index {"default":set("namespace"), "kube-system":set("namespace")}
indexer的作用是用來篩選,根據關心的關鍵字刪選item中的obj返回。比如:podinformer中的indexer中的item是全部的pod,我關心pod,而item中儲存的是全部的pod,我想要default namespace下的pod,則就可以通過ByIndex(“namespace”,“default”)函式返回相應的pod列表。
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
所以podinformer(各類informer)啟動了三個server,
一個controller,用於處理建立reflector,watch資源,向indexer中新增obj,併發送給各種listener,
一個是processor,用於各類listener監聽佇列,處理監聽到的obj,呼叫資源處理函式,如node controller的pod informer把pod新增到tainmanager. 此部分為我們建立自己controller要實現的
一個是cacheMutationDetector是用來是進行cache和copy的obj是否一樣,進行不停檢測,報警和pannic.
shareIndexInformer的run函式:
建立fifo用於儲存object,而其podeinformer 的indexer則就為fifo的knowedobject佇列。
啟動config,新建reflector,不停過一段時間就呼叫podInformer的listandwatch函式,將結果寫入fifo(threadSafeMap)佇列,存入item,key為func計算出來,value為obj,並把key寫入queue。啟動processLOOP函式不停地pop queue中的obj,呼叫HandleDeltas處理obj.
podInformer的indexer通過HandleDeltas函式進行新增,update,刪除等操作。
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
s.stopCh = stopCh
s.cacheMutationDetector.Run(stopCh)
s.processor.run(stopCh)
s.controller.Run(stopCh)
}