1. 程式人生 > >kubernetes1.9原始碼閱讀 replication controller的Informer機制_Kubernetes中文社群

kubernetes1.9原始碼閱讀 replication controller的Informer機制_Kubernetes中文社群

replication controller是kube-controller-manager中一個重要的控制器,主要是rs進行控制,確保pods的數量恰好和rs的規定一致。因此replication controller主要對這兩類進行watch,一類是replicationset,另一類是pods。本文是replication controller的原始碼閱讀筆記,會包括client-go的Informer機制,希望幫助開始閱讀kubernetes原始碼的小夥伴們,更希望與對kubernetes原始碼閱讀感興趣的小夥伴兒們交流,有錯誤的地方也希望能指出,共同進步。

入口程式

cmd/kube-controller-manager/controller-manager.go main

1. 呼叫options.NewCMServer,構建CMServer;

2. 呼叫app.Run方法,執行CMServer;

啟動CMServer

cmd/kube-controller-manager/app/controllermanager.go Run

1. 呼叫createClients, 建立apiserver客戶端,通過REST方式訪問APIserver提供的API服務;

2. 啟動協程呼叫go startHTTP,執行http Server;

3. 呼叫record.NewBroadcaster,建立eventBroadcaster物件,接收EventBroadcaster傳送的event,輸出到logging中,並輸出到EventSink,並使用recorder記錄”controller-mananger”的事件;

4. 呼叫CreateControllerContext,在CreateControllerContext方法中,會呼叫informers.NewSharedInformerFactory,建立sharedInformerFactory(client-go/informers/factory.go)物件;

5. 呼叫StartControllers,啟動Controllers;

6. 呼叫ctx.InformerFactory.Start,在這裡是呼叫sharedInformerFactory.start(client-go/informers/factory.go);

7. saTokenControllerInitFunc和NewControllerInitializers定義了controllers的InitFunc;

startReplicationController

cmd/kube-controller-manager/app/core.go startReplicationController

1. 協程啟動呼叫replicationcontroller.NewReplicationManager,構建ReplicationManager;

(1) 呼叫ctx.InformerFactory.Core().V1().Pods(),這裡呼叫sharedInformerFactory(client-go/informers/factory.go)物件的Core().V1().Pods()方法,將會構建PodInformer物件,

(2) 以此方式,建立ReplicationControllersInformer;

2. 執行ReplicationManager;

NewReplicationManager

pkg/controller/replication/replication_set.go NewReplicationManager

1. 在NewReplicationManager中,

(1) 首先,呼叫record.NewBroadcaster,建立eventBroadcaster物件,呼叫eventBroadcaster.StartLogging,接收EventBroadcaster傳送的event,輸出到logging中;呼叫 eventBroadcaster.StartRecordingToSink,event輸出到EventSink,並呼叫eventBroadcaster.NewRecorder記錄”replication-controller”的事件;

(2) 將呼叫NewBaseController;

2. 在NewBaseController方法中,

(1) 構建ReplicaSetController物件,包括了podControl,它定義了對Pod的操作,是由RealPodControl去呼叫apiserver完成建立實現;

(2) 將呼叫 rsInformer.Informer().AddEventHandler,這將呼叫rsInformer的建構函式NewReplicaSetInformer,rsInformer將event handler包裝成listerner,然後新增到s.processor.listeners中,並定義物件處理的回撥函式AddFunc、UpdateFunc、DeleteFunc;

(3) 同時,呼叫rsInformer的Lister方法;

(4) 最後,呼叫rsInformer.Informer().HasSynced,判斷是否快取完成;

(5) 以此方式,呼叫podInformer.Informer().AddEventHandler、podInformer的Lister方法及podInformer.Informer().HasSynced;

(6) 設定rsc.syncHandler;syncHandler負責pod與rc的同步,確保Pod副本數與rc規定的相同;

PodInformer

client-go/informers/core/v1/pod.go NewPodInformer

1. 構建cache.listWatch物件,定義了ListFunc和WatchFunc;

2. 呼叫cache.NewSharedIndexInformer;

NewSharedIndexInformer

client-go/tools/cache/shared_informer.go NewSharedIndexInformer

執行ReplicationManager

pkg/controller/replicaset/replica_set.go Run

1. 呼叫controller.WaitForCacheSync方法,在controller.WaitForCacheSync中,將呼叫ca che.WaitForCacheSync;

2. 呼叫rsc.worker, 將啟動workers呼叫rsc.syncHandler,syncHandler負責pod與rc的同步,確保Pod副本數與rc規定的相同;

sharedInformerFactory.Start

client-go/informers/factory.go Start

1. 呼叫Informer.Run,這裡呼叫SharedIndexInformer.Run;

SharedIndexInformer.Run

client-go/tools/cache/shared_informer.go Run

1. 呼叫NewDeltaFIFO,建立queue;

2. 定義Deltas處理函式s.HandleDeltas;

3. 呼叫New(cfg),構建sharedIndexInformer的controller;

4. 呼叫s.cacheMutationDetector.Run,檢查快取物件是否變化;

5. 呼叫s.processor.run,將呼叫sharedProcessor.run,會呼叫Listener.run和Listener.pop,執行處理queue的函式;

6. 呼叫s.controller.Run,構建Reflector,進行對etcd的快取;

sharedIndexedInformer.controller.Run

client-go/tools/cache/controller.go Run

1. 呼叫NewReflector,構建Reflector;

(1) Reflector物件,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;

2. 呼叫r.run,將呼叫reflector.ListAndWatch,執行r.List、r.watch、r.watchHandler,進行對etcd的快取;

3. 呼叫c.processLoop,reflector向queue裡面新增資料,processLoop會不停去消費這裡這些資料;

controller.processLoop

client-go/tools/cache/controller.go processLoop

1. cache.PopProcessFunc(c.config.Process)將前面Process函式傳遞進去;

DeltaFIFO.Pop

client-go/tools/cache/delta_fifo.go Pop

1. 主要從f.items取出object,然後呼叫process函式進行處理;

處理DeltaFIFO

client-go/tools/cache/shared_informer.go HandleDeltas

1. 呼叫s.process.distribute,將呼叫Listener.add,負責將watch的資源傳到listener;

Listener.add/pop/run

client-go/tools/cache/shared_informer.go sharedProcessor.run/add/pop;

1. listenser的add函式負責將notify裝進pendingNotifications;

2. pop函式取出pendingNotifications的第一個nofify,輸出到nextCh channel;

3. run函式則負責取出notify,然後根據notify的型別(增加、刪除、更新)觸發相應的處理函式,這些函式在ReplicaSetController註冊,分別是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet;

rsc.addPod

pkg/controller/replicaset/replica_set.go addPod

1. 首先會根據pod返回rc,當pod不屬於任何rc時,則返回。找到rc以後,更新rm.expectations.CreationObserved這個rc的期望值,也就是假如一個rc有4個pod,現在檢測到建立了一個pod,則會將這個rc的期望值減少,變為3。然後將這個rc放入佇列;

2. 呼叫rsc.enqueueReplicaSet,將呼叫rsc.queue.Add;

rsc.worker

pkg/controller/replicaset/replica_set.go worker()

1. 呼叫rsc.syncHandler,這裡會呼叫rsc.syncReplicaSet,syncReplicaSet負責pod與rc的同步,確保Pod副本數與rc規定的相同;