
[Repost] The Implementation of the Manager in controller-runtime

Introduction

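controller-runtime uses a Manager interface to manage Controllers. Besides controllers, the Manager can also manage Admission Webhooks, and it holds the client, cache, scheme, and other components used to access resource objects, as shown in the figure below.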

How the Manager is used

First, let's look at how the Manager in controller-runtime is used. Based on the example in the controller-runtime repository, using a Manager takes the following steps:

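1. Instantiate the manager, passing in a config.

2. Register the API types with the manager's scheme.

3. Add a controller to the manager. The controller contains a reconciler struct, in which we implement the processing logic.

4. Add a webhook to the manager; this also requires implementing the handling logic.

5. Start the manager with mgr.Start().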

The code is as follows:

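package main

import (
    "os"

    corev1 "k8s.io/api/core/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    api "sigs.k8s.io/controller-runtime/examples/crd/pkg"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"
)

var setupLog = ctrl.Log.WithName("setup")

// The reconciler struct and its Reconcile method are defined in the same example file (omitted here).

func main() {
    ctrl.SetLogger(zap.New())

    // Instantiate the Manager from a rest config.
    // ctrl.GetConfigOrDie() loads the config from the --kubeconfig flag, $KUBECONFIG,
    // the in-cluster config, or ~/.kube/config, and exits if none can be loaded.
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }

    // in a real controller, we'd create a new scheme for this
    // Register the API types with the Scheme, which maps GVKs to Go types.
    // With several API packages (CRD groups), call each package's AddToScheme.
    err = api.AddToScheme(mgr.GetScheme())
    if err != nil {
        setupLog.Error(err, "unable to add scheme")
        os.Exit(1)
    }

    // Register the Controller with the Manager.
    // For: the reconciled resource; equivalent to
    //   Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
    // Owns: owned (child) resources; Pods controlled by an api.ChaosPod are watched too; equivalent to
    //   Watches(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true})
    // reconciler: our struct implementing the reconcile.Reconciler interface via its Reconcile method.
    // mgr.GetClient() and mgr.GetScheme() return the Client and Scheme set up by ctrl.NewManager.
    err = ctrl.NewControllerManagedBy(mgr).
        For(&api.ChaosPod{}).
        Owns(&corev1.Pod{}).
        Complete(&reconciler{
            Client: mgr.GetClient(),
            scheme: mgr.GetScheme(),
        })
    if err != nil {
        setupLog.Error(err, "unable to create controller")
        os.Exit(1)
    }

    // Build the webhook for the same type.
    err = ctrl.NewWebhookManagedBy(mgr).
        For(&api.ChaosPod{}).
        Complete()
    if err != nil {
        setupLog.Error(err, "unable to create webhook")
        os.Exit(1)
    }

    // Start the manager; this in turn starts the registered controllers and webhooks.
    setupLog.Info("starting manager")
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
}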

The Manager initializes shared dependencies and provides them to Runnables. Its interface is defined as follows:

// pkg/manager/manager.go

// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
// A Manager is required to create Controllers.
type Manager interface {
    // Cluster holds a variety of methods to interact with a cluster.
    cluster.Cluster

    // Add will set requested dependencies on the component, and cause the component to be
    // started when Start is called.  Add will inject any dependencies for which the argument
    // implements the inject interface - e.g. inject.Client.
    // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
    // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
    Add(Runnable) error

    // Elected is closed when this manager is elected leader of a group of
    // managers, either because it won a leader election or because no leader
    // election was configured.
    Elected() <-chan struct{}

    // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
    // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be
    // sensitive and shouldn't be exposed publicly.
    // If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as
    // Runnable to the manager via Add method.
    AddMetricsExtraHandler(path string, handler http.Handler) error

    // AddHealthzCheck allows you to add Healthz checker
    AddHealthzCheck(name string, check healthz.Checker) error

    // AddReadyzCheck allows you to add Readyz checker
    AddReadyzCheck(name string, check healthz.Checker) error

    // Start starts all registered Controllers and blocks until the context is cancelled.
    // Returns an error if there is an error starting any controller.
    //
    // If LeaderElection is used, the binary must be exited immediately after this returns,
    // otherwise components that need leader election might continue to run after the leader
    // lock was lost.
    Start(ctx context.Context) error

    // GetWebhookServer returns a webhook.Server
    GetWebhookServer() *webhook.Server

    // GetLogger returns this manager's logger.
    GetLogger() logr.Logger

    // GetControllerOptions returns controller global configuration options.
    GetControllerOptions() v1alpha1.ControllerConfigurationSpec
}
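Elected() returns a channel that is closed rather than sent on, so any number of goroutines waiting on it are released at once; in the Start code later in this article, cm.elected is closed directly when no resource lock is configured. The following is a minimal, stdlib-only sketch of that broadcast-by-close pattern. The electedSignal type and its methods are hypothetical names, not controller-runtime API.

```go
package main

import "sync"

// electedSignal is a hypothetical helper illustrating how an "elected"
// channel can be closed exactly once to notify every waiter.
type electedSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newElectedSignal() *electedSignal {
	return &electedSignal{ch: make(chan struct{})}
}

// Elected returns a receive-only view of the channel, like Manager.Elected().
func (e *electedSignal) Elected() <-chan struct{} {
	return e.ch
}

// markElected closes the channel; sync.Once makes repeated calls safe,
// since closing an already-closed channel would panic.
func (e *electedSignal) markElected() {
	e.once.Do(func() { close(e.ch) })
}
```

Because a receive on a closed channel returns immediately, every later `<-mgr.Elected()` also succeeds; that is why a channel is used here instead of a one-shot send.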

The Manager manages the lifecycle (adding and starting) of Runnables; without it, you would have to start them yourself and wire up their common dependencies.

The Manager also holds common dependencies such as the client, cache, and scheme, and:

  • provides getters (e.g. GetClient)
  • provides a simple injection mechanism (pkg/runtime/inject)
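The injection mechanism works through interface checks: when a component is handed to the Manager, the Manager looks at which injection interfaces it implements (for example inject.Client, whose method is InjectClient) and passes in the shared dependency. The sketch below reproduces that idea with the standard library only. Client, ClientInjector, mapClient, miniManager and myReconciler are hypothetical stand-ins, not the real controller-runtime types.

```go
package main

import "fmt"

// Client is a hypothetical stand-in for client.Client.
type Client interface {
	Get(key string) (string, error)
}

// ClientInjector has the same shape as inject.Client: a component that
// implements it asks the manager for the shared client.
type ClientInjector interface {
	InjectClient(Client) error
}

// mapClient is a toy in-memory Client.
type mapClient map[string]string

func (m mapClient) Get(key string) (string, error) {
	v, ok := m[key]
	if !ok {
		return "", fmt.Errorf("%s not found", key)
	}
	return v, nil
}

// miniManager owns the shared dependency and injects it on request.
type miniManager struct {
	client Client
}

// SetFields injects the shared client into i if i implements ClientInjector;
// components that do not ask for anything are left untouched.
func (m *miniManager) SetFields(i interface{}) error {
	if ci, ok := i.(ClientInjector); ok {
		return ci.InjectClient(m.client)
	}
	return nil
}

// myReconciler receives its client through injection.
type myReconciler struct {
	client Client
}

func (r *myReconciler) InjectClient(c Client) error {
	r.client = c
	return nil
}
```

Components opt in simply by implementing the method, so the manager never needs to know their concrete types.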

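The Manager also supports leader election, which only has to be enabled through options, and provides a signal handler for graceful shutdown.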

Instantiating the Manager

Here is the implementation of New, which instantiates the Manager:

// pkg/manager/manager.go

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
    // Set default values for options fields
    options = setOptionsDefaults(options)

    // Create the cluster.Cluster, which holds the client, cache, scheme, etc.
    cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
        clusterOptions.Scheme = options.Scheme
        clusterOptions.MapperProvider = options.MapperProvider
        clusterOptions.Logger = options.Logger
        clusterOptions.SyncPeriod = options.SyncPeriod
        clusterOptions.Namespace = options.Namespace
        clusterOptions.NewCache = options.NewCache
        clusterOptions.NewClient = options.NewClient
        clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
        clusterOptions.DryRunClient = options.DryRunClient
        clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
    })
    if err != nil {
        return nil, err
    }

    // Create the recorder provider to inject event recorders for the components.
    // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
    // to the particular controller that it's being injected into, rather than a generic one like is here.
    recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
    if err != nil {
        return nil, err
    }

    // Create the resource lock to enable leader election
    var leaderConfig *rest.Config
    var leaderRecorderProvider *intrec.Provider

    if options.LeaderElectionConfig == nil {
        leaderConfig = rest.CopyConfig(config)
        leaderRecorderProvider = recorderProvider
    } else {
        leaderConfig = rest.CopyConfig(options.LeaderElectionConfig)
        leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
        if err != nil {
            return nil, err
        }
    }

    resourceLock, err := options.newResourceLock(leaderConfig, leaderRecorderProvider, leaderelection.Options{
        LeaderElection:             options.LeaderElection,
        LeaderElectionResourceLock: options.LeaderElectionResourceLock,
        LeaderElectionID:           options.LeaderElectionID,
        LeaderElectionNamespace:    options.LeaderElectionNamespace,
    })
    if err != nil {
        return nil, err
    }

    // Create the metrics listener. This will throw an error if the metrics bind
    // address is invalid or already in use.
    metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
    if err != nil {
        return nil, err
    }

    // By default we have no extra endpoints to expose on metrics http server.
    metricsExtraHandlers := make(map[string]http.Handler)

    // Create health probes listener. This will throw an error if the bind
    // address is invalid or already in use.
    healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
    if err != nil {
        return nil, err
    }

    errChan := make(chan error)
    runnables := newRunnables(options.BaseContext, errChan)

    return &controllerManager{
        stopProcedureEngaged:          pointer.Int64(0),
        cluster:                       cluster,
        runnables:                     runnables,
        errChan:                       errChan,
        recorderProvider:              recorderProvider,
        resourceLock:                  resourceLock,
        metricsListener:               metricsListener,
        metricsExtraHandlers:          metricsExtraHandlers,
        controllerOptions:             options.Controller,
        logger:                        options.Logger,
        elected:                       make(chan struct{}),
        port:                          options.Port,
        host:                          options.Host,
        certDir:                       options.CertDir,
        webhookServer:                 options.WebhookServer,
        leaseDuration:                 *options.LeaseDuration,
        renewDeadline:                 *options.RenewDeadline,
        retryPeriod:                   *options.RetryPeriod,
        healthProbeListener:           healthProbeListener,
        readinessEndpointName:         options.ReadinessEndpointName,
        livenessEndpointName:          options.LivenessEndpointName,
        gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
        internalProceduresStop:        make(chan struct{}),
        leaderElectionStopped:         make(chan struct{}),
        leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
    }, nil
}

New performs the Manager's initialization and returns a *controllerManager. controllerManager is the struct that implements the Manager interface, so all of the Manager's actual operations are carried out by it.

Next comes the most important part, registering the Controller with the Manager:

err = ctrl.NewControllerManagedBy(mgr).
    For(&api.ChaosPod{}).
    Owns(&corev1.Pod{}).
    Complete(&reconciler{
        Client: mgr.GetClient(),
        scheme: mgr.GetScheme(),
    })

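ctrl.NewControllerManagedBy is an alias for builder.ControllerManagedBy, which returns a new controller Builder; the controller it produces will be started by the provided Manager. The implementation is simple: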

// pkg/builder/controller.go

// Builder builds a Controller.
type Builder struct {
	forInput         ForInput
	ownsInput        []OwnsInput
	watchesInput     []WatchesInput
	mgr              manager.Manager
	globalPredicates []predicate.Predicate
	ctrl             controller.Controller
	ctrlOptions      controller.Options
	name             string
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
func ControllerManagedBy(m manager.Manager) *Builder {
	return &Builder{mgr: m}
}

controller-runtime wraps controller construction in a Builder struct. The Manager is passed to the builder, and the next call is the builder's For method:

// pkg/builder/controller.go

// ForInput represents the information set by For method.
type ForInput struct {
    object           client.Object
    predicates       []predicate.Predicate
    objectProjection objectProjection
    err              error
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
// update events by *reconciling the object*.
// This is the equivalent of calling
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
    if blder.forInput.object != nil {
        blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
        return blder
    }
    input := ForInput{object: object}
    for _, opt := range opts {
        opt.ApplyToFor(&input)
    }

    blder.forInput = input
    return blder
}
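For does not return an error itself: a second call records the problem in blder.forInput.err and returns the builder, so the fluent chain keeps compiling and the error only surfaces later in Build. A minimal sketch of that deferred-error builder, using plain strings for object types. Builder, ControllerManagedBy, For, Owns and Build here are hypothetical simplifications, not the real signatures.

```go
package main

import (
	"errors"
	"fmt"
)

// Builder is a hypothetical, string-based simplification of builder.Builder.
type Builder struct {
	forType   string
	ownsTypes []string
	err       error // recorded during chaining, reported by Build
}

func ControllerManagedBy() *Builder { return &Builder{} }

// For sets the reconciled type; a second call records an error instead of panicking.
func (b *Builder) For(kind string) *Builder {
	if b.forType != "" {
		b.err = errors.New("For(...) should only be called once")
		return b
	}
	b.forType = kind
	return b
}

// Owns appends an owned (child) type; it may be called many times.
func (b *Builder) Owns(kind string) *Builder {
	b.ownsTypes = append(b.ownsTypes, kind)
	return b
}

// Build validates the accumulated state and only then "creates" the controller.
func (b *Builder) Build() (string, error) {
	if b.err != nil {
		return "", b.err
	}
	if b.forType == "" {
		return "", errors.New("must provide an object for reconciliation")
	}
	return fmt.Sprintf("controller for %s, owns %v", b.forType, b.ownsTypes), nil
}
```

For example, `ControllerManagedBy().For("ChaosPod").Owns("Pod").Build()` yields `controller for ChaosPod, owns [Pod]`, while calling For twice only fails at Build time.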

For defines the type of object we want to reconcile. Next comes Owns:

// pkg/builder/controller.go

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
// create / delete / update events by *reconciling the owner object*.  This is the equivalent of calling
// Watches(&source.Kind{Type: <ForType-forInput>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true}).
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
    input := OwnsInput{object: object}
    for _, opt := range opts {
        opt.ApplyToOwns(&input)
    }

    blder.ownsInput = append(blder.ownsInput, input)
    return blder
}

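Owns configures the child resources of the reconciled object, i.e. the objects it creates and owns. If changes to those children should trigger reconciliation of their owner, register them with Owns. Next comes the most important call, Complete, which delegates to Build: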

// pkg/builder/controller.go

// Build builds the Application Controller and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
    if r == nil {
        return nil, fmt.Errorf("must provide a non-nil Reconciler")
    }
    if blder.mgr == nil {
        return nil, fmt.Errorf("must provide a non-nil Manager")
    }
    if blder.forInput.err != nil {
        return nil, blder.forInput.err
    }
    // Checking the reconcile type exist or not
    if blder.forInput.object == nil {
        return nil, fmt.Errorf("must provide an object for reconciliation")
    }

    // Set the ControllerManagedBy
    if err := blder.doController(r); err != nil {
        return nil, err
    }

    // Set the Watch
    if err := blder.doWatch(); err != nil {
        return nil, err
    }

    return blder.ctrl, nil
}

Complete builds the Controller by calling Build, in which the two key functions are doController and doWatch. doController is the function that actually instantiates the Controller:

// pkg/builder/controller.go

func (blder *Builder) doController(r reconcile.Reconciler) error {
    globalOpts := blder.mgr.GetControllerOptions()

    ctrlOptions := blder.ctrlOptions
    if ctrlOptions.Reconciler == nil {
        ctrlOptions.Reconciler = r
    }

    // Retrieve the GVK from the object we're reconciling
    // to prepopulate logger information, and to optionally generate a default name.
    gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
    if err != nil {
        return err
    }

    // Setup concurrency.
    if ctrlOptions.MaxConcurrentReconciles == 0 {
        groupKind := gvk.GroupKind().String()

        if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
            ctrlOptions.MaxConcurrentReconciles = concurrency
        }
    }

    // Setup cache sync timeout.
    if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
        ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
    }

    // Derive the controller name from the GVK
    controllerName := blder.getControllerName(gvk)

    // Setup the logger.
    if ctrlOptions.LogConstructor == nil {
        log := blder.mgr.GetLogger().WithValues(
            "controller", controllerName,
            "controllerGroup", gvk.Group,
            "controllerKind", gvk.Kind,
        )

        lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]

        ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
            log := log
            if req != nil {
                log = log.WithValues(
                    lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),
                    "namespace", req.Namespace, "name", req.Name,
                )
            }
            return log
        }
    }

    // Build the controller and return.
    // var newController = controller.New
    blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
    return err
}
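The defaulting in doController, together with the fallback to a single worker in NewUnmanaged, amounts to: an explicit MaxConcurrentReconciles wins; otherwise the global GroupKindConcurrency entry keyed by the GroupKind string (`Kind.group`) is used; otherwise one worker. getControllerName is not shown here; as far as I know it returns the explicitly set name, or else the lower-cased Kind. A stdlib sketch with a hypothetical GVK struct standing in for schema.GroupVersionKind; all function names are hypothetical.

```go
package main

import "strings"

// GVK is a hypothetical stand-in for schema.GroupVersionKind.
type GVK struct{ Group, Version, Kind string }

// groupKindString follows schema.GroupKind.String(): "Kind.group", or just "Kind" for the core group.
func groupKindString(g GVK) string {
	if g.Group == "" {
		return g.Kind
	}
	return g.Kind + "." + g.Group
}

// controllerName returns the explicit name if set, else the lower-cased Kind.
func controllerName(explicit string, g GVK) string {
	if explicit != "" {
		return explicit
	}
	return strings.ToLower(g.Kind)
}

// resolveConcurrency: explicit option > per-GroupKind global option > 1.
func resolveConcurrency(explicit int, global map[string]int, g GVK) int {
	if explicit == 0 {
		if c, ok := global[groupKindString(g)]; ok && c > 0 {
			explicit = c
		}
	}
	if explicit <= 0 {
		explicit = 1
	}
	return explicit
}

// lowerCamelCaseKind builds the per-request log key, e.g. "chaosPod".
// Like the original code, it assumes a non-empty Kind.
func lowerCamelCaseKind(kind string) string {
	return strings.ToLower(kind[:1]) + kind[1:]
}
```

With a hypothetical group `example.com`, a global entry `"ChaosPod.example.com": 4` gives four workers unless the builder's own options set a different non-zero value.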

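doController derives the controller name from the GVK of the reconciled object and finally instantiates the actual Controller through newController, a package-level variable that points to controller.New: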

// pkg/controller/controller.go

// New returns a new Controller registered with the Manager.  The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	c, err := NewUnmanaged(name, mgr, options)
	if err != nil {
		return nil, err
	}

	// Add the controller as a Manager components
	return c, mgr.Add(c)
}

// NewUnmanaged returns a new controller without adding it to the manager. The
// caller is responsible for starting the returned controller.
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.LogConstructor == nil {
		log := mgr.GetLogger().WithValues(
			"controller", name,
		)
		options.LogConstructor = func(req *reconcile.Request) logr.Logger {
			log := log
			if req != nil {
				log = log.WithValues(
					"object", klog.KRef(req.Namespace, req.Name),
					"namespace", req.Namespace, "name", req.Name,
				)
			}
			return log
		}
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	if options.CacheSyncTimeout == 0 {
		options.CacheSyncTimeout = 2 * time.Minute
	}

	if options.RateLimiter == nil {
		options.RateLimiter = workqueue.DefaultControllerRateLimiter()
	}

	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// Create controller with dependencies set
	return &controller.Controller{
		Do: options.Reconciler,
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		CacheSyncTimeout:        options.CacheSyncTimeout,
		SetFields:               mgr.SetFields,
		Name:                    name,
		LogConstructor:          options.LogConstructor,
		RecoverPanic:            options.RecoverPanic,
	}, nil
}

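NewUnmanaged is where the Controller is actually instantiated, which finally ties back to the Controller covered earlier. Once it is instantiated, the controller is handed to the Manager via mgr.Add(c) so that the Manager manages it. So next we need to look at the Manager's Add method, specifically its implementation in controllerManager: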

// pkg/manager/manager.go

// Runnable allows a component to be started.
// It's very important that Start blocks until
// it's done running.
type Runnable interface {
    // Start starts running the component.  The component will stop running
    // when the context is closed. Start blocks until the context is closed or
    // an error occurs.
    Start(context.Context) error
}

//  pkg/manager/internal.go

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
    cm.Lock()
    defer cm.Unlock()
    return cm.add(r)
}

func (cm *controllerManager) add(r Runnable) error {
    // Set dependencies on the object
    if err := cm.SetFields(r); err != nil {
        return err
    }
    return cm.runnables.Add(r)
}

// pkg/manager/runnable_group.go

// Add adds a runnable to closest group of runnable that they belong to.
//
// Add should be able to be called before and after Start, but not after StopAndWait.
// Add should return an error when called during StopAndWait.
// The runnables added before Start are started when Start is called.
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable) error {
    switch runnable := fn.(type) {
    case hasCache:
        return r.Caches.Add(fn, func(ctx context.Context) bool {
            return runnable.GetCache().WaitForCacheSync(ctx)
        })
    case *webhook.Server:
        return r.Webhooks.Add(fn, nil)
    case LeaderElectionRunnable:
        if !runnable.NeedLeaderElection() {
            return r.Others.Add(fn, nil)
        }
        return r.LeaderElection.Add(fn, nil)
    default:
        return r.LeaderElection.Add(fn, nil)
    }
}

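controllerManager's Add takes a Runnable argument. Runnable is an interface representing a component that can be started, and Controller implements its Start method, so a Controller instance can be registered through Add. Besides injecting dependencies (SetFields), Add routes each Runnable into a group via runnables.Add: Runnables that expose a cache (hasCache, such as the cluster) go to Caches, a *webhook.Server goes to Webhooks, a LeaderElectionRunnable whose NeedLeaderElection() returns false goes to Others, and everything else, including Runnables that do not implement LeaderElectionRunnable at all, goes to LeaderElection. Controllers report NeedLeaderElection() == true by default, so they end up in the LeaderElection group. This grouping is important, because it determines when each Runnable is started.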
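The routing in runnables.Add is an ordinary Go type switch, and the order of its cases matters: a component that both exposes a cache and implements LeaderElectionRunnable lands in Caches because that case is checked first. A stdlib-only sketch of the same decision; hasCache is simplified here to a WaitForCacheSync method (the real internal interface exposes GetCache), and the component types are hypothetical.

```go
package main

import "context"

// Runnable mirrors manager.Runnable.
type Runnable interface {
	Start(ctx context.Context) error
}

// hasCache is a simplified stand-in for the internal interface of the same name.
type hasCache interface {
	Runnable
	WaitForCacheSync(ctx context.Context) bool
}

// LeaderElectionRunnable mirrors manager.LeaderElectionRunnable.
type LeaderElectionRunnable interface {
	NeedLeaderElection() bool
}

// Hypothetical components used to exercise the routing.
type cacheComponent struct{}
type webhookServer struct{}
type metricsServer struct{}
type controller struct{}
type plainRunnable struct{}

func (cacheComponent) Start(context.Context) error           { return nil }
func (cacheComponent) WaitForCacheSync(context.Context) bool { return true }
func (*webhookServer) Start(context.Context) error           { return nil }
func (metricsServer) Start(context.Context) error            { return nil }
func (metricsServer) NeedLeaderElection() bool                { return false }
func (controller) Start(context.Context) error               { return nil }
func (controller) NeedLeaderElection() bool                   { return true }
func (plainRunnable) Start(context.Context) error            { return nil }

// groupFor mirrors the type switch in runnables.Add; case order matters.
func groupFor(r Runnable) string {
	switch rn := r.(type) {
	case hasCache:
		return "Caches"
	case *webhookServer:
		return "Webhooks"
	case LeaderElectionRunnable:
		if !rn.NeedLeaderElection() {
			return "Others"
		}
		return "LeaderElection"
	default:
		return "LeaderElection"
	}
}
```

Note the default branch: a Runnable that says nothing about leader election is treated as needing it, which is the safe choice when several replicas run.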

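If the Manager has already been started when Add is called, the new Runnable is not queued but started right away: runnableGroup.Add hands it to the group's internal reconcile loop, which calls the Runnable's Start in a new goroutine. For a Controller, this means Controller.Start is called and the controller begins running.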

That completes the instantiation of the Controller. Back in the Builder's Build method, once doController returns, doWatch is called:

// pkg/builder/controller.go

func (blder *Builder) doWatch() error {
    // Reconcile type
    typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
    if err != nil {
        return err
    }
    src := &source.Kind{Type: typeForSrc}
    hdler := &handler.EnqueueRequestForObject{}
    allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
        return err
    }

    // Watches the managed types
    for _, own := range blder.ownsInput {
        typeForSrc, err := blder.project(own.object, own.objectProjection)
        if err != nil {
            return err
        }
        src := &source.Kind{Type: typeForSrc}
        hdler := &handler.EnqueueRequestForOwner{
            OwnerType:    blder.forInput.object,
            IsController: true,
        }
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, own.predicates...)
        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
            return err
        }
    }

    // Do the watch requests
    for _, w := range blder.watchesInput {
        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
        allPredicates = append(allPredicates, w.predicates...)

        // If the source of this watch is of type *source.Kind, project it.
        if srckind, ok := w.src.(*source.Kind); ok {
            typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
            if err != nil {
                return err
            }
            srckind.Type = typeForSrc
        }

        if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
            return err
        }
    }
    return nil
}

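doWatch registers a Watch on the Controller for the reconciled object type and for each owned type, plus any extra sources configured with Watches. This brings us back to the Controller's Watch discussed earlier: it registers event handlers on the Informer that add requests to the work queue. At this point the Controller is fully initialized, and watches are in place for the resources we reconcile.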
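The two handlers used by doWatch differ only in which request they enqueue: EnqueueRequestForObject enqueues the changed object itself, while EnqueueRequestForOwner with IsController: true enqueues the object's controlling owner. A stdlib sketch of that mapping; Object, OwnerReference and Request are hypothetical stand-ins, and the owner match compares Kind only (the real handler also checks the API group and handles cluster-scoped owners).

```go
package main

// OwnerReference keeps only the fields this sketch needs from metav1.OwnerReference.
type OwnerReference struct {
	Kind       string
	Name       string
	Controller bool
}

// Object is a hypothetical stand-in for a Kubernetes object's metadata.
type Object struct {
	Kind      string
	Namespace string
	Name      string
	OwnerRefs []OwnerReference
}

// Request identifies the object to reconcile, like reconcile.Request.
type Request struct{ Namespace, Name string }

// enqueueForObject mimics EnqueueRequestForObject: reconcile the object itself.
func enqueueForObject(obj Object) []Request {
	return []Request{{Namespace: obj.Namespace, Name: obj.Name}}
}

// enqueueForOwner mimics EnqueueRequestForOwner{IsController: true}:
// reconcile the controlling owner of the given kind, if there is one.
func enqueueForOwner(obj Object, ownerKind string) []Request {
	var reqs []Request
	for _, ref := range obj.OwnerRefs {
		if ref.Controller && ref.Kind == ownerKind {
			reqs = append(reqs, Request{Namespace: obj.Namespace, Name: ref.Name})
		}
	}
	return reqs
}
```

So when a Pod named `demo-abc` owned by the ChaosPod `demo` changes, the work queue receives `default/demo`, and the ChaosPod's Reconcile runs rather than a Pod-specific one.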

Finally, the Manager's Start method is called. Since the Controller has already been added to the Manager, starting the Manager effectively starts the associated Controllers. Start is implemented as follows:

// pkg/manager/internal.go

// Start starts the manager and waits indefinitely.
// There is only two ways to have start return:
// An error has occurred during in one of the internal operations,
// such as leader election, cache start, webhooks, and so on.
// Or, the context is cancelled.
func (cm *controllerManager) Start(ctx context.Context) (err error) {
    // Refuse to start twice: return an error if the manager has already been started.
    cm.Lock()
    if cm.started {
        cm.Unlock()
        return errors.New("manager already started")
    }
    var ready bool
    defer func() {
        // Only unlock the manager if we haven't reached
        // the internal readiness condition.
        if !ready {
            cm.Unlock()
        }
    }()

    // Initialize the internal context.
    cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

    // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
    stopComplete := make(chan struct{})
    defer close(stopComplete)
    // This must be deferred after closing stopComplete, otherwise we deadlock.
    defer func() {
        // https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
        stopErr := cm.engageStopProcedure(stopComplete)
        if stopErr != nil {
            if err != nil {
                // Utilerrors.Aggregate allows to use errors.Is for all contained errors
                // whereas fmt.Errorf allows wrapping at most one error which means the
                // other one can not be found anymore.
                err = kerrors.NewAggregate([]error{err, stopErr})
            } else {
                err = stopErr
            }
        }
    }()

    // Add the cluster runnable.
    if err := cm.add(cm.cluster); err != nil {
        return fmt.Errorf("failed to add cluster to runnables: %w", err)
    }

    // Metrics should be served whether the controller is leader or not.
    // (If we don't serve metrics for non-leaders, prometheus will still scrape
    // the pod but will get a connection refused).
    if cm.metricsListener != nil {
        cm.serveMetrics()
    }

    // Serve health probes.
    if cm.healthProbeListener != nil {
        cm.serveHealthProbes()
    }

    // First start any webhook servers, which includes conversion, validation, and defaulting
    // webhooks that are registered.
    //
    // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
    // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
    // to never start because no cache can be populated.
    if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
        if !errors.Is(err, wait.ErrWaitTimeout) {
            return err
        }
    }

    // Start and wait for caches.
    if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
        if !errors.Is(err, wait.ErrWaitTimeout) {
            return err
        }
    }

    // Start the non-leaderelection Runnables after the cache has synced.
    if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
        if !errors.Is(err, wait.ErrWaitTimeout) {
            return err
        }
    }

    // Start the leader election and all required runnables.
    {
        ctx, cancel := context.WithCancel(context.Background())
        cm.leaderElectionCancel = cancel
        go func() {
            if cm.resourceLock != nil {
                if err := cm.startLeaderElection(ctx); err != nil {
                    cm.errChan <- err
                }
            } else {
                // Treat not having leader election enabled the same as being elected.
                if err := cm.startLeaderElectionRunnables(); err != nil {
                    cm.errChan <- err
                }
                close(cm.elected)
            }
        }()
    }

    ready = true
    cm.Unlock()
    select {
    case <-ctx.Done():
        // We are done
        return nil
    case err := <-cm.errChan:
        // Error starting or running a runnable
        return err
    }
}

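controllerManager.Start starts the Runnables added to the Manager earlier (including the Controllers) group by group: first the webhook servers, then the caches (waiting for them to sync), then the Runnables that do not need leader election (Others). Finally, leader election runs in its own goroutine, and the LeaderElection group, where controllers end up by default, is started once this instance is elected, or immediately if leader election is not configured. Each group is started by runnableGroup.Start: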

// pkg/manager/runnable_group.go

// Start starts the group and waits for all
// initially registered runnables to start.
// It can only be called once, subsequent calls have no effect.
func (r *runnableGroup) Start(ctx context.Context) error {
    var retErr error

    r.startOnce.Do(func() {
        defer close(r.startReadyCh)

        // Start the internal reconciler.
        go r.reconcile()

        // Start the group and queue up all
        // the runnables that were added prior.
        r.start.Lock()
        r.started = true
        for _, rn := range r.startQueue {
            rn.signalReady = true
            r.ch <- rn
        }
        r.start.Unlock()

        // If we don't have any queue, return.
        if len(r.startQueue) == 0 {
            return
        }

        // Wait for all runnables to signal.
        for {
            select {
            case <-ctx.Done():
                if err := ctx.Err(); !errors.Is(err, context.Canceled) {
                    retErr = err
                }
            case rn := <-r.startReadyCh:
                for i, existing := range r.startQueue {
                    if existing == rn {
                        // Remove the item from the start queue.
                        r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
                        break
                    }
                }
                // We're done waiting if the queue is empty, return.
                if len(r.startQueue) == 0 {
                    return
                }
            }
        }
    })

    return retErr
}

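So in the end it is the Runnable's Start method that gets called (from the group's internal reconcile loop started by go r.reconcile()), which here is the Controller's Start. It starts a control loop that keeps taking items off the work queue and hands them to the Reconciler interface, i.e. the business logic we implement in Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error).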

This completes the Manager's entire startup process: how the Manager is initialized, how it is associated with Controllers, and how it starts them.