Kubernetes28--彈性伸縮--HPA原始碼--控制演算法分析1
程式碼位置 kubernetes/pkg/controller/podautoscaler/horizontal.go
HPA工作機制:
構建HorizontalController,啟動Run方法
func (a *HorizontalController) Run(stopCh <-chan struct{})
呼叫worker方法從hpa佇列中取出隊首hpa來執行
func (a *HorizontalController) worker()
func (a *HorizontalController) processNextWorkItem() bool
核心方法
func (a *HorizontalController) processNextWorkItem() bool { key, quit := a.queue.Get() if quit { return false } defer a.queue.Done(key) err := a.reconcileKey(key.(string)) if err == nil { // don't "forget" here because we want to only process a given HPA once per resync interval return true } a.queue.AddRateLimited(key) utilruntime.HandleError(err) return true }
從佇列中取出一個HPA key,執行該HPA策略,如果執行成功直接返回,執行失敗,則延時重新加入到佇列中
func (a *HorizontalController) reconcileKey(key string) error
控制演算法入口函式
func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error
函式傳參傳入結構體autoscalingv1.HorizontalPodAutoscaler,可知與hpa yaml檔案相對應
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
name: my-app-hpa
namespace: default
spec:
scaleTargetRef:
apiVersion: extensions/v1beta1
kind: Deployment
name: hpa-ds
minReplicas: 1
maxReplicas: 10
targetCPUUtilizationPercentage: 50
將傳入物件複製一份,然後將v1版本轉換為v2版本格式
// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpav1 := hpav1Shared.DeepCopy()
// then, convert to autoscaling/v2, which makes our lives easier when calculating metrics
hpaRaw, err := unsafeConvertToVersionVia(hpav1, autoscalingv2.SchemeGroupVersion)
if err != nil {
a.eventRecorder.Event(hpav1, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
}
轉換為v2版本hpa
hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
v2版本的HorizontalPodAutoscaler物件
type HorizontalPodAutoscaler struct {
metav1.TypeMeta `json:",inline"`
// metadata is the standard object metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// spec is the specification for the behaviour of the autoscaler.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status.
// +optional
Spec HorizontalPodAutoscalerSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
// status is the current information about the autoscaler.
// +optional
Status HorizontalPodAutoscalerStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
儲存起始hpa狀態
hpaStatusOriginal := hpa.Status.DeepCopy()
取出hpa的namespace,ScaleTargetRef.Kind,ScaleTargetRef.Name,apiVersion等屬性對應要監測叢集的型別以及名稱,型別有replication controller, deployment or replica set.
reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
如果解析出錯,則記錄錯誤,並且返回
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
return fmt.Errorf("invalid API version in scale target reference: %v", err)
}
// setCondition sets the specific condition type on the given HPA to the specified value with the given reason
// and message. The message and args are treated like a format string. The condition will be added if it is
// not present.
func setCondition(hpa *autoscalingv2.HorizontalPodAutoscaler, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType, status v1.ConditionStatus, reason, message string, args ...interface{}) {
hpa.Status.Conditions = setConditionInList(hpa.Status.Conditions, conditionType, status, reason, message, args...)
}
比較現在的hpa以及原來hpa status判斷是否需要更新
// updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status
func (a *HorizontalController) updateStatusIfNeeded(oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
// skip a write if we wouldn't need to update
if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
return nil
}
return a.updateStatus(newHPA)
}
更新狀態
// updateStatus actually does the update request for the status of the given HPA
func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
// convert back to autoscalingv1
hpaRaw, err := unsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
}
hpav1 := hpaRaw.(*autoscalingv1.HorizontalPodAutoscaler)
_, err = a.hpaNamespacer.HorizontalPodAutoscalers(hpav1.Namespace).UpdateStatus(hpav1)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
}
klog.V(2).Infof("Successfully updated status for %s", hpa.Name)
return nil
}
func (c *horizontalPodAutoscalers) UpdateStatus(horizontalPodAutoscaler *v1.HorizontalPodAutoscaler) (result *v1.HorizontalPodAutoscaler, err error) {
result = &v1.HorizontalPodAutoscaler{}
err = c.client.Put().
Namespace(c.ns).
Resource("horizontalpodautoscalers").
Name(horizontalPodAutoscaler.Name).
SubResource("status").
Body(horizontalPodAutoscaler).
Do().
Into(result)
return
}
根據之前解析出的group以及kind,判斷資源型別是否存在
targetGK := schema.GroupKind{
Group: targetGV.Group,
Kind: hpa.Spec.ScaleTargetRef.Kind,
}
mappings, err := a.mapper.RESTMappings(targetGK)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
}
如果版本指定,則返回指定版本的資源,若沒有指定則返回所有資源型別
// RESTMappings returns all resource mappings for the provided group kind if no
// version search is provided. Otherwise identifies a preferred resource mapping for
// the provided version(s).
RESTMappings(gk schema.GroupKind, versions ...string) ([]*RESTMapping, error)
根據namespace以及name獲取當前物件資源需求物件scale
scale, targetGR, err := a.scaleForResourceMappings(hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
// scaleForResourceMappings attempts to fetch the scale for the
// resource with the given name and namespace, trying each RESTMapping
// in turn until a working one is found. If none work, the first error
// is returned. It returns both the scale, as well as the group-resource from
// the working mapping.
func (a *HorizontalController) scaleForResourceMappings(namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error)
記錄狀態
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
記錄副本數量 使用map來記錄每一個hpa的最近的副本數量
currentReplicas := scale.Status.Replicas
a.recordInitialRecommendation(currentReplicas, key)
func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32, key string) {
if a.recommendations[key] == nil {
a.recommendations[key] = []timestampedRecommendation{{currentReplicas, time.Now()}}
}
}
// Latest unstabilized recommendations for each autoscaler.
recommendations map[string][]timestampedRecommendation
var metricStatuses []autoscalingv2.MetricStatus
metricDesiredReplicas := int32(0)
metricName := ""
metricTimestamp := time.Time{}
desiredReplicas := int32(0)
rescaleReason := ""
timestamp := time.Now()
rescale := true
如果待控制物件的資源物件數量定義為0,則說明不需要hpa數量控制
if scale.Spec.Replicas == 0 {
// Autoscaling is disabled for this resource
desiredReplicas = 0
rescale = false
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
計算極端情況
else if currentReplicas > hpa.Spec.MaxReplicas {
rescaleReason = "Current number of replicas above Spec.MaxReplicas"
desiredReplicas = hpa.Spec.MaxReplicas
} else if hpa.Spec.MinReplicas != nil && currentReplicas < *hpa.Spec.MinReplicas {
rescaleReason = "Current number of replicas below Spec.MinReplicas"
desiredReplicas = *hpa.Spec.MinReplicas
} else if currentReplicas == 0 {
rescaleReason = "Current number of replicas must be greater than 0"
desiredReplicas = 1
}
如果currentReplicas大於最多資源限制數量MaxReplicas,則目標數量desiredReplicas = hpa.Spec.MaxReplicas
如果currentReplicas小於最少資源數量MinReplicas,則目標數量 desiredReplicas = *hpa.Spec.MinReplicas
如果叢集數量當前為0,則目標數量先設定為1
根據設定的metrics指標,hpa策略以及資源物件的需求來計算副本的數量
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
目標值desiredReplicas初始設為0,比較由某種資源指標計算出來的數值metricDesiredReplicas
如果計算值大於目標值,則目標值等於計算值
如果目標值大於現在值,說明應該擴容,紀錄原因
如果目標值小於現在值,說明應該縮容,紀錄原因
if metricDesiredReplicas > desiredReplicas {
desiredReplicas = metricDesiredReplicas
timestamp = metricTimestamp
rescaleMetric = metricName
}
if desiredReplicas > currentReplicas {
rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
}
if desiredReplicas < currentReplicas {
rescaleReason = "All metrics below target"
}
計算最終的目標值,如果目標值==現在值,則說明不需要伸縮
desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas)
rescale = desiredReplicas != currentReplicas
如果需要擴容,則更新scale標籤
scale.Spec.Replicas = desiredReplicas
if rescale {
scale.Spec.Replicas = desiredReplicas
_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(targetGR, scale)
if err != nil {
a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("failed to rescale %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
} else {
klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
desiredReplicas = currentReplicas
}
更新該資源的數量
_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(targetGR, scale)
// Update updates the scale of the given scalable resource.
Update(resource schema.GroupResource, scale *autoscalingapi.Scale) (*autoscalingapi.Scale, error)
更新HPA狀態,數量以及狀態等
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
// setStatus recreates the status of the given HPA, updating the current and
// desired replicas, as well as the metric statuses
func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) {
hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: currentReplicas,
DesiredReplicas: desiredReplicas,
LastScaleTime: hpa.Status.LastScaleTime,
CurrentMetrics: metricStatuses,
Conditions: hpa.Status.Conditions,
}
if rescale {
now := metav1.NewTime(time.Now())
hpa.Status.LastScaleTime = &now
}
}
在以上過程中兩個核心方法
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas)
根據指定的資源指標來計算需要的副本數量
// computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
// returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
// all metrics computed.
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error)