1. 程式人生 > >Kubernetes26--彈性伸縮--HPA原始碼--HPA執行過程

Kubernetes26--彈性伸縮--HPA原始碼--HPA執行過程

HPA由HPAController控制實現,啟動程式碼   kubernetes/cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
controllers["horizontalpodautoscaling"] = startHPAController
func startHPAController(ctx ControllerContext) (http.Handler, bool, error)

構建MetricsClient

func startHPAControllerWithRESTClient(ctx ControllerContext) (http.Handler, bool, error)
func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error)

由MetricsClient啟動HPAController

func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error)

構建HorizontalController

go podautoscaler.NewHorizontalController(
		hpaClient.CoreV1(),
		scaleClient,
		hpaClient.AutoscalingV1(),
		ctx.RESTMapper,
		metricsClient,
		ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
		ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
	).Run(ctx.Stop)

研究一下具體的Run控制過程

// Run begins watching and syncing.
// It blocks until stopCh is closed (or returns early if the caches never
// sync); the work queue is shut down on the way out.
func (a *HorizontalController) Run(stopCh <-chan struct{}) {
	// Deferred calls run in reverse order of registration when Run returns:
	// the shutdown log line first, then queue.ShutDown, then HandleCrash.
	defer utilruntime.HandleCrash()
	defer a.queue.ShutDown()

	klog.Infof("Starting HPA controller")
	defer klog.Infof("Shutting down HPA controller")

	// Do not start the worker unless both the HPA and the pod caches report synced.
	if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
		return
	}

	// start a single worker (we may wish to start more in the future)
	// wait.Until re-runs a.worker with a one-second period until stopCh is closed.
	go wait.Until(a.worker, time.Second, stopCh)

	// Block here until a stop is requested.
	<-stopCh
}

defer是golang語言中的關鍵字,用於資源的釋放,會在函式返回之前進行呼叫。

如果程式發生panic,則由utilruntime.HandleCrash執行相應處理程式,已註冊的defer函式仍會執行以釋放資源。注意:這裡指的是Go語言執行期的panic,與linux kernel panic並非同一概念;有關linux kernel panic概念可參考

https://blog.csdn.net/dake_160413/article/details/64443274

defer utilruntime.HandleCrash()

RateLimitingInterface介面

defer a.queue.ShutDown()
queue workqueue.RateLimitingInterface

等待快取同步

if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
		return
	}

監聽同步物件有兩種  HPA  Pod

// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
	// NewHorizontalController.
	hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
	hpaListerSynced cache.InformerSynced

	// podLister is able to list/get Pods from the shared cache from the informer passed in to
	// NewHorizontalController.
	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced
func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool
// WaitForCacheSync polls the supplied InformerSynced functions until all of
// them report true. It returns true once the caches are populated, and false
// if polling ended with an error, in which case the controller should shut down.
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	// allSynced is the poll condition: it never fails, and is satisfied only
	// when every sync function returns true within the same pass.
	allSynced := func() (bool, error) {
		for i := range cacheSyncs {
			if synced := cacheSyncs[i](); !synced {
				return false, nil
			}
		}
		return true, nil
	}

	if err := wait.PollUntil(syncedPollPeriod, allSynced, stopCh); err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}
// InformerSynced is a function that can be used to determine if an informer
// has synced. This is useful for determining if caches have synced.
type InformerSynced func() bool

Informer功能應該是觀察者模式,可以新增多個事件處理器(listener),等到快取更新時則通知各個處理器

// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler.

這裡等待HPA以及Pod快取更新事件

最後週期性執行worker函式

// start a single worker (we may wish to start more in the future)
	go wait.Until(a.worker, time.Second, stopCh)

	<-stopCh

Until函式   以固定的週期執行某個函式直到stop channel is closed

// Until calls f once per period until stopCh is closed.
//
// It delegates to JitterUntil with a zero jitter factor and sliding enabled,
// which means the timer for period starts only after f has completed.
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	const (
		jitterFactor = 0.0
		sliding      = true
	)
	JitterUntil(f, period, jitterFactor, sliding, stopCh)
}

worker函式

// worker handles queue items one after another and stops only when
// processNextWorkItem reports that processing should not continue.
func (a *HorizontalController) worker() {
	for {
		if more := a.processNextWorkItem(); !more {
			break
		}
	}
	klog.Infof("horizontal pod autoscaler controller worker shutting down")
}

processNextWorkItem函式

// processNextWorkItem takes one key off the queue and reconciles it.
// It returns false only when the queue signals shutdown; in every other case
// it returns true, whether or not the reconciliation succeeded.
func (a *HorizontalController) processNextWorkItem() bool {
	item, shutdown := a.queue.Get()
	if shutdown {
		return false
	}
	// Every item obtained from Get is marked Done once it has been handled.
	defer a.queue.Done(item)

	// On success we deliberately don't "forget" the key, because we want to
	// only process a given HPA once per resync interval.
	if reconcileErr := a.reconcileKey(item.(string)); reconcileErr != nil {
		// Failure: put the key back through the rate limiter, then report.
		a.queue.AddRateLimited(item)
		utilruntime.HandleError(reconcileErr)
	}
	return true
}

從queue中取一個元素

// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

研究一下workqueue中RateLimitingInterface

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	// DelayingInterface is embedded, so a rate-limiting queue also offers
	// every method of the delaying queue.
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}

其中複合了DelayingInterface   可以延時一段時間新增某個元素

// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	// Interface is embedded, so a delaying queue also offers every method
	// of the basic work queue.
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

Interface介面

// Interface is the basic work queue contract that the delaying and
// rate-limiting queue interfaces build upon.
type Interface interface {
	// Add marks item as needing processing.
	Add(item interface{})
	// Len returns the current queue length, for informational purposes only.
	Len() int
	// Get blocks until it can return an item to be processed. If shutdown is
	// true, the caller should end its goroutine. Done must be called with the
	// item once processing has finished.
	Get() (item interface{}, shutdown bool)
	// Done marks item as done processing; if it was marked dirty again while
	// being processed, it is re-added to the queue for re-processing.
	Done(item interface{})
	// ShutDown causes the queue to ignore all new items added to it; workers
	// are told to exit once the existing items have been drained.
	ShutDown()
	// ShuttingDown has no description in the surrounding text.
	// NOTE(review): presumably reports whether ShutDown has been called — confirm.
	ShuttingDown() bool
}

具體實現在

client-go/util/workqueue/queue.go

看一下各個方法的說明

// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func (q *Type) Len() int {
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {

workqueue定義了一個同步工作佇列,可以實現同步控制

回到

key, quit := a.queue.Get()
	if quit {
		return false
	}

workqueue在NewHorizontalController方法中建立,其中存放的元素是HPA物件的key(字串,可拆分為namespace與name):

queue:                        workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler")

DefaultHPARateLimiter

// NewDefaultHPARateLimiter creates the rate limiter used for the HPA work
// queue. As written, it simply returns a fixed-item-interval rate limiter
// built from interval.
// NOTE(review): the earlier wording also promised an overall limit "as per the
// default controller rate limiter", but no such limiter is composed in this
// function — confirm whether that is intended.
func NewDefaultHPARateLimiter(interval time.Duration) workqueue.RateLimiter {
	return NewFixedItemIntervalRateLimiter(interval)
}
// NewFixedItemIntervalRateLimiter returns a FixedItemIntervalRateLimiter
// configured with the given interval.
func NewFixedItemIntervalRateLimiter(interval time.Duration) workqueue.RateLimiter {
	limiter := new(FixedItemIntervalRateLimiter)
	limiter.interval = interval
	return limiter
}
// FixedItemIntervalRateLimiter limits items to a fixed-rate interval
type FixedItemIntervalRateLimiter struct {
	// interval is the fixed interval supplied when the limiter is constructed.
	interval time.Duration
}

新建一個queue,其名稱為   horizontalpodautoscaler(該名稱會傳給NewNamedDelayingQueue,並用於建立retry metrics)

queue:                        workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
// NewNamedRateLimitingQueue builds a rate-limiting queue by pairing a named
// delaying queue with the given rateLimiter.
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
	delaying := NewNamedDelayingQueue(name)
	q := &rateLimitingType{
		DelayingInterface: delaying,
		rateLimiter:       rateLimiter,
	}
	return q
}
// NewNamedDelayingQueue creates a delaying queue with the given name, driven
// by a clock.RealClock.
func NewNamedDelayingQueue(name string) DelayingInterface {
	realClock := clock.RealClock{}
	return newDelayingQueue(realClock, name)
}

// newDelayingQueue builds a delayingType around a new named queue and starts
// its waitingLoop goroutine before returning it.
//
// The parameter named clock shadows the clock package inside the body, so
// clock.NewTicker below is a method call on the argument.
func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
	ret := &delayingType{
		Interface:       NewNamed(name),
		clock:           clock,
		heartbeat:       clock.NewTicker(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan *waitFor, 1000),
		metrics:         newRetryMetrics(name),
	}

	// The loop runs in its own goroutine for the lifetime of the queue.
	go ret.waitingLoop()

	return ret
}
// newRetryMetrics returns the retry metrics for the queue called name.
//
// For an empty name it returns ret, a nil *defaultRetryMetrics, wrapped in the
// retryMetrics interface; the resulting interface value is therefore not equal
// to nil. NOTE(review): presumably the defaultRetryMetrics methods tolerate a
// nil receiver — confirm before comparing the result against nil or changing
// this to a plain `return nil`.
func newRetryMetrics(name string) retryMetrics {
	var ret *defaultRetryMetrics
	if len(name) == 0 {
		return ret
	}
	// A non-empty name gets a retries metric from the global metrics provider.
	return &defaultRetryMetrics{
		retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name),
	}
}

可知queue為一個RateLimitingInterface,其名稱為horizontalpodautoscaler,該名稱同時用於建立此queue的retry metrics資料

可以使用kubectl或者yaml檔案來宣告HPA資源物件HorizontalPodAutoscaler,宣告的HPA會以key的形式放入到一個queue中,然後啟動一個goroutine(worker)來不斷取出queue中的HPA週期性執行

回到processNextWorkItem方法,每個週期每次取出一個佇列中首HPA來執行(可以研究一下改成多執行緒的控制),等待該HPA完成

key, quit := a.queue.Get()
	if quit {
		return false
	}
	defer a.queue.Done(key)

從佇列中取出待執行的HPA的key(字串,之後會拆分為namespace與name)

執行reconcileKey方法   取出名稱空間以及hpa-name

// reconcileKey resolves a queue key to its HPA object and reconciles it.
//
// key is split into namespace and name with cache.SplitMetaNamespaceKey; a
// malformed key is returned as an error. If the lister reports the HPA as not
// found, its entry in a.recommendations is removed and nil is returned. Any
// other lister error is returned to the caller, so reconcileAutoscaler is
// never invoked with an HPA that failed to load.
func (a *HorizontalController) reconcileKey(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
		delete(a.recommendations, key)
		return nil
	}
	if err != nil {
		// A lookup failure other than NotFound leaves hpa unusable; surface
		// the error instead of reconciling with it.
		return err
	}

	return a.reconcileAutoscaler(hpa, key)
}

根據hpa name 獲取hpa策略,執行具體的演算法來控制叢集副本數量

hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
		delete(a.recommendations, key)
		return nil
	}

	return a.reconcileAutoscaler(hpa, key)

HorizontalPodAutoscalerLister  是查詢介面(Lister),用於從indexer中列出/取得HPA物件,並不是監聽器(Listener)

// HorizontalPodAutoscalerLister helps list HorizontalPodAutoscalers.
type HorizontalPodAutoscalerLister interface {
	// List lists all HorizontalPodAutoscalers in the indexer.
	List(selector labels.Selector) (ret []*v1.HorizontalPodAutoscaler, err error)
	// HorizontalPodAutoscalers returns an object that can list and get HorizontalPodAutoscalers.
	HorizontalPodAutoscalers(namespace string) HorizontalPodAutoscalerNamespaceLister
	// HorizontalPodAutoscalerListerExpansion is embedded; its methods are
	// declared elsewhere and are not shown here.
	HorizontalPodAutoscalerListerExpansion
}
// HorizontalPodAutoscalerNamespaceLister helps list and get HorizontalPodAutoscalers.
type HorizontalPodAutoscalerNamespaceLister interface {
	// List lists all HorizontalPodAutoscalers in the indexer for a given namespace.
	List(selector labels.Selector) (ret []*v1.HorizontalPodAutoscaler, err error)
	// Get retrieves the HorizontalPodAutoscaler from the indexer for a given namespace and name.
	Get(name string) (*v1.HorizontalPodAutoscaler, error)
	// HorizontalPodAutoscalerNamespaceListerExpansion is embedded; its
	// methods are declared elsewhere and are not shown here.
	HorizontalPodAutoscalerNamespaceListerExpansion
}

最終返回HorizontalPodAutoscaler物件,其對應於yaml檔案中所宣告的HPA資源

最後執行演算法來調整叢集副本數量  reconcileAutoscaler

func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error

回到processNextWorkItem方法

// processNextWorkItem takes one key off the queue and reconciles it.
// It returns false only when the queue signals shutdown; in every other case
// it returns true, whether or not the reconciliation succeeded.
func (a *HorizontalController) processNextWorkItem() bool {
	item, shutdown := a.queue.Get()
	if shutdown {
		return false
	}
	// Every item obtained from Get is marked Done once it has been handled.
	defer a.queue.Done(item)

	// On success we deliberately don't "forget" the key, because we want to
	// only process a given HPA once per resync interval.
	if reconcileErr := a.reconcileKey(item.(string)); reconcileErr != nil {
		// Failure: put the key back through the rate limiter, then report.
		a.queue.AddRateLimited(item)
		utilruntime.HandleError(reconcileErr)
	}
	return true
}

如果該hpa執行成功則直接返回,否則再次進入佇列,延時執行

接下來具體研究一下hpa演算法細節reconcileAutoscaler

總結:

HorizontalController構建成功,啟動run方法

等待hpa,pod的快取監聽同步   hpa物件放入到queue中

週期性執行worker方法來取出queue中hpa物件,執行相應邏輯,執行成功繼續執行下一個hpa,執行失敗,則該hpa進入queue中延時執行