【kubernetes/k8s原始碼分析】 client-go包之Informer原始碼分析
阿新 • • 發佈:2018-11-06
Informer 簡介
Informer 是 Client-go 中的一個核心工具包。如果 Kubernetes 的某個元件,需要 List/Get Kubernetes 中的 Object(包括pod,service等等),可以直接使用 Informer 例項中的 Lister()方法(包含 Get 和 List 方法)。Informer 最基本 的功能就是 List/Get Kubernetes 中的 Object,還可以監聽事件並觸發回撥函式等
- Informer 的 Lister() 方法(List/Get) ,Informer 不會去請求 Kubernetes API,而是查詢快取在本地記憶體中的資料(由 Informer 自己維護)。好處是Informer 可以快速地返回結果,也減少對 Kubernetes API 的直接呼叫引起的額外開銷。
- Informer 只會呼叫 Kubernetes List 和 Watch 的 API。Informer 在初始化的時,先呼叫 Kubernetes List API 獲得資源的全部 ,快取在記憶體中,然後呼叫 Watch API 去 watch 這種 resource,維護這份快取保持一致性
- Informer 可以自定義回撥函式,ResourceEventHandler,只需實現 OnAdd OnUpdate OnDelete 方法,分別對應 informer 監聽建立、更新和刪除事件型別。
原始碼分析
1. 初始化過程
路徑cmd/kube-controller-manager/app/controllermanager.go
- Run方法-> CreateControllerContext -> ctx.InformerFactory.Start
- 初始化SharedInformerFactory,並賦值到ctx上下文中
func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) { versionedClient := rootClientBuilder.ClientOrDie("shared-informers") sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()) availableResources, err := GetAvailableResources(rootClientBuilder) if err != nil { return ControllerContext{}, err } cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { return ControllerContext{}, fmt.Errorf("cloud provider could not be initialized: %v", err) } ctx := ControllerContext{ ClientBuilder: clientBuilder, InformerFactory: sharedInformers, Options: *s, AvailableResources: availableResources, Cloud: cloud, Stop: stop, InformersStarted: make(chan struct{}), } return ctx, nil }
- 呼叫SharedInformerFactory的start方法
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
2. sharedIndexInformer函式
路徑: client-go/tools/cache/shared_informer.go
- 初始化DeltaFIFO,建立queue
- s.cacheMutationDetector.Run檢查快取物件是否變化
- processorStopCh, s.processor.run呼叫Listener.run和Listener.pop,處理queue
- s.controller.Run(stopCh)構建Reflector,先list然後在watch
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}
2.1 NewDeltaFIFO函式
路徑:client-go/tools/cache/delta_fifo.go
- DeltaFIFO可以理解生產消費的佇列,Reflector為生產者
func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
deltaCompressor: compressor,
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
}
2.2 CacheMutationDetector介面Run方法
defaultCacheMutationDetector實現了介面
func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
// we DON'T want protection from panics. If we're running this code, we want to die
for {
d.CompareObjects()
select {
case <-stopCh:
return
case <-time.After(d.period):
}
}
}
2.3 Process的run方法
呼叫Listener.run和Listener.pop,處理queue
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
2.3.1 呼叫listener.run方法
主要是根據updateNotification addNotification deleteNotification,對應相應的函式進行處理
func (p *processorListener) run() {
defer utilruntime.HandleCrash()
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
}
2.3.3 呼叫listener.pop方法
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
2.4 controller的Run函式
- 主要是生成物件Reflector,呼叫其Run方法
- processLoop函式
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
2.4.1 Reflector的Run函式
Reflector主要對接kubernetes API,呼叫list/watch方法,更新快取(先list所有在進行watch操作i)
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
2.4.2 ProcessLoop主要呼叫HandleDeltas
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}