Kubernetes client-go Informer 原始碼分析
概述ControllerController 的初始化Controller 的啟動processLoopHandleDeltas()SharedIndexInformersharedIndexerInformersharedProcessorprocessorListenersharedProcessor.addListener()sharedProcessor.distribute()sharedProcessor.run()sharedIndexInformer.Run()SharedInformerFactoryNewSharedInformerFactorysharedInformerFactory.Start()
概述
原始碼版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)
Informer 這個詞的出鏡率很高,我們在很多文章裡都可以看到 Informer 的身影,但是我們在原始碼裡真的去找一個叫做 Informer 的物件,卻又發現找不到一個單純的 Informer,但是有很多結構體或者接口裡包含了 Informer 這個詞……
和 Reflector、Workqueue 等元件不同,Informer 相對來說更加模糊,讓人初讀原始碼時感覺迷惑。今天我們一起來揭開 Informer 等面紗,看下到底什麼是 Informer。
在《Kubernetes client-go 原始碼分析 - 開篇》
Controller
Informer 通過一個 controller 物件來定義,本身很簡單,長這樣:
- client-go/tools/cache/controller.go:89
1typecontrollerstruct{
2configConfig
3reflector*Reflector
4 reflectorMutexsync.RWMutex
5clockclock.Clock
6}
這裡有我們熟悉的 Reflector,可以猜到 Informer 啟動的時候會去執行 Reflector,從而通過 Reflector 實現 list-watch apiserver,更新“事件”到 DeltaFIFO 中用於進一步處理。Config 物件等會再看,我們繼續看下 controller 對應的介面:
- client-go/tools/cache/controller.go:98
1typeControllerinterface{
2Run(stopCh<-chanstruct{})
3HasSynced()bool
4LastSyncResourceVersion()string
5}
這裡的核心明顯是 Run(stopCh <-chan struct{})
方法,Run 負責兩件事情:
- 構造 Reflector 利用 ListerWatcher 的能力將物件事件更新到 DeltaFIFO;
- 從 DeltaFIFO 中 Pop 物件然後呼叫 ProcessFunc 來處理;
Controller 的初始化
Controller 的 New 方法很簡單:
- client-go/tools/cache/controller.go:116
1funcNew(c*Config)Controller{
2ctlr:=&controller{
3config:*c,
4clock:&clock.RealClock{},
5}
6returnctlr
7}
這裡沒有太多的邏輯,主要是傳遞了一個 Config 進來,可以猜到核心邏輯是 Config 從何而來以及後面如何使用。我們先向上跟一下 Config 從哪裡來,New() 的呼叫有幾個地方,我們不去看 newInformer()
分支的程式碼,因為實際開發中主要是使用 SharedIndexInformer,兩個入口初始化 Controller 的邏輯類似,我們直接跟更實用的一個分支,看 func (s *sharedIndexInformer) Run(stopCh <-chan struct{})
方法中如何呼叫的 New()
:
- client-go/tools/cache/shared_informer.go:368
1func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){
2//……
3fifo:=NewDeltaFIFOWithOptions(DeltaFIFOOptions{
4KnownObjects:s.indexer,
5EmitDeltaTypeReplaced:true,
6})
7
8cfg:=&Config{
9Queue:fifo,
10ListerWatcher:s.listerWatcher,
11ObjectType:s.objectType,
12FullResyncPeriod:s.resyncCheckPeriod,
13RetryOnError:false,
14ShouldResync:s.processor.shouldResync,
15
16Process:s.HandleDeltas,
17WatchErrorHandler:s.watchErrorHandler,
18}
19
20func(){
21s.startedLock.Lock()
22defers.startedLock.Unlock()
23
24s.controller=New(cfg)
25s.controller.(*controller).clock=s.clock
26s.started=true
27}()
28//……
29s.controller.Run(stopCh)
30}
上面只保留了主要程式碼,我們後面會分析 SharedIndexInformer,所以這裡先不糾結 SharedIndexInformer 的細節,我們從這裡可以看到 SharedIndexInformer 的 Run() 過程裡會構造一個 Config,然後建立 Controller,最後呼叫 Controller 的 Run() 方法。另外這裡也可以看到我們前面系列文章裡分析過的 DeltaFIFO、ListerWatcher 等,這裡還有一個比較重要的是 Process:s.HandleDeltas,
這一行,Process 屬性的型別是 ProcessFunc,這裡可以看到具體的 ProcessFunc 是 HandleDeltas 方法。
Controller 的啟動
上面提到 Controller 的初始化本身沒有太多的邏輯,主要是構造了一個 Config 物件傳遞進來,所以 Controller 啟動的時候肯定會有這個 Config 的使用邏輯,我們具體來看:
- client-go/tools/cache/controller.go:127
1func(c*controller)Run(stopCh<-chanstruct{}){
2deferutilruntime.HandleCrash()
3gofunc(){
4<-stopCh
5c.config.Queue.Close()
6}()
7//利用Config裡的配置構造Reflector
8r:=NewReflector(
9c.config.ListerWatcher,
10c.config.ObjectType,
11c.config.Queue,
12c.config.FullResyncPeriod,
13)
14r.ShouldResync=c.config.ShouldResync
15r.WatchListPageSize=c.config.WatchListPageSize
16r.clock=c.clock
17ifc.config.WatchErrorHandler!=nil{
18r.watchErrorHandler=c.config.WatchErrorHandler
19}
20
21c.reflectorMutex.Lock()
22c.reflector=r
23c.reflectorMutex.Unlock()
24
25varwgwait.Group
26//啟動Reflector
27wg.StartWithChannel(stopCh,r.Run)
28//執行Controller的processLoop
29wait.Until(c.processLoop,time.Second,stopCh)
30wg.Wait()
31}
這裡的邏輯很簡單,構造 Reflector 後執行起來,然後執行 c.processLoop
,所以很明顯,Controller 的業務邏輯肯定隱藏在 processLoop 方法裡,我們繼續來看。
processLoop
- client-go/tools/cache/controller.go:181
1func(c*controller)processLoop(){
2for{
3obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))
4iferr!=nil{
5iferr==ErrFIFOClosed{
6return
7}
8ifc.config.RetryOnError{
9c.config.Queue.AddIfNotPresent(obj)
10}
11}
12}
13}
這裡的邏輯是從 DeltaFIFO 中 Pop 出一個物件丟給 PopProcessFunc 處理,如果失敗了就 re-enqueue 到 DeltaFIFO 中。我們前面提到過這裡的 PopProcessFunc 實現是 HandleDeltas()
方法,所以這裡的主要邏輯就轉到了 HandleDeltas()
是如何實現的了。
HandleDeltas()
這裡我們先回顧下 DeltaFIFO 的儲存結構,看下這個圖:
然後再看原始碼,這裡的邏輯主要是遍歷一個 Deltas 裡的所有 Delta,然後根據 Delta 的型別來決定如何操作 Indexer,也就是更新本地 cache,同時分發相應的通知。
- client-go/tools/cache/shared_informer.go:537
1func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{
2s.blockDeltas.Lock()
3defers.blockDeltas.Unlock()
4//對於每個Deltas來說,裡面存了很多的Delta,也就是對應不同Type的多個Object,這裡的遍歷會從舊往新走
5for_,d:=rangeobj.(Deltas){
6switchd.Type{
7//除了Deleted外所有情況
8caseSync,Replaced,Added,Updated:
9//記錄變更,沒有太多實際作用
10s.cacheMutationDetector.AddObject(d.Object)
11//通過indexer從cache裡查詢當前Object,如果存在
12ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&exists{
13//更新indexer裡的物件
14iferr:=s.indexer.Update(d.Object);err!=nil{
15returnerr
16}
17
18isSync:=false
19switch{
20cased.Type==Sync:
21isSync=true
22cased.Type==Replaced:
23ifaccessor,err:=meta.Accessor(d.Object);err==nil{
24ifoldAccessor,err:=meta.Accessor(old);err==nil{
25isSync=accessor.GetResourceVersion()==oldAccessor.GetResourceVersion()
26}
27}
28}
29//分發一個更新通知
30s.processor.distribute(updateNotification{oldObj:old,newObj:d.Object},isSync)
31//如果本地cache裡沒有這個Object,則新增
32}else{
33iferr:=s.indexer.Add(d.Object);err!=nil{
34returnerr
35}
36//分發一個新增通知
37s.processor.distribute(addNotification{newObj:d.Object},false)
38}
39//如果是刪除操作,則從indexer裡刪除這個Object,然後分發一個刪除通知
40caseDeleted:
41iferr:=s.indexer.Delete(d.Object);err!=nil{
42returnerr
43}
44s.processor.distribute(deleteNotification{oldObj:d.Object},false)
45}
46}
47returnnil
48}
這裡涉及到一個知識點:s.processor.distribute(addNotification{newObj: d.Object}, false)
中 processor 是什麼?如何分發通知的?誰來接收通知?
我們回到 ProcessFunc 的實現上,除了 sharedIndexInformer 的 HandleDeltas()
方法外,還有一個基礎版本:
- client-go/tools/cache/controller.go:446
1Process:func(objinterface{})error{
2for_,d:=rangeobj.(Deltas){
3obj:=d.Object
4iftransformer!=nil{
5varerrerror
6obj,err=transformer(obj)
7iferr!=nil{
8returnerr
9}
10}
11
12switchd.Type{
13caseSync,Replaced,Added,Updated:
14ifold,exists,err:=clientState.Get(obj);err==nil&&exists{
15iferr:=clientState.Update(obj);err!=nil{
16returnerr
17}
18h.OnUpdate(old,obj)
19}else{
20iferr:=clientState.Add(obj);err!=nil{
21returnerr
22}
23h.OnAdd(obj)
24}
25caseDeleted:
26iferr:=clientState.Delete(obj);err!=nil{
27returnerr
28}
29h.OnDelete(obj)
30}
31}
32returnnil
33},
這裡可以看到邏輯簡單很多,除了更新 cache 外,呼叫了 h.OnAdd(obj)/h.OnUpdate(old, obj)/h.OnDelete(obj)
等方法,這裡的 h 是 ResourceEventHandler 型別的,也就是 Process 過程直接呼叫了 ResourceEventHandler 的相應方法,這樣就已經邏輯閉環了,ResourceEventHandler 的這幾個方法裡做一些簡單的過濾後,會將這些物件的 key 丟到 workqueue,進而就觸發了自定義調諧函式的執行。
換言之,sharedIndexInformer 中實現的 ProcessFunc 是一個進階版本,不滿足於簡單呼叫 ResourceEventHandler 對應方法來完成 Process 邏輯,所以到這裡基礎的 Informer 邏輯已經閉環了,我們後面繼續來看 sharedIndexInformer 中又對 Informer 做了哪些“增強”
SharedIndexInformer
我們在 Operator 開發中,如果不使用 controller-runtime 庫,也就是不通過 Kubebuilder 等工具來生成腳手架時,經常會用到 SharedInformerFactory,比如典型的 sample-controller 中的 main() 函式:
- sample-controller/main.go:40
1funcmain(){
2klog.InitFlags(nil)
3flag.Parse()
4
5stopCh:=signals.SetupSignalHandler()
6
7cfg,err:=clientcmd.BuildConfigFromFlags(masterURL,kubeconfig)
8iferr!=nil{
9klog.Fatalf("Errorbuildingkubeconfig:%s",err.Error())
10}
11
12kubeClient,err:=kubernetes.NewForConfig(cfg)
13iferr!=nil{
14klog.Fatalf("Errorbuildingkubernetesclientset:%s",err.Error())
15}
16
17exampleClient,err:=clientset.NewForConfig(cfg)
18iferr!=nil{
19klog.Fatalf("Errorbuildingexampleclientset:%s",err.Error())
20}
21
22kubeInformerFactory:=kubeinformers.NewSharedInformerFactory(kubeClient,time.Second*30)
23exampleInformerFactory:=informers.NewSharedInformerFactory(exampleClient,time.Second*30)
24
25controller:=NewController(kubeClient,exampleClient,
26kubeInformerFactory.Apps().V1().Deployments(),
27exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
28
29kubeInformerFactory.Start(stopCh)
30exampleInformerFactory.Start(stopCh)
31
32iferr=controller.Run(2,stopCh);err!=nil{
33klog.Fatalf("Errorrunningcontroller:%s",err.Error())
34}
35}
這裡可以看到我們依賴於 kubeInformerFactory.Apps().V1().Deployments()
提供一個 Informer,這裡的 Deployments()
方法返回的是一個 DeploymentInformer 型別,DeploymentInformer 是什麼呢?如下
- client-go/informers/apps/v1/deployment.go:37
1typeDeploymentInformerinterface{
2Informer()cache.SharedIndexInformer
3Lister()v1.DeploymentLister
4}
可以看到所謂的 DeploymentInformer 由 “Informer” 和 “Lister” 組成,也就是說我們編碼時用到的 Informer 本質就是一個 SharedIndexInformer
- client-go/tools/cache/shared_informer.go:186
1typeSharedIndexInformerinterface{
2SharedInformer
3AddIndexers(indexersIndexers)error
4GetIndexer()Indexer
5}
這裡的 Indexer 就很熟悉了,SharedInformer 又是啥呢?
- client-go/tools/cache/shared_informer.go:133
1typeSharedInformerinterface{
2//可以新增自定義的ResourceEventHandler
3AddEventHandler(handlerResourceEventHandler)
4//附帶resync間隔配置,設定為0表示不關心resync
5AddEventHandlerWithResyncPeriod(handlerResourceEventHandler,resyncPeriodtime.Duration)
6//這裡的Store指的是Indexer
7GetStore()Store
8//過時了,沒有用
9GetController()Controller
10//通過Run來啟動
11Run(stopCh<-chanstruct{})
12//這裡和resync邏輯沒有關係,表示Indexer至少更新過一次全量的物件
13HasSynced()bool
14//最後一次拿到的RV
15LastSyncResourceVersion()string
16//用於每次ListAndWatch連線斷開時回撥,主要就是日誌記錄的作用
17SetWatchErrorHandler(handlerWatchErrorHandler)error
18}
sharedIndexerInformer
接下來就該看下 SharedIndexInformer 介面的實現了,sharedIndexerInformer 定義如下:
- client-go/tools/cache/shared_informer.go:287
1typesharedIndexInformerstruct{
2indexerIndexer
3controllerController
4processor*sharedProcessor
5cacheMutationDetectorMutationDetector
6listerWatcherListerWatcher
7//表示當前Informer期望關注的型別,主要是GVK資訊
8objectTyperuntime.Object
9//reflector的resync計時器計時間隔,通知所有的listener執行resync
10resyncCheckPeriodtime.Duration
11defaultEventHandlerResyncPeriodtime.Duration
12clockclock.Clock
13started,stoppedbool
14startedLocksync.Mutex
15blockDeltassync.Mutex
16watchErrorHandlerWatchErrorHandler
17}
這裡的 Indexer、Controller、ListerWatcher 等都是我們熟悉的元件,sharedProcessor 我們在前面遇到了,需要重點關注一下。
sharedProcessor
sharedProcessor 中維護了 processorListener 集合,然後分發通知物件到這些 listeners,先看下結構定義:
- client-go/tools/cache/shared_informer.go:588
1typesharedProcessorstruct{
2listenersStartedbool
3listenersLocksync.RWMutex
4listeners[]*processorListener
5syncingListeners[]*processorListener
6clockclock.Clock
7wgwait.Group
8}
馬上就會有一個疑問了,processorListener 是什麼?
processorListener
- client-go/tools/cache/shared_informer.go:690
1typeprocessorListenerstruct{
2nextChchaninterface{}
3addChchaninterface{}
4//核心屬性
5handlerResourceEventHandler
6pendingNotificationsbuffer.RingGrowing
7requestedResyncPeriodtime.Duration
8resyncPeriodtime.Duration
9nextResynctime.Time
10resyncLocksync.Mutex
11}
可以看到 processorListener 裡有一個 ResourceEventHandler,這是我們認識的元件。processorListener 有三個主要方法:
add(notification interface{})
pop()
run()
一個個來看吧。
run()
- client-go/tools/cache/shared_informer.go:775
1func(p*processorListener)run(){
2stopCh:=make(chanstruct{})
3wait.Until(func(){
4fornext:=rangep.nextCh{
5switchnotification:=next.(type){
6caseupdateNotification:
7p.handler.OnUpdate(notification.oldObj,notification.newObj)
8caseaddNotification:
9p.handler.OnAdd(notification.newObj)
10casedeleteNotification:
11p.handler.OnDelete(notification.oldObj)
12default:
13utilruntime.HandleError(fmt.Errorf("unrecognizednotification:%T",next))
14}
15}
16close(stopCh)
17},1*time.Second,stopCh)
18}
這裡的邏輯很清晰,從 nextCh 裡拿通知,然後根據其型別去呼叫 ResourceEventHandler 相應的 OnAdd/OnUpdate/OnDelete
方法。
add() 和 pop()
- client-go/tools/cache/shared_informer.go:741
1func(p*processorListener)add(notificationinterface{}){
2//將通知放到addCh中,所以下面pop()方法裡先執行到的case是第二個
3p.addCh<-notification
4}
5
6func(p*processorListener)pop(){
7deferutilruntime.HandleCrash()
8deferclose(p.nextCh)//Tell.run()tostop
9
10varnextChchan<-interface{}
11varnotificationinterface{}
12for{
13select{
14//下面獲取到的通知,新增到nextCh裡,供run()方法中消費
15casenextCh<-notification:
16varokbool
17//從pendingNotifications裡消費通知,生產者在下面case裡
18notification,ok=p.pendingNotifications.ReadOne()
19if!ok{
20nextCh=nil
21}
22//邏輯從這裡開始,從addCh裡提取通知
23casenotificationToAdd,ok:=<-p.addCh:
24if!ok{
25return
26}
27ifnotification==nil{
28notification=notificationToAdd
29nextCh=p.nextCh
30}else{
31//新新增的通知丟到pendingNotifications
32p.pendingNotifications.WriteOne(notificationToAdd)
33}
34}
35}
36}
也就是說 processorListener 提供了一定的緩衝機制來接收 notification,然後去消費這些 notification 呼叫 ResourceEventHandler 相關方法。
然後接著繼續看 sharedProcessor 的幾個主要方法。
sharedProcessor.addListener()
addListener 會直接呼叫 listener 的 run()
和 pop()
方法,這兩個方法的邏輯我們上面已經分析過
- client-go/tools/cache/shared_informer.go:597
1func(p*sharedProcessor)addListener(listener*processorListener){
2p.listenersLock.Lock()
3deferp.listenersLock.Unlock()
4
5p.addListenerLocked(listener)
6ifp.listenersStarted{
7p.wg.Start(listener.run)
8p.wg.Start(listener.pop)
9}
10}
sharedProcessor.distribute()
distribute 的邏輯就是呼叫 sharedProcessor 內部維護的所有 listner 的 add()
方法
- client-go/tools/cache/shared_informer.go:613
1func(p*sharedProcessor)distribute(objinterface{},syncbool){
2p.listenersLock.RLock()
3deferp.listenersLock.RUnlock()
4
5ifsync{
6for_,listener:=rangep.syncingListeners{
7listener.add(obj)
8}
9}else{
10for_,listener:=rangep.listeners{
11listener.add(obj)
12}
13}
14}
sharedProcessor.run()
run()
的邏輯和前面的 addListener() 類似,也就是呼叫 listener 的 run()
和 pop()
方法
- client-go/tools/cache/shared_informer.go:628
1func(p*sharedProcessor)run(stopCh<-chanstruct{}){
2func(){
3p.listenersLock.RLock()
4deferp.listenersLock.RUnlock()
5for_,listener:=rangep.listeners{
6p.wg.Start(listener.run)
7p.wg.Start(listener.pop)
8}
9p.listenersStarted=true
10}()
11<-stopCh
12p.listenersLock.RLock()
13deferp.listenersLock.RUnlock()
14for_,listener:=rangep.listeners{
15close(listener.addCh)
16}
17p.wg.Wait()
18}
到這裡基本就知道 sharedProcessor 的能力了,繼續往下看。
sharedIndexInformer.Run()
繼續來看 sharedIndexInformer 的 Run()
方法,這裡面已經幾乎沒有陌生的內容了。
- client-go/tools/cache/shared_informer.go:368
1func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){
2deferutilruntime.HandleCrash()
3
4ifs.HasStarted(){
5klog.Warningf("ThesharedIndexInformerhasstarted,runmorethanonceisnotallowed")
6return
7}
8//DeltaFIFO就很熟悉了
9fifo:=NewDeltaFIFOWithOptions(DeltaFIFOOptions{
10KnownObjects:s.indexer,
11EmitDeltaTypeReplaced:true,
12})
13//Config的邏輯也在上面遇到過了
14cfg:=&Config{
15Queue:fifo,
16ListerWatcher:s.listerWatcher,
17ObjectType:s.objectType,
18FullResyncPeriod:s.resyncCheckPeriod,
19RetryOnError:false,
20ShouldResync:s.processor.shouldResync,
21
22Process:s.HandleDeltas,
23WatchErrorHandler:s.watchErrorHandler,
24}
25
26func(){
27s.startedLock.Lock()
28defers.startedLock.Unlock()
29//前文分析過這裡的New()函式邏輯了
30s.controller=New(cfg)
31s.controller.(*controller).clock=s.clock
32s.started=true
33}()
34
35processorStopCh:=make(chanstruct{})
36varwgwait.Group
37deferwg.Wait()
38deferclose(processorStopCh)
39wg.StartWithChannel(processorStopCh,s.cacheMutationDetector.Run)
40//processor的run方法
41wg.StartWithChannel(processorStopCh,s.processor.run)
42
43deferfunc(){
44s.startedLock.Lock()
45defers.startedLock.Unlock()
46s.stopped=true//Don'twantanynewlisteners
47}()
48//controller的Run()
49s.controller.Run(stopCh)
50}
到這裡也就基本知道了 sharedIndexInformer 的邏輯了,再往上層走就剩下一個 SharedInformerFactory 了,繼續看吧~
SharedInformerFactory
我們前面提到過 SharedInformerFactory,現在具體來看一下 SharedInformerFactory 是怎麼實現的。先看介面定義:
- client-go/informers/factory.go:187
1typeSharedInformerFactoryinterface{
2internalinterfaces.SharedInformerFactory
3ForResource(resourceschema.GroupVersionResource)(GenericInformer,error)
4WaitForCacheSync(stopCh<-chanstruct{})map[reflect.Type]bool
5
6Admissionregistration()admissionregistration.Interface
7Internal()apiserverinternal.Interface
8Apps()apps.Interface
9Autoscaling()autoscaling.Interface
10Batch()batch.Interface
11Certificates()certificates.Interface
12Coordination()coordination.Interface
13Core()core.Interface
14Discovery()discovery.Interface
15Events()events.Interface
16Extensions()extensions.Interface
17Flowcontrol()flowcontrol.Interface
18Networking()networking.Interface
19Node()node.Interface
20Policy()policy.Interface
21Rbac()rbac.Interface
22Scheduling()scheduling.Interface
23Storage()storage.Interface
24}
這裡涉及到幾個點:
- internalinterfaces.SharedInformerFactory
這也是一個介面,比較簡短:
1typeSharedInformerFactoryinterface{
2Start(stopCh<-chanstruct{})
3InformerFor(objruntime.Object,newFuncNewInformerFunc)cache.SharedIndexInformer
4}
可以看到熟悉的 SharedIndexInformer
- ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
這裡接收一個 GVR,返回了一個 GenericInformer,看下什麼是 GenericInformer:
1typeGenericInformerinterface{
2Informer()cache.SharedIndexInformer
3Lister()cache.GenericLister
4}
也很簡短。
- Apps() apps.Interface 等
後面一堆方法是類似的,我們以 Apps() 為例來看下怎麼回事。這裡的 Interface 定義如下:
- client-go/informers/apps/interface.go:29
1typeInterfaceinterface{
2//V1providesaccesstosharedinformersforresourcesinV1.
3V1()v1.Interface
4//V1beta1providesaccesstosharedinformersforresourcesinV1beta1.
5V1beta1()v1beta1.Interface
6//V1beta2providesaccesstosharedinformersforresourcesinV1beta2.
7V1beta2()v1beta2.Interface
8}
顯然應該繼續看下 v1.Interface 是個啥。
- client-go/informers/apps/v1/interface.go:26
1typeInterfaceinterface{
2//ControllerRevisionsreturnsaControllerRevisionInformer.
3ControllerRevisions()ControllerRevisionInformer
4//DaemonSetsreturnsaDaemonSetInformer.
5DaemonSets()DaemonSetInformer
6//DeploymentsreturnsaDeploymentInformer.
7Deployments()DeploymentInformer
8//ReplicaSetsreturnsaReplicaSetInformer.
9ReplicaSets()ReplicaSetInformer
10//StatefulSetsreturnsaStatefulSetInformer.
11StatefulSets()StatefulSetInformer
12}
到這裡已經有看著很眼熟的 Deployments() DeploymentInformer
之類的程式碼了,DeploymentInformer 我們剛才看過內部結構,長這樣:
1typeDeploymentInformerinterface{
2Informer()cache.SharedIndexInformer
3Lister()v1.DeploymentLister
4}
到這裡也就不難理解 SharedInformerFactory 的作用了,它提供了所有 API group-version 的資源對應的 SharedIndexInformer,也就不難理解開頭我們引用的 sample-controller 中的這行程式碼:
1kubeInformerFactory.Apps().V1().Deployments()
通過其可以拿到一個 Deployment 資源對應的 SharedIndexInformer。
NewSharedInformerFactory
繼續看下 SharedInformerFactory 是如何建立的
- client-go/informers/factory.go:96
1funcNewSharedInformerFactory(clientkubernetes.Interface,defaultResynctime.Duration)SharedInformerFactory{
2returnNewSharedInformerFactoryWithOptions(client,defaultResync)
3}
可以看到引數非常簡單,主要是需要一個 Clientset,畢竟 ListerWatcher 的能力本質還是 client 提供的。
- client-go/informers/factory.go:109
1funcNewSharedInformerFactoryWithOptions(clientkubernetes.Interface,defaultResynctime.Duration,options...SharedInformerOption)SharedInformerFactory{
2factory:=&sharedInformerFactory{
3client:client,
4namespace:v1.NamespaceAll,//空字串""
5defaultResync:defaultResync,
6informers:make(map[reflect.Type]cache.SharedIndexInformer),//可以存放不同型別的SharedIndexInformer
7startedInformers:make(map[reflect.Type]bool),
8customResync:make(map[reflect.Type]time.Duration),
9}
10
11for_,opt:=rangeoptions{
12factory=opt(factory)
13}
14
15returnfactory
16}
接著是如何啟動
sharedInformerFactory.Start()
- client-go/informers/factory.go:128
1func(f*sharedInformerFactory)Start(stopCh<-chanstruct{}){
2f.lock.Lock()
3deferf.lock.Unlock()
4
5forinformerType,informer:=rangef.informers{
6//同類型只會呼叫一次,Run()的邏輯我們前面介紹過了
7if!f.startedInformers[informerType]{
8goinformer.Run(stopCh)
9f.startedInformers[informerType]=true
10}
11}
12}
小結
今天我們一個基礎 Informer - Controller 開始介紹,先分析了 Controller 的能力,也就是其通過構造 Reflector 並啟動從而能夠獲取指定型別資源的“更新”事件,然後通過事件構造 Delta 放到 DeltaFIFO 中,進而在 processLoop 中從 DeltaFIFO 裡 pop Deltas 來處理,一方面將物件通過 Indexer 同步到本地 cache,也就是一個 ThreadSafeStore,一方面呼叫 ProcessFunc 來處理這些 Delta。
然後 SharedIndexInformer 提供了構造 Controller 的能力,通過 HandleDeltas() 方法實現上面提到的 ProcessFunc,同時還引入了 sharedProcessor 在 HandleDeltas() 中用於事件通知的處理。sharedProcessor 分發事件通知的時候,接收方是內部繼續抽象出來的 processorListener,在 processorListener 中完成了 ResourceEventHandler 具體回撥函式的呼叫。
最後 SharedInformerFactory 又進一步封裝了提供所有 api 資源對應的 SharedIndexInformer 的能力。也就是說一個 SharedIndexInformer 可以處理一種型別的資源,比如 Deployment 或者 Pod 等,而通過 SharedInformerFactory 可以輕鬆構造任意已知型別的 SharedIndexInformer。另外這裡用到了 Clientset 提供的訪問所有 api 資源的能力,通過其也就能夠完整實現整套 Informer 邏輯了。
此前我們已經陸續分析了:
- Kubernetes client-go 原始碼分析 - 開篇
- Kubernetes client-go 原始碼分析 - workqueue
- Kubernetes client-go 原始碼分析 - DeltaFIFO
- Kubernetes client-go 原始碼分析 - Indexer & ThreadSafeStore
- Kubernetes client-go 原始碼分析 - ListWatcher
- Kubernetes client-go 原始碼分析 - Reflector
各種“元件”分工明確,最終匯聚在 “Informer” 裡,實現了一套複雜而優雅的資源處理能力,至此自定義控制器涉及邏輯中 client-go 部分就基本分析完了!
【完整目錄參見>>> 《深入理解 K8S 原理與實現》系列目錄 <<<】
(轉載請保留本文原始連結 https://www.danielhu.cn)