
Notes on adding a canary pause feature to Deployment

This post is about adding a canary (gray-release) pause feature to the Kubernetes Deployment. The design builds on Deployment's existing RollingUpdate strategy and its pause mechanism, so most of the post is a read-through of the Deployment controller source code.
The code in this post is from Kubernetes v1.16.

Deployment's current processing logic

First, Deployment is a declarative API that k8s exposes to users: the user declares the desired state through spec (the pod template) and replicas (the instance count), and the DeploymentController, acting as a control loop, watches the corresponding resources and does its best to drive them toward that desired state.
k8s ships many resource types, each with its own controller; they all live in the kube-controller-manager component, which runs on the master nodes and talks to the apiserver.
The part that drives these controllers is the Informer: it watches object changes from the apiserver, syncs them into a local cache, and hands them to the controller's work queue for processing.
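To make this division of labour concrete, here is a minimal, self-contained sketch of the pattern (standard library only; all names are illustrative and none of this is the real client-go API): change events are turned into object keys on a queue, and a worker pops keys and reconciles actual state toward desired state.

package main

import "fmt"

// Toy desired/actual state for "deployments", keyed by namespace/name.
type state struct {
	desiredReplicas map[string]int
	actualReplicas  map[string]int
}

// reconcile drives the actual state toward the desired state for one key,
// which is what a controller's sync handler does for one object.
func (s *state) reconcile(key string) {
	want, ok := s.desiredReplicas[key]
	if !ok {
		delete(s.actualReplicas, key) // object deleted: clean up
		return
	}
	s.actualReplicas[key] = want // for real: create/scale ReplicaSets, etc.
	fmt.Printf("reconciled %s to %d replicas\n", key, want)
}

func main() {
	s := &state{
		desiredReplicas: map[string]int{"default/nginx": 3},
		actualReplicas:  map[string]int{},
	}

	// The "informer" side: object changes become keys on a work queue.
	queue := make(chan string, 8)
	queue <- "default/nginx"
	close(queue)

	// The "controller" side: a worker pops keys and reconciles them.
	for key := range queue {
		s.reconcile(key)
	}
}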

How a Deployment update is triggered

The relationships between the main structs involved are roughly as follows.

The k8s components are built with the Cobra library; the entry point is cmd/kube-controller-manager/controller-manager.go:

	command := app.NewControllerManagerCommand()
	logs.InitLogs()
	defer logs.FlushLogs()
	
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}

After the command is initialized, command.Execute() invokes the Run function registered on the command; part of Run looks like this:

		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		controllerContext.InformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)

The code above boils down to three calls, executed in this order: NewControllerInitializers(), StartControllers(), and InformerFactory.Start(). Let's look at them one by one:

// step1:
// NewControllerInitializers returns a map[Type]ControllerFunc containing the startup func for every controller type
// step2:
// StartControllers then invokes the startup func for each type; below is the one for Deployment
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}

// step3:
// start the top-level sharedInformerFactory
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

In step2 above, ctx.InformerFactory.Apps().V1().Deployments() ends up calling the following, which returns a deploymentInformer:

func (v *version) Deployments() DeploymentInformer {
	return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

The enclosing NewDeploymentController then calls:

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	// both of these fields are funcs.
	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

Let's look at the Informer function first.

// The following is a chain of nested calls, not sequential calls.
// Informer() calls InformerFor(); the first argument is a Deployment object, the second is defaultInformer
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

// defaultInformer() calls NewFilteredDeploymentInformer
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

// NewFilteredDeploymentInformer returns a sharedIndexInformer, initializing its ListFunc and WatchFunc
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}
	
// InformerFor takes an object type (e.g. Deployment) as its first argument and a newFunc (which builds a cache.SharedIndexInformer) as its second.
// It creates (or reuses) the type-specific SharedIndexInformer via newFunc and stores it in sharedInformerFactory.informers keyed by type.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

Next, AddEventHandler(), which calls AddEventHandlerWithResyncPeriod:

// As shown below, by the time the informer is wired up, the callbacks have already been added to sharedIndexInformer.processor.listeners[]
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	// newProcessListener returns a processorListener whose p.handler holds the registered callbacks (AddFunc/UpdateFunc/DeleteFunc)
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
	s.processor.addListener(listener)
}

Back in step2 above, the next call is dc.Run, whose core is:

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

dc.worker calls:

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}

Starting the informers

Finally, step3: sharedInformerFactory.Start() runs Run() on every registered informer. This is where the informer's structure and responsibilities come into play.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// step1: initialize sharedIndexInformer.controller
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
		Process: s.HandleDeltas,
	}

	func() {
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	// step2: start cacheMutationDetector.Run, using processorStopCh as its stop signal
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	// step3: start processor.run, using processorStopCh as its stop signal
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	// step4: run controller.Run, using stopCh as its stop signal
	s.controller.Run(stopCh)
}

First, step1: initializing sharedIndexInformer.controller.

// The keyFunc parameter is a function that derives the ns/name key from an object.
// The knownObjects parameter is informer.indexer, initialized via NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), which creates a cache object:
/*
cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
*/	
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      keyFunc,
		knownObjects: knownObjects,
	}
	f.cond.L = &f.lock
	return f
}

cfg := &Config{
		Queue:            fifo,                // the DeltaFIFO initialized in step1
		ListerWatcher:    s.listerWatcher,     // the List/Watch funcs for this resource type
		ObjectType:       s.objectType,        // the resource type
		FullResyncPeriod: s.resyncCheckPeriod, // interval at which the cached objects are re-queued in full (resync)
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
		Process: s.HandleDeltas,
	}
	s.controller = New(cfg)    // the controller essentially just wraps this config
	

// config.Process handles the deltas queued for an object as follows:
/*
    1. For Sync/Added/Updated deltas: if the object already exists in the cache, update it there; otherwise add it. For Deleted deltas, remove the object from the cache.
    2. In every case, call distribute(), which appends the notification to the addCh of every listener in sharedIndexInformer.sharedProcessor.listeners
*/
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				s.indexer.Update(d.Object)
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				s.indexer.Add(d.Object)
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			s.indexer.Delete(d.Object)
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

Now for step2: starting cacheMutationDetector.Run, with processorStopCh as its stop signal.
I wasn't familiar with this part; as far as I can tell it is a debugging aid (enabled via the KUBE_CACHE_MUTATION_DETECTOR environment variable) that detects when objects stored in the cache are mutated in place.

Then step3: processor.run().

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	...
}

// listener.run, below, loops over processorListener.nextCh and invokes the registered callback that matches the notification type
func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			return true, nil
		})
		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

Finally, step4: s.controller.Run(). The informer's main execution logic lives here.

func (c *controller) Run(stopCh <-chan struct{}) {
   ...
   // the reflector holds the pieces initialized earlier in config
   r := NewReflector(
   	c.config.ListerWatcher,
   	c.config.ObjectType,
   	c.config.Queue,
   	c.config.FullResyncPeriod,
   )
   c.reflector = r
   // r.Run mainly drives the ListWatcher interface, again looping in its own goroutine
   wg.StartWithChannel(stopCh, r.Run)
   // pops items from the queue defined earlier and handles them with the registered Process func
   wait.Until(c.processLoop, time.Second, stopCh)
}

// r.Run eventually calls:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) 

// A closer look at c.processLoop:
// it runs a for loop, popping objects from the queue and running c.config.Process on them (i.e. the HandleDeltas registered at init time)
func (c *controller) processLoop() {
   for {
   	obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
      ...
   }
}

To summarize (using the DeploymentController as the example), kube-controller-manager starts up as follows:
1. Initialize the controllers type by type. Each controller registers the informers it cares about with the global sharedInformerFactory; for example, the DeploymentController registers the Deployment, ReplicaSet and Pod informers. Other controllers that need the same informers reuse them instead of registering again.
2. Start the controller: it runs a loop of processNextWorkItem, i.e. it pops items from deployment.queue and hands them to syncHandler (which was initialized to the syncDeployment function).
3. Start the informers. An informer has two important parts:
1) controller
It starts the reflector, whose job is to call the List API once to populate the cache (chunked when the data set is large), then loop on Watch, turning object changes into namespace/name keys pushed into the DeltaFIFO. It then starts controller.processLoop, which pops deltas from the DeltaFIFO and runs HandleDeltas on them. HandleDeltas both updates the cache and distributes the notification to the processor.
2) sharedProcessor
The processor receives the distributed notifications on each listener's addCh; users have registered callbacks for the different event types. processor.run keeps pulling notifications from addCh into a buffer, while a second goroutine drains the buffer and calls the matching registered callback. These callbacks all end up calling enqueueDeployment(), which puts the deployment's key onto deploy.queue so that the loop from part 2 can pop it and run the sync.
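The summary mentions the buffer sitting between each listener's addCh and nextCh; the article shows listener.run but not listener.pop. Below is a minimal sketch of that buffering pattern (illustrative only, not the real client-go code): one goroutine accepts notifications into an unbounded buffer so the distributor never blocks, while another drains them to the handler.

package main

import "fmt"

// pump forwards items from addCh to nextCh through an unbounded buffer, so
// whoever writes to addCh (the distributor) is never blocked by a slow
// handler. This mirrors the role that processorListener.pop plays.
func pump(addCh <-chan string, nextCh chan<- string) {
	defer close(nextCh)
	var buf []string
	for {
		var out chan<- string
		var next string
		if len(buf) > 0 {
			out = nextCh // only offer a send when there is something buffered
			next = buf[0]
		}
		select {
		case item, ok := <-addCh:
			if !ok {
				for _, it := range buf { // flush the remainder, then stop
					nextCh <- it
				}
				return
			}
			buf = append(buf, item)
		case out <- next:
			buf = buf[1:]
		}
	}
}

func main() {
	addCh := make(chan string)
	nextCh := make(chan string)
	go pump(addCh, nextCh)

	go func() {
		for i := 0; i < 3; i++ {
			addCh <- fmt.Sprintf("notification-%d", i)
		}
		close(addCh)
	}()

	// The "run" side: consume notifications and invoke the handler.
	for n := range nextCh {
		fmt.Println("handled", n)
	}
}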

The syncDeployment sync logic

Reading the syncDeployment code
(this is where the step-size calculation used during rolling updates comes in)
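That walkthrough is not reproduced here, but the piece that matters later is how each round's step size is derived from maxSurge and maxUnavailable. Below is a simplified sketch of the calculation (based on the documented Deployment semantics, not the exact NewRSNewReplicas code): a percentage maxSurge rounds up, a percentage maxUnavailable rounds down, scale-up of the new ReplicaSet is capped by desired + maxSurge, and scale-down of the old one is capped by keeping at least desired - maxUnavailable pods available.

package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// resolve turns a maxSurge/maxUnavailable value ("2" or "25%") into an
// absolute count. Percentages round up for maxSurge and down for
// maxUnavailable, matching the Deployment documentation.
func resolve(val string, replicas int, roundUp bool) int {
	if strings.HasSuffix(val, "%") {
		pct, _ := strconv.Atoi(strings.TrimSuffix(val, "%"))
		f := float64(pct) * float64(replicas) / 100.0
		if roundUp {
			return int(math.Ceil(f))
		}
		return int(math.Floor(f))
	}
	n, _ := strconv.Atoi(val)
	return n
}

func main() {
	desired := 10
	maxSurge := resolve("2", desired, true)        // 2
	maxUnavailable := resolve("1", desired, false) // 1

	// How many new-RS pods may be created this round:
	// the total pod count may not exceed desired + maxSurge.
	currentTotal := 10 // pods across all ReplicaSets right now
	scaleUp := desired + maxSurge - currentTotal
	fmt.Println("can scale new RS up by:", scaleUp) // 2

	// How many old-RS pods may be removed this round:
	// available pods may not drop below desired - maxUnavailable.
	availableNow := 10
	scaleDown := availableNow - (desired - maxUnavailable)
	fmt.Println("can scale old RS down by:", scaleDown) // 1
}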

How to add a canary pause to Deployment

Please make sure you've read the sections above before this one.
As described, the DeploymentController runs syncDeployment for every changed Deployment object, and it contains:

func (dc *DeploymentController) syncDeployment(key string) error {
	...
	// when paused, only run sync to reconcile state
	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}
	...
	// otherwise check the strategy and roll the deployment toward the latest state
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	...
}

A rolling update is an iterative process: one Deployment rollout usually runs syncDeployment many times. From the code above, when deployment.spec.paused is set, the controller returns dc.sync() and never takes the next rollout step.

So the canary pause is designed like this: the user sets the desired canary count through a deployment annotation, and once that count is reached, the controller sets paused to block the next rollout step.

Initial design and testing

The canary count is specified via an annotation; the function below reads it.
Added to pkg/controller/deployment/util/deployment_util.go:

func Canary(deployment apps.Deployment) int32 {
	// TODO: tidy up this comment; canaryStr should support both a plain number and a percentage
	canaryStr := deployment.Annotations["canary"]
	// a missing or unparsable annotation yields 0, which disables the canary logic
	canary, _ := strconv.ParseInt(canaryStr, 10, 64)
	return int32(canary)
}
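The TODO above says the annotation should eventually accept both a plain number and a percentage. A possible variant is sketched below; it is hypothetical and not part of the patch, and the name canaryFromAnnotation is made up for illustration:

package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// canaryFromAnnotation is a hypothetical, percentage-aware variant of Canary().
// "3" means 3 pods; "30%" means 30% of spec.replicas, rounded up. A missing or
// invalid value returns 0, which keeps the canary logic disabled.
func canaryFromAnnotation(value string, desiredReplicas int32) int32 {
	if strings.HasSuffix(value, "%") {
		pct, err := strconv.ParseInt(strings.TrimSuffix(value, "%"), 10, 32)
		if err != nil || pct <= 0 {
			return 0
		}
		return int32(math.Ceil(float64(pct) * float64(desiredReplicas) / 100.0))
	}
	n, err := strconv.ParseInt(value, 10, 32)
	if err != nil {
		return 0
	}
	return int32(n)
}

func main() {
	fmt.Println(canaryFromAnnotation("3", 10))   // 3
	fmt.Println(canaryFromAnnotation("30%", 10)) // 3
	fmt.Println(canaryFromAnnotation("", 10))    // 0
}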

The scale-up calculation is extended with the code below so that it also respects the desired canary count, ensuring a single scale-up step never goes past it.
Added to pkg/controller/deployment/rolling.go:

// added to rolloutRolling
if deploymentutil.IsCanaryComplete(d, allRSs, newRS) {
	if err := dc.CanaryPauseDeployment(d); err != nil {
		return err
	}
}

// added to scaleDownOldReplicaSetsForRollingUpdate: if newRS has reached the canary count,
// force maxUnavailable to 0 so the old ReplicaSet isn't scaled down too far
// (goal: after the canary pause, new replicas + old replicas = desired replicas)
if newRS := deploymentutil.FindNewReplicaSet(deployment, allRSs); newRS != nil {
	if *(newRS.Spec.Replicas) == deploymentutil.Canary(*deployment) {
		maxUnavailable = 0
	}
}

In pkg/controller/deployment/util/deployment_util.go:

// added to NewRSNewReplicas: take the canary value into account when computing how far newRS may scale up
	// TODO(done): pick the smaller of the canary headroom and the computed scale-up count
	canary := Canary(*deployment)
	oldActiveRs, _ := FindOldReplicaSets(deployment, allRSs)
	if canary != 0 && len(oldActiveRs) > 0 && canary < *(deployment.Spec.Replicas) && *(newRS.Spec.Replicas) <= canary {
		scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(canary-*(newRS.Spec.Replicas))))
	}

// IsCanaryComplete reports whether the canary target has been reached
func IsCanaryComplete(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) bool {
	var isCanaryComplete = false

	canaryCount := Canary(*deployment)
	if deployment.Spec.Strategy.Type == apps.RollingUpdateDeploymentStrategyType && canaryCount != 0 {
		ReplicasCount := GetReplicaCountForReplicaSets(allRSs)
		if *(newRS.Spec.Replicas) == canaryCount && ReplicasCount == *deployment.Spec.Replicas {
			isCanaryComplete = true
		}
	}
	return isCanaryComplete
}

Finally, when the canary condition is met, set the pause flag (the rollout can later be resumed in the usual way, e.g. with kubectl rollout resume).

Added to pkg/controller/deployment/sync.go:

// new function
func (dc *DeploymentController) CanaryPauseDeployment(deployment *apps.Deployment) error {
	dpCopy := deployment.DeepCopy()
	dpCopy.Spec.Paused = true
	_, err := dc.client.AppsV1().Deployments(dpCopy.Namespace).Update(dpCopy)
	if err == nil {
		dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "CanaryPauseDeployment", "Pause Deployment %s", deployment.Name)
	}
	return err
}

Problem encountered and its cause (proportional scaling triggered by the sync function)

After the changes above, I rebuilt and ran kube-controller-manager, then used kubectl edit to change the deployment's pod template and trigger a rolling update.

The deployment had 10 replicas, maxSurge 2, maxUnavailable 1, and a canary count of 3.
Expected state: 3 new replicas, 7 old replicas, deployment.spec.paused = true
Actual state: 3 new replicas, 9 old replicas, deployment.spec.paused = true

Re-reading the code, the cause is what happens after the pause is set: syncDeployment takes the paused branch and calls dc.sync(), whose scale() step still performs proportional scaling. allowedSize is computed as spec.replicas + maxSurge = 12, while the ReplicaSets only total 10, so the remaining 2 replicas get distributed across the ReplicaSets, and in this run they landed on the (larger) old ReplicaSet, scaling it from 7 to 9.
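Plugging the numbers from this test into scale()'s arithmetic shows where the two extra pods come from (a simplified sketch; the real code distributes the extra replicas proportionally using the max-replicas annotations):

package main

import "fmt"

func main() {
	desired := int32(10)               // deployment.spec.replicas
	maxSurge := int32(2)               // from the RollingUpdate strategy
	newRS, oldRS := int32(3), int32(7) // replica counts at the moment of the canary pause

	allowedSize := desired + maxSurge      // 10 + 2 = 12
	toAdd := allowedSize - (newRS + oldRS) // 12 - 10 = 2

	// scale() spreads toAdd over the ReplicaSets in proportion to their size;
	// in the run above both extra pods ended up on the (larger) old ReplicaSet,
	// which is how it grew from 7 to 9 even though the rollout was paused.
	fmt.Println("allowedSize:", allowedSize, "replicas to add:", toAdd)
}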

Fixing the root cause

Changes in pkg/controller/deployment/sync.go:

// removed logic (from the scale function)
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

		deploymentReplicasToAdd := allowedSize - allRSsReplicas

// added logic
// TODO(done): compute deploymentReplicasToAdd from the deployment's spec.replicas and the recorded desired-replicas reference
		var deploymentReplicasToAdd int32
		oldDesired, ok := deploymentutil.GetDesiredReplicasAnnotation(newRS)
		if ok {
			deploymentReplicasToAdd = *(deployment.Spec.Replicas) - oldDesired
		} else {
			deploymentReplicasToAdd = 0
		}

The idea behind the change: scale() should only redistribute replicas when the user has actually changed spec.replicas (i.e. when it differs from the recorded desired-replicas annotation), instead of treating the maxSurge headroom as replicas to add, which is what had inflated the old ReplicaSet to 9.

For comparison: StatefulSet canary via partition
(StatefulSet's RollingUpdate strategy supports this natively through spec.updateStrategy.rollingUpdate.partition: pods with an ordinal greater than or equal to partition are updated to the new revision, while the rest stay on the old one.)