16.深入k8s:Informer使用及其原始碼分析
阿新 • • 發佈:2020-10-18
![63831060_p0_master1200](https://img.luozhiyun.com/20201018000039.jpg)
> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com
>
> 由於這部分的程式碼是在client-go 中,所以使用的原始碼版本是client-go 1.19
這次講解我用了很一些圖,儘可能的把這個模組給描述清楚,如果感覺對你有所幫助不妨發一封郵件激勵一下我~
## Informer機制
### 機制設計
Informer主要有兩個作用:
1. 通過一種叫作 ListAndWatch 的方法,把 APIServer 中的 API 物件快取在了本地,並負責更新和維護這個快取。ListAndWatch通過 APIServer 的 LIST API“獲取”所有最新版本的 API 物件;然後,再通過 WATCH API 來“監聽”所有這些 API 物件的變化;
2. 註冊相應的事件,之後如果監聽到的事件變化就會呼叫事件對應的EventHandler,實現回撥。
Informer執行原理如下:
![image-20201017000845410](https://img.luozhiyun.com/20201018000043.png)
根據流程圖來解釋一下Informer中幾個元件的作用:
* Reflector:用於監控指定的k8s資源,當資源發生變化時,觸發相應的變更事件,如Added事件、Updated事件、Deleted事件,並將器資源物件放到本地DeltaFIFO Queue中;
* DeltaFIFO:DeltaFIFO是一個先進先出的佇列,可以儲存資源物件的操作型別;
* Indexer:用來儲存資源物件並自帶索引功能的本地儲存,Reflector從DeltaFIFO中將消費出來的資源物件儲存至Indexer;
Reflector 包會和 apiServer 建立長連線,並使用 ListAndWatch 方法獲取並監聽某一個資源的變化。List 方法將會獲取某個資源的所有例項,Watch 方法則監聽資源物件的建立、更新以及刪除事件,然後將事件放入到DeltaFIFO Queue中;
然後Informer會不斷的從 Delta FIFO Queue 中 pop 增量事件,並根據事件的型別來決定新增、更新或者是刪除本地快取;接著Informer 根據事件型別來觸發事先註冊好的 Event Handler觸發回撥函式,然後然後將該事件丟到 Work Queue 這個工作佇列中。
### 例項
將到了go-client部分的程式碼,我們可以直接通過例項來進行上手跑動,Informers Example程式碼示例如下:
```go
package main
import (
"flag"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"log"
"path/filepath"
"time"
)
func main() {
var kubeconfig *string
//如果是windows,那麼會讀取C:\Users\xxx\.kube\config 下面的配置檔案
//如果是linux,那麼會讀取~/.kube/config下面的配置檔案
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
//表示每分鐘進行一次resync,resync會週期性地執行List操作
sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
informer := sharedInformers.Core().V1().Pods().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New Pod Added to Store: %s", mObj.GetName())
},
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s Pod Updated to %s", oObj.GetName(),nObj.GetName())
},
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("Pod Deleted from Store: %s", mObj.GetName())
},
})
informer.Run(stopCh)
}
```
要執行這段程式碼,需要我們將k8s伺服器上的~/.kube程式碼拷貝到本地,我是win10的機器所以拷貝到`C:\Users\xxx\.kube`中。
informers.NewSharedInformerFactory會傳入兩個引數,第1個引數clientset是用於與k8s apiserver互動的客戶端,第2個引數是代表每分鐘會執行一次resync,resync會週期性執行List將所有資源存放再Informer Store中,如果該引數是0,則禁用resync功能。
通過informer.AddEventHandler函式可以為pod資源新增資源事件回撥方法,支援3種資源事件回撥方法:
* AddFunc
* UpdateFunc
* DeleteFunc
通過名稱我們就可以知道是新增、更新、刪除時會回撥這些方法。
在我們初次執行run方法的時候,可以會將監控的k8s上pod存放到本地,並回調AddFunc方法,如下日誌:
```
2020/10/17 15:13:10 New Pod Added to Store: dns-test
2020/10/17 15:13:10 New Pod Added to Store: web-1
2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph
2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2
2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k
2020/10/17 15:13:10 New Pod Added to Store: mongodb
2020/10/17 15:13:10 New Pod Added to Store: web-0
....
```
## 原始碼解析
### 初始化
#### shared Informer初始化
shared Informer初始化的時候會呼叫到informers.NewSharedInformerFactory進行初始化。
檔案位置:informers/factory.go
```go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
```
NewSharedInformerFactory方法最終會呼叫到NewSharedInformerFactoryWithOptions初始化一個sharedInformerFactory,在初始化的時候會初始化一個informers,用來快取不同型別的informer。
#### informer 初始化
informer初始化會呼叫sharedInformerFactory的方法進行初始化,並且可以呼叫不同資源的Informer。
```go
podInformer := sharedInformers.Core().V1().Pods().Informer()
nodeInformer := sharedInformers.Node().V1beta1().RuntimeClasses().Informer()
```
定義不同資源的Informer可以用來監控node或pod。
通過呼叫Informer方法會根據型別來建立Informer,同一類資源會共享同一個informer。
檔案路徑:informers/factory.go
```go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
//建立informer
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
//傳入上面定義的defaultInformer方法,用於建立informer
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
//獲取informer型別
informerType := reflect.TypeOf(obj)
//查詢map快取,如果存在,那麼直接返回
informer, exists := f.informers[informerType]
if exists {
return informer
}
//根據型別查詢resync的週期
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
//呼叫defaultInformer方法建立informer
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
```
呼叫InformerFor方法的時候會傳入defaultInformer方法用於建立informer。
InformerFor方法裡面首先會去sharedInformerFactory的map快取中根據型別查詢對應的informer,如果存在那麼直接返回,如果不存在,那麼則會呼叫newFunc方法建立informer,然後設定到informers快取中。
下面我們看一下NewFilteredPodInformer是如何建立Informer的:
檔案位置:informers/core/v1/pod.go
```go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
//呼叫apiserver獲取pod列表
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
//呼叫apiserver監控pod列表
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
```
這裡是真正的建立一個informer,並註冊了List&Watch的回撥函式,list回撥函式的api類似下面這樣:
```go
result = &v1.PodList{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
```
構造Informer通過NewSharedIndexInformer完成:
```go
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
```
sharedIndexInformer裡面會建立sharedProcessor,設定List&Watch的回撥函式,建立了一個indexer,我們這裡看一下NewIndexer是怎麼建立indexer的:
```go
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
```
NewIndexer方法建立了一個cache,它的keyFunc是DeletionHandlingMetaNamespaceKeyFunc,即接受一個object,生成它的**namepace/name**的字串。cache裡面的資料會存放到cacheStorage中,它是一個threadSafeMap用來儲存資源物件並自帶索引功能的本地儲存。
### 註冊EventHandler事件
EventHandler事件的註冊是通過informer的AddEventHandler方法進行的。在呼叫AddEventHandler方法的時候,傳入一個cache.ResourceEventHandlerFuncs結構體:
檔案位置:tools/cache/shared_informer.go
```go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
...
//初始化監聽器
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
//如果informer還沒啟動,那麼直接將監聽器加入到processor監聽器列表中
if !s.started {
s.processor.addListener(listener)
return
}
//如果informer已經啟動,那麼需要加鎖
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addListener(listener)
//然後將indexer中快取的資料寫入到listener中
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
```
AddEventHandler方法會呼叫到AddEventHandlerWithResyncPeriod方法中,然後呼叫newProcessListener初始化listener。
接著會校驗informer是否已經啟動,如果沒有啟動,那麼直接將監聽器加入到processor監聽器列表中並返回;如果informer已經啟動,那麼需要加鎖將監聽器加入到processor監聽器列表中,然後將indexer中快取的資料寫入到listener中。
需要注意的是listener.add方法會呼叫processorListener的add方法,這個方法會將資料寫入到addCh管道中:
```go
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
```
addCh管道里面數據是用來處理事件回撥的,後面我會說到。
大致的流程如下:
![image-20201017213620364](https://img.luozhiyun.com/20201018000210.png)
### 啟動Informer模組
最後我們在上面的demo中會使用sharedIndexInformer的Run方法來啟動Informer模組。
檔案位置:tools/cache/shared_informer.go
```go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
//初始化DeltaFIFO佇列
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
//設定Queue為DeltaFIFO佇列
Queue: fifo,
//設定List&Watch的回撥函式
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
//設定Resync週期
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
//判斷有哪些監聽器到期需要被Resync
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
//非同步建立controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
//呼叫run方法啟動processor
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true
}()
//啟動controller
s.controller.Run(stopCh)
}
```
這段程式碼主要做了以下幾件事:
1. 呼叫NewDeltaFIFOWithOptions方法初始化DeltaFIFO佇列;
2. 初始化Config結果體,作為建立controller的引數;
3. 非同步建立controller;
4. 呼叫run方法啟動processor;
5. 呼叫run方法啟動controller;
下面我們看看sharedProcessor的run方法做了什麼:
```go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
...
//遍歷監聽器
for _, listener := range p.listeners {
//下面兩個方法是核心的事件call back的方法
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
...
}
```
run方法會呼叫processorListener的run方法和pop方法,這兩個方法合在一起完成了事件回撥。
```go
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil {
notification = notificationToAdd
nextCh = p.nextCh
} else {
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
```
這段程式碼,我把add方法也貼到這裡了,是因為監聽的事件都是從這個方法傳入的,然後寫入到addCh管道中。
pop方法在select程式碼塊中會獲取addCh管道中的資料,第一個迴圈的時候notification是nil,所以會將nextCh設定為p.nextCh;第二個迴圈的時候會將資料寫入到nextCh中。
當notification不為空的時候是直接將資料存入pendingNotifications快取中的,取也是從pendingNotifications中讀取。
下面我們看看run方法:
```go
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
```
run每秒遍歷一次nextCh中的資料,然後根據不同的notification型別執行不同的回撥方法,這裡會回撥到我們在main方法中註冊的eventHandler。
下面我們再回到sharedIndexInformer的Run方法中往下走,會執行controller的Run方法。
檔案位置:tools/cache/controller.go
```go
func (c *controller) Run(stopCh <-chan struct{}) {
...
//建立Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
//啟動Reflector
wg.StartWithChannel(stopCh, r.Run)
//每秒中迴圈呼叫DeltaFIFO佇列的pop方法,
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
```
這裡對應Informer執行原理裡面Informer上部分建立Reflector並進行監聽,和下部分迴圈呼叫DeltaFIFO佇列的pop方法進行分發。
### 啟動Reflector進行監聽
Reflector的Run方法最後會呼叫到Reflector的ListAndWatch方法進行監聽獲取資源。ListAndWatch程式碼會分為兩部分,一部分是List,一部分是Watch。
我們先看List部分程式碼:
程式碼位置:tools/cache/reflector.go
```go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
if err := func() error {
...
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
//根據引數獲取pod 列表
return r.listerWatcher.List(opts)
}))
...
list, paginatedResult, err = pager.List(context.Background(), options)
...
close(listCh)
}()
...
//獲取資源版本號
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
//將資源資料轉換成資源物件列表
items, err := meta.ExtractList(list)
...
//將資源物件列表中的資源物件和資源版本號儲存至DeltaFIFO佇列中
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
...
r.setLastSyncResourceVersion(resourceVersion)
return nil
}(); err != nil {
return err
}
...
}
```
這部分的程式碼會分為如下幾個部分:
1. 呼叫listerWatcher.List方法,獲取資源下的所有物件的資料,這個方法會通過api呼叫到apiServer獲取資源列表,程式碼我在上面已經貼出來了;
2. 呼叫listMetaInterface.GetResourceVersion獲取資源版本號;
3. 呼叫meta.ExtractList方法將資源資料轉換成資源物件列表;
4. 將資源物件列表中的資源物件和資源版本號儲存至DeltaFIFO佇列中;
5. 最後呼叫setLastSyncResourceVersion方法更新資源版本號;
下面看看Watch部分的程式碼:
```go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
for {
...
//呼叫clientset客戶端api與apiServer建立長連線,監控指定資源的變更
w, err := r.listerWatcher.Watch(options)
...
//處理資源的變更事件
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
...
return nil
}
}
}
```
這裡會迴圈呼叫clientset客戶端api與apiServer建立長連線,監控指定資源的變更,如果監控到有資源變更,那麼會呼叫watchHandler處理資源的變更事件。
```go
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
...
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
...
// 獲取資源版本號
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
//將新增資源事件新增到DeltaFIFO佇列中
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
//將更新資源事件新增到DeltaFIFO佇列中
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
//將刪除資源事件新增到DeltaFIFO佇列中
case watch.Deleted:
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
...
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
...
}
```
watchHandler方法會根據傳入的資源型別呼叫不同的方法轉換成不同的Delta然後存入到DeltaFIFO佇列中。
### processLoop分發DeltaFIFO佇列中任務
processLoop方法,以1s為週期,週期性的執行。
檔案位置:tools/cache/controller.go
```go
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
```
這裡會迴圈將DeltaFIFO佇列中資料pop出隊,然後交給Process方法進行處理,Process方法是在上面呼叫sharedIndexInformer的Run方法的資料設定,設定的方法是sharedIndexInformer的HandleDeltas方法。
```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
//根據obj的Type型別進行分發
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
//如果快取中存在該物件
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
//更新indexr
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
//新老物件獲取版本號進行比較
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
// 如果快取中不存在該物件
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
```
HandleDeltas會與indexer快取互動更新我們從Delta FIFO中取到的內容,之後通過`s.processor.distribute()`進行訊息的分發。
在distribute中,sharedProcesser通過`listener.add(obj)`向每個listener分發該object。而該函式中又執行了`p.addCh <- notification`。
```go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
```
這裡可以結合上面的`p.wg.Start(listener.run)`和`p.wg.Start(listener.pop)`方法來進行理解,這裡將notification傳入到addCh管道之後會觸發EventHandler事件。
這裡我用一張圖總結一下informer的Run方法流程:
![image-20201017233145402](https://img.luozhiyun.com/20201018000217.png)
至此,我們分析完了informer的所有機制。
## 總結
通過上面分析,我們全面熟悉了k8s是如何通過Informer機制實現ListAndWatch獲取並監視 API 物件變化。
熟悉了Informer與Reflector是如何協同進行資料的傳遞,但是我這裡有點遺憾的是限於篇幅,沒有去詳細的講解DeltaFIFO佇列裡面是如何進行資料的儲存與獲取,實際上這個佇列的實現也是非常的有意思的。
對於Indexer來說,我在文章裡面也只說到了獲取DeltaFIFO佇列的資料後更新到Indexer的ThreadSafeMap中,但是並沒有講ThreadSafeMap這個儲存是如何做的,裡面的索引又是如何建立的,這些各位同學感興趣的也可以去研究一下。
## Reference
https://www.kubernetes.org.cn/2693.html
https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/
https://mp.weixin.qq.com/s?__biz=MzU1OTAzNzc5MQ==&mid=2247484052&idx=1&sn=cec9f4a1ee0d21c5b2c51bd147b8af59&chksm=fc1c2ea4cb6ba7b283eef5ac4a45985437c648361831bc3e6dd5f38053be1968b3389386e415&scene=21#wechat_r