【轉載】Controller-runtime控制器的實現
介紹
controller-runtime框架實際上是社群幫我們封裝的一個控制器處理的框架,底層核心實現原理和我們自定義一個controller控制器邏輯是一樣的,只是在這個基礎上新增了一些概念,開發者直接使用這個框架去開發控制器會更加簡單方便。
包括kubebuilder、operator-sdk這些框架其實都是在controller-runtime基礎上做了一層封裝,方便開發者快速生成專案的腳手架而已。
下面我們就來分析下controller-runtime是如何實現的控制器處理。
Controller實現
首先我們還是去檢視下控制器的定義以及控制器是如何啟動的。控制器的定義結構體如下所示:
// pkg/internal/controller/controller.go // Controller implements controller.Controller. type Controller struct { // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. // Name 是用於追蹤、記錄、監控的唯一標識,必填欄位 Name string // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. // 可以執行的最大併發 Reconciles 數量,預設值為 1 MaxConcurrentReconciles int // Reconciler is a function that can be called at any time with the Name / Namespace of an object and // ensures that the state of the system matches the state specified in the object. // Defaults to the DefaultReconcileFunc. // Reconciler 是一個可以隨時呼叫物件的 Name/Namespace 的函式 // 確保系統狀態與物件狀態中指定的狀態一致,預設為 DefaultReconcileFunc 函式 Do reconcile.Reconciler // MakeQueue constructs the queue for this controller once the controller is ready to start. // This exists because the standard Kubernetes workqueues start themselves immediately, which // leads to goroutine leaks if something calls controller.New repeatedly. // 一旦控制器準備好啟動,MakeQueue就會為此控制器構造佇列。 // 這是因為標準的Kubernetes工作佇列會立即啟動,如果有東西反覆呼叫controller.New,就會導致goroutine洩漏。 MakeQueue func() workqueue.RateLimitingInterface // Queue is an listeningQueue that listens for events from Informers and adds object keys to // the Queue for processing // Queue 通過監聽來自 Informer 的事件,新增物件鍵到佇列中進行處理 // MakeQueue 屬性就是用來構造這個工作佇列的 Queue workqueue.RateLimitingInterface // SetFields is used to inject dependencies into other objects such as Sources, EventHandlers and Predicates // Deprecated: the caller should handle injected fields itself. // SetFields 用來將依賴關係注入到其他物件,比如 Sources、EventHandlers、Predicates // 不推薦:呼叫者應該自己處理注入的欄位 SetFields func(i interface{}) error // mu is used to synchronize Controller setup // 控制器同步訊號量 mu sync.Mutex // Started is true if the Controller has been Started // 控制器是否已經啟動 Started bool // ctx is the context that was passed to Start() and used when starting watches. // // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context, // while we usually always strive to follow best practices, we consider this a legacy case and it should // undergo a major refactoring and redesign to allow for context to not be stored in a struct. // ctx 是傳遞給 Start() 並在啟動 watch 時候的上下文 ctx context.Context // CacheSyncTimeout refers to the time limit set on waiting for cache to sync // Defaults to 2 minutes if not set. // CacheSyncTimeout 指等待快取同步的時間限制 // 如果沒有設定,預設兩分鐘 CacheSyncTimeout time.Duration // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. // startWatches 維護了一個 sources、handlers 以及 predicates 列表以方便在控制器啟動的時候啟動 startWatches []watchDescription // LogConstructor is used to construct a logger to then log messages to users during reconciliation, // or for example when a watch is started. // Note: LogConstructor has to be able to handle nil requests as we are also using it // outside the context of a reconciliation. LogConstructor func(request *reconcile.Request) logr.Logger // RecoverPanic indicates whether the panic caused by reconcile should be recovered. // RecoverPanic 標識是否恢復由 reconcile 引發的 panic RecoverPanic bool }
上面的結構體就是 controller-runtime 中定義的控制器結構體,我們可以看到結構體中仍然有一個限速的工作佇列,但是看上去沒有資源物件的 Informer 或者 Indexer 的資料,實際上這裡是通過 startWatches 屬性做了一層封裝,該屬性是一個 watchDescription 佇列,一個 watchDescription 包含了所有需要 watch 的資訊:
// pkg/internal/controller/controller.go // watchDescription contains all the information necessary to start a watch. // watchDescription 包含所有啟動 watch 操作所需的資訊 type watchDescription struct { src source.Source handler handler.EventHandler predicates []predicate.Predicate }
整個控制器中足重要的兩個函式就是 Watch 和 Start,下面我們就來分析下它們是如何實現的。
Watch 函式實現
// pkg/internal/controller/controller.go
// Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
c.mu.Lock()
defer c.mu.Unlock()
// Inject Cache into arguments
// 注入 Cache 到引數中
if err := c.SetFields(src); err != nil {
return err
}
if err := c.SetFields(evthdler); err != nil {
return err
}
for _, pr := range prct {
if err := c.SetFields(pr); err != nil {
return err
}
}
// Controller hasn't started yet, store the watches locally and return.
// Controller 還沒有啟動,把 watches 存放到本地然後返回
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
// 這些 watches 會被儲存到控制器結構體中,直到呼叫 Start(...)函式
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
// 呼叫 src 的 Start 函式
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
上面的Watch函式可以看到最終是去呼叫Source這個引數的Start函式。
Source是事件的源,如對資源物件進行 Create、Update、Delete操作,需要由event.EventHandlers
將reconcile.Requests
入佇列進行處理。
- 使用 Kind 來處理來自叢集的事件(如Pod建立、Pod更新、Deployment更新)
- 使用 Channel 來處理來自叢集外部的事件(如Github Webhook回撥、輪訓外部URL)
// pkg/source/source.go
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
// Start 是一個內部函式
// 只應該由Controller 呼叫,向 Informer 註冊一個 EventHandler
// 將 reconcile.Request 放入佇列
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
我們可以看到source.Source
是一個介面,它是Controller.Watch
的一個引數,所以要看具體的看Source.Start
函式是如何實現的,我們需要去看傳入Controller.Watch
的引數,在controller-runtime中呼叫控制器的Watch函式的入口實際上位於pkg/builder/controller.go
檔案的doWatch()
函式:
// pkg/builder/controller.go
func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.forInput.object}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
err := blder.ctrl.Watch(src, hdler, allPredicates...)
if err != nil {
return err
}
......
return nil
}
可以看到Watch的第一個變數是一個source.Kind
的型別,該結構體就實現了上面的source.Source
介面:
// pkg/source/source.go
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
// Kind 用於提供來自叢集內部的事件源,這些事件來自於 Watches(例如 Pod Create 事件)
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
// Type 是 watch 物件的型別,比如 &v1.Pod{}
Type client.Object
// cache used to watch APIs
// cache 用於 watch 的 APIs 介面
cache cache.Cache
// started may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished.
// 如果在啟動過程中遇到錯誤,started可能會包含錯誤。
// 如果其已關閉且不包含錯誤,則啟動和同步已完成。
started chan error
startCancel func()
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
// Start 是內部的,只應由控制器呼叫,以便向Informer 註冊 EventHandler,使其進入 reconcile.Requests的佇列
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Type should have been specified by the user.
// Type 在使用之前必須提前指定
if ks.Type == nil {
return fmt.Errorf("must specify Kind.Type")
}
// cache should have been injected before Start was called
// cache 也是需要在呼叫 Start 之前被注入
if ks.cache == nil {
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
// 如果快取已啟動且無法同步該 informer(通常是由於RBAC問題),則將一直阻止,直到其上下文被取消。
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
var (
i cache.Informer
lastErr error
)
// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
// 嘗試獲取 informer 直到其返回true、返回錯誤、ctx 被取消或者過期
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
// 從 Cache 中獲取 Informer,並新增一個事件處理程式到佇列
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
switch {
case errors.As(lastErr, &kindMatchErr):
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
case runtime.IsNotRegisteredError(lastErr):
log.Error(lastErr, "kind must be registered to the Scheme")
default:
log.Error(lastErr, "failed to get informer from cache")
}
return false, nil // Retry.
}
return true, nil
}); err != nil {
if lastErr != nil {
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
return
}
ks.started <- err
return
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()
return nil
}
從上面的具體實現我們就可以看出來 Controller.Watch 函式就是實現的獲取資源物件的 Informer 以及註冊事件監聽函式。
Informer 是通過 cache 獲取的,cache 是在呼叫 Start 函式之前注入進來的,這裡其實我們不用太關心;
下面的 AddEventHandler 函式中是一個 internal.EventHandler 結構體,那這個結構體比如會實現 client-go 中提供的 ResourceEventHandler 介面,也就是我們熟悉的 OnAdd、OnUpdate、OnDelete 幾個函式:
// pkg/source/internal/eventsource.go
// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
// EventHandler 實現了 cache.ResourceEventHandler 介面
type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
}
// OnAdd creates CreateEvent and calls Create on EventHandler.
// OnAdd建立CreateEvent並在EventHandler上呼叫Create。
func (e EventHandler) OnAdd(obj interface{}) {
// kubernetes 物件被建立的事件
c := event.CreateEvent{}
// Pull Object out of the object
// 斷言 runtime.Object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
// Predicates 用於事件過濾,迴圈呼叫 Predicates 的 Create 函式
for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}
// Invoke create handler
// 呼叫 EventHandler 的 Create 函式
e.EventHandler.Create(c, e.Queue)
}
// OnUpdate creates UpdateEvent and calls Update on EventHandler.
// OnUpdate 建立UpdateEvent並在EventHandler上呼叫Update。
func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
// 更新事件
u := event.UpdateEvent{}
if o, ok := oldObj.(client.Object); ok {
u.ObjectOld = o
} else {
log.Error(nil, "OnUpdate missing ObjectOld",
"object", oldObj, "type", fmt.Sprintf("%T", oldObj))
return
}
// Pull Object out of the object
if o, ok := newObj.(client.Object); ok {
u.ObjectNew = o
} else {
log.Error(nil, "OnUpdate missing ObjectNew",
"object", newObj, "type", fmt.Sprintf("%T", newObj))
return
}
for _, p := range e.Predicates {
if !p.Update(u) {
return
}
}
// Invoke update handler
e.EventHandler.Update(u, e.Queue)
}
// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e EventHandler) OnDelete(obj interface{}) {
d := event.DeleteEvent{}
// Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a
// DeleteFinalStateUnknown struct, so the object needs to be pulled out.
// Copied from sample-controller
// This should never happen if we aren't missing events, which we have concluded that we are not
// and made decisions off of this belief. Maybe this shouldn't be here?
var ok bool
if _, ok = obj.(client.Object); !ok {
// If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown
// 假設物件沒有 Metadata,假設是一個 DeletedFinalStateUnknown 型別的物件
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Error(nil, "Error decoding objects. Expected cache.DeletedFinalStateUnknown",
"type", fmt.Sprintf("%T", obj),
"object", obj)
return
}
// Set obj to the tombstone obj
obj = tombstone.Obj
}
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
d.Object = o
} else {
log.Error(nil, "OnDelete missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
for _, p := range e.Predicates {
if !p.Delete(d) {
return
}
}
// Invoke delete handler
e.EventHandler.Delete(d, e.Queue)
}
上面的EventHandler結構體實現了client-go中的ResourceEventHandler
介面,實現過程中我們可以看到呼叫了Predicates
進行事件過濾,過濾後才是真正的事件處理,不過真正的事件處理也不是在這裡實現的,而是通過Controller.Watch
函式傳遞進來的handler.EventHandler
處理的,這個函式通過前面的doWatch()函式可以看出它是一個&handler.EnqueueRequestForObject{}
物件,所以真正的事件處理邏輯是這個函式去實現的:
// pkg/handler/enqueue.go
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
// EnqueueRequestForObject 是一個包含了作為事件源的物件的 Name 和 Namespace 的入佇列的 Request
//(例如,created/deleted/updated 物件的 Name 和 Namespace)
// handler.EnqueueRequestForObject 幾乎被所有關聯資源(如 CRD)的控制器使用,以協調關聯的資源
type EnqueueRequestForObject struct{}
// Create implements EventHandler.
// Create 函式的實現
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
// 新增一個 Request 物件到工作佇列
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
// Update implements EventHandler.
// Update 函式實現
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
switch {
// 如果新的物件不為空,新增到工作佇列中
case evt.ObjectNew != nil:
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
// 如果舊的物件存在,新增到工作佇列中
case evt.ObjectOld != nil:
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
default:
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
}
// Delete implements EventHandler.
// Delete 函式的實現
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
// 因為前面關於物件的刪除狀態已經處理了,所以這裡直接放入佇列中即可
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
通過 EnqueueRequestForObject
的 Create/Update/Delete 實現可以看出我們放入到工作佇列中的元素不是以前預設的元素唯一的 KEY,而是經過封裝的 reconcile.Request
物件,當然通過這個物件也可以很方便獲取物件的唯一標識 KEY。
總結起來就是 Controller.Watch 函式就是來實現之前自定義控制器中的 Informer 初始化以及事件監聽函式的註冊。
Start 函式的實現
上面我們分析了控制器的 Watch 函式的實現,下面我們來分析另外一個重要的函式Controller.Start
函式的實現。
// pkg/internal/controller/controller.go
// Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown
c.mu.Lock()
// 先判斷控制器是否已經啟動了,如果是直接返回錯誤
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}
c.initMetrics()
// Set the internal context.
// 設定內部的 ctx
c.ctx = ctx
// 呼叫 MakeQueue()函式生成工作佇列
c.Queue = c.MakeQueue()
go func() {
<-ctx.Done()
c.Queue.ShutDown()
}()
wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()
// TODO(pwittrock): Reconsider HandleCrash
defer utilruntime.HandleCrash()
// NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intendeded
// caches.
// NB(directxman12): 在試圖等待快取同步之前啟動 sources
// 這樣它們有機會註冊它們的目標快取
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
// 啟動 SharedIndexInformer 工廠,開始填充 SharedIndexInformer 快取
c.LogConstructor(nil).Info("Starting Controller")
for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := func() error {
// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
// 等待 Informer 同步完成
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
return err
}
return nil
}(); err != nil {
return err
}
}
// All the watches have been started, we can reset the local slice.
//
// We should never hold watches more than necessary, each watch source can hold a backing cache,
// which won't be garbage collected if we hold a reference to it.
// 所有的 watches 已經重啟,充值
c.startWatches = nil
// Launch workers to process resources
// 啟動 workers 來處理資源
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c.processNextWorkItem(ctx) {
}
}()
}
c.Started = true
return nil
}()
if err != nil {
return err
}
<-ctx.Done()
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait()
c.LogConstructor(nil).Info("All workers finished")
return nil
}
Start
函式和自定義控制器中的啟動迴圈比較類似,都是先等待資源物件的informer同步完成,然後啟動workers來處理資源物件,而且worker函式都是一樣的實現方式:
// pkg/internal/controller/controller.go
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
// processNextWorkItem將通過呼叫reconcileHandler從工作佇列中讀取單個工作項,並嘗試對其進行處理。
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
// 從佇列中彈出元素
obj, shutdown := c.Queue.Get()
if shutdown {
// Stop working
// 佇列關閉了,直接返回 false
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
// 我們在此處呼叫Done,以便工作佇列知道我們已完成對該項的處理。
// 如果不希望此工作項重新排隊,我們還必須記住呼叫“Forget”。
// 例如,如果發生暫時性錯誤,我們不會呼叫“Forget”,而是將該項放回工作佇列,並在一段退避期後再次嘗試。
defer c.Queue.Done(obj)
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
// 呼叫 reconcileHandler 進行元素處理
c.reconcileHandler(ctx, obj)
return true
}
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// Update metrics after processing each item
// 處理完每個元素後更新指標
reconcileStartTS := time.Now()
defer func() {
c.updateMetrics(time.Since(reconcileStartTS))
}()
// Make sure that the object is a valid request.
// 確保物件是一個有效的 request 物件
req, ok := obj.(reconcile.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
// 工作佇列中的元素無效,則呼叫 Forget 函式
// 歐澤會進入一個迴圈嘗試處理一個無效的元素
c.Queue.Forget(obj)
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
// Return true, don't take a break
return
}
log := c.LogConstructor(&req)
log = log.WithValues("reconcileID", uuid.NewUUID())
ctx = logf.IntoContext(ctx, log)
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
// resource to be synced.
// RunInformersAndControllers 的 syncHandler,傳遞給它要同步的資源的 namespace/name 的字串
// 呼叫Reconciler 函式來處理這個元素。也就是我們真正去編寫業務邏輯的地方
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
// 如果業務邏輯處理出錯,則重新新增到限速佇列中
c.Queue.AddRateLimited(req)
// Metrics 指標記錄
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error")
// 如果調協函式 Reconcile 處理結果中包含大於0的RequestAfter
case result.RequeueAfter > 0:
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
// 需要注意如果 result.RequeueAfter 與一個非 nil 的錯誤一起返回,則 result.RequeueAfter會丟失
// 忘記元素
c.Queue.Forget(obj)
// 加入佇列
c.Queue.AddAfter(req, result.RequeueAfter)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:
// 加入佇列
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
// 最後如果沒有發生錯誤,我們就會 Forget 這個元素
// 這樣直到傳送另一個變化它就不會再被排隊了
c.Queue.Forget(obj)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}
}
上面的reconcileHandler
函式就是我們真正執行元素業務處理的地方,函式中包含了事件處理以及錯誤處理,真正的事件處理是通過c.Do.Reconcile(req)
暴露給開發者的,所以對於開發者來說,只需要在Reconcile
函式中去處理業務邏輯就可以了。
根據c.Do.Reconcile(req)
函式的返回值來判斷是否將元素重新加入佇列中進行處理:
- 如果返回 error 錯誤,則將元素重新新增到限速佇列中
- 如果返回的
result.RequeueAfter>0
,則先將元素忘記,然後在result.RequeueAfter
時間後加入佇列中 - 如果返回
result.Requeue
,則直接將元素重新加入到限速佇列中 - 如果正常返回,則忘記這個元素