Kubernetes 32 -- ReplicationController Source Code -- Replica Count Control
Published: 2018-12-26
After the controller starts and runs through a series of configuration steps, it enters the core sync method:
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
Get the ReplicaSet object:
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
Check whether the RS needs to be synced:
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
Build the label selector:
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
List all Pods in the RS's namespace:
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
Filter out all inactive Pods:
var filteredPods []*v1.Pod
for _, pod := range allPods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
}
}
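For reference, a Pod counts as active when it has neither succeeded nor failed and is not being deleted; controller.IsPodActive is roughly the following:
func IsPodActive(p *v1.Pod) bool {
    return v1.PodSucceeded != p.Status.Phase &&
        v1.PodFailed != p.Status.Phase &&
        p.DeletionTimestamp == nil
}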
Adopt or release Pods according to the label selector:
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
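As a standalone illustration (a sketch assuming the k8s.io/apimachinery module is on the module path, not part of the controller itself), this is how a spec.selector becomes a labels.Selector and how it decides whether a Pod would be adopted or released:
package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Equivalent of rs.Spec.Selector with matchLabels: {app: nginx}.
    sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
        MatchLabels: map[string]string{"app": "nginx"},
    })
    if err != nil {
        panic(err)
    }
    // A Pod carrying the selected labels would be adopted by the ReplicaSet.
    fmt.Println(sel.Matches(labels.Set{"app": "nginx", "pod-template-hash": "abc"})) // true
    // A Pod without them would be released (orphaned).
    fmt.Println(sel.Matches(labels.Set{"app": "redis"})) // false
}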
If the RS needs syncing and has not been deleted, call the method that adjusts the number of replicas:
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
After the adjustment, update the RS status:
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
After a delay, requeue the RS so that it runs through the sync loop again:
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
The core method: manageReplicas
// manageReplicas checks and updates replicas for the given ReplicaSet.
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
Compute the difference between the current number of Pods and the desired replica count:
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
If the difference is negative, more Pods need to be created:
if diff < 0 {
diff *= -1
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
rsc.expectations.ExpectCreations(rsKey, diff)
slowStartBatch processes the creations in batches, starting slowly and speeding up as calls succeed:
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
    // ... the closure creates one Pod via CreatePodsWithControllerRef (shown below)
})
// slowStartBatch tries to call the provided function a total of 'count' times,
// starting slow to check for errors, then speeding up if calls succeed.
//
// It groups the calls into batches, starting with a group of initialBatchSize.
// Within each batch, it may call the function multiple times concurrently.
//
// If a whole batch succeeds, the next batch may get exponentially larger.
// If there are any failures in a batch, all remaining batches are skipped
// after waiting for the current batch to complete.
//
// It returns the number of successful calls to the function.
The slow-start dispatch function is implemented as follows:
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
remaining := count
successes := 0
for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
errCh := make(chan error, batchSize)
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
if err := fn(); err != nil {
errCh <- err
}
}()
}
wg.Wait()
curSuccesses := batchSize - len(errCh)
successes += curSuccesses
if len(errCh) > 0 {
return successes, <-errCh
}
remaining -= batchSize
}
return successes, nil
}
batchSize is the number of calls made in each loop iteration; it starts at initialBatchSize and, if the batch succeeds, doubles on the next iteration (capped at remaining):
for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining)
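To make the growth pattern concrete, here is a minimal standalone sketch (not the Kubernetes code itself) that prints the batch sizes slowStartBatch would use for 20 creations with an initial batch size of 1, assuming every call succeeds:
package main

import "fmt"

func main() {
    remaining, batchSize := 20, 1
    for remaining > 0 {
        if batchSize > remaining {
            batchSize = remaining
        }
        fmt.Printf("batch of %d concurrent creations, %d left afterwards\n", batchSize, remaining-batchSize)
        remaining -= batchSize
        batchSize *= 2
    }
    // Prints batches of 1, 2, 4, 8 and finally 5 creations.
}
If any call in a batch fails, the remaining batches are skipped and the missing creations are picked up on a later sync.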
Goroutines within each batch are synchronized with a WaitGroup:
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
if err := fn(); err != nil {
errCh <- err
}
}()
}
wg.Wait()
If not all creations could be attempted, the rest are left for the next resync cycle to retry:
// Any skipped pods that we never attempted to start shouldn't be expected.
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
if skippedPods := diff - successfulCreations; skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
rsc.expectations.CreationObserved(rsKey)
}
}
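The bookkeeping this relies on can be reduced to a toy sketch (the real type, controller.UIDTrackingControllerExpectations, also tracks UIDs and expires after a timeout): before creating N Pods the controller records N expected additions, every observed creation (or explicitly dropped expectation, as above) decrements the counter, and SatisfiedExpectations only returns true once nothing is outstanding.
package main

import (
    "fmt"
    "sync/atomic"
)

// expectations is a toy stand-in for the controller's expectations object: it
// only tracks how many Pod creations are still waiting to be observed.
type expectations struct{ pendingAdds int64 }

func (e *expectations) ExpectCreations(n int) { atomic.AddInt64(&e.pendingAdds, int64(n)) }
func (e *expectations) CreationObserved()     { atomic.AddInt64(&e.pendingAdds, -1) }
func (e *expectations) Satisfied() bool       { return atomic.LoadInt64(&e.pendingAdds) <= 0 }

func main() {
    e := &expectations{}
    e.ExpectCreations(3)       // manageReplicas is about to create 3 Pods
    e.CreationObserved()       // the informer sees Pod 1 appear
    e.CreationObserved()       // the informer sees Pod 2 appear
    fmt.Println(e.Satisfied()) // false: one creation still outstanding, skip manageReplicas
    e.CreationObserved()       // Pod 3 is observed, or its expectation is dropped as a skipped pod
    fmt.Println(e.Satisfied()) // true: the next sync may run manageReplicas again
}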
If diff > 0, Pods need to be deleted:
else if diff > 0 {
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
Get the list of Pods to delete: sort them by state and take the first diff of them:
func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod {
// No need to sort pods if we are about to delete all of them.
// diff will always be <= len(filteredPods), so not need to handle > case.
if diff < len(filteredPods) {
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}
return filteredPods[:diff]
}
The Pod ordering rules:
func (s ActivePods) Less(i, j int) bool {
// 1. Unassigned < assigned
// If only one of the pods is unassigned, the unassigned one is smaller
if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
return len(s[i].Spec.NodeName) == 0
}
// 2. PodPending < PodUnknown < PodRunning
m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
return m[s[i].Status.Phase] < m[s[j].Status.Phase]
}
// 3. Not ready < ready
// If only one of the pods is not ready, the not ready one is smaller
if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
return !podutil.IsPodReady(s[i])
}
// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
// see https://github.com/kubernetes/kubernetes/issues/22065
// 4. Been ready for empty time < less time < more time
// If both pods are ready, the latest ready one is smaller
if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) && !podReadyTime(s[i]).Equal(podReadyTime(s[j])) {
return afterOrZero(podReadyTime(s[i]), podReadyTime(s[j]))
}
// 5. Pods with containers with higher restart counts < lower restart counts
if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
}
// 6. Empty creation time pods < newer pods < older pods
if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
}
return false
}
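A standalone illustration of the first three rules, using a simplified pod struct instead of the real k8s types: pods that sort first are deleted first, so unscheduled, pending and not-ready pods are removed before healthy ones:
package main

import (
    "fmt"
    "sort"
)

type fakePod struct {
    Name     string
    NodeName string // empty means not yet scheduled
    Phase    string // "Pending", "Unknown" or "Running"
    Ready    bool
}

var phaseRank = map[string]int{"Pending": 0, "Unknown": 1, "Running": 2}

type byDeletionPriority []fakePod

func (s byDeletionPriority) Len() int      { return len(s) }
func (s byDeletionPriority) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byDeletionPriority) Less(i, j int) bool {
    // 1. Unassigned < assigned
    if (s[i].NodeName == "") != (s[j].NodeName == "") {
        return s[i].NodeName == ""
    }
    // 2. PodPending < PodUnknown < PodRunning
    if phaseRank[s[i].Phase] != phaseRank[s[j].Phase] {
        return phaseRank[s[i].Phase] < phaseRank[s[j].Phase]
    }
    // 3. Not ready < ready
    if s[i].Ready != s[j].Ready {
        return !s[i].Ready
    }
    return false
}

func main() {
    pods := []fakePod{
        {Name: "a", NodeName: "node1", Phase: "Running", Ready: true},
        {Name: "b", NodeName: "", Phase: "Pending", Ready: false},
        {Name: "c", NodeName: "node2", Phase: "Running", Ready: false},
    }
    sort.Sort(byDeletionPriority(pods))
    fmt.Println(pods[0].Name) // "b": unscheduled and pending, so it is deleted first
}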
The method that creates Pods:
err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
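The controllerRef argument is the owner reference that ends up in every created Pod's metadata.ownerReferences and lets later syncs recognize their Pods. A sketch of how such a reference is built, assuming the k8s.io/api and k8s.io/apimachinery modules are available:
package main

import (
    "fmt"

    apps "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    rs := &apps.ReplicaSet{
        ObjectMeta: metav1.ObjectMeta{Name: "my-rs", Namespace: "default", UID: "1234-abcd"},
    }
    // The ReplicaSet controller builds the same kind of reference via
    // metav1.NewControllerRef(rs, controllerKind).
    ref := metav1.NewControllerRef(rs, apps.SchemeGroupVersion.WithKind("ReplicaSet"))
    fmt.Printf("kind=%s name=%s controller=%v blockOwnerDeletion=%v\n",
        ref.Kind, ref.Name, *ref.Controller, *ref.BlockOwnerDeletion)
    // kind=ReplicaSet name=my-rs controller=true blockOwnerDeletion=true
}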
The method that deletes Pods:
err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs)
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
Calculating and updating the RS status
Compute the new RS status:
func calculateStatus(rs *apps.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) apps.ReplicaSetStatus {
Count the replicas whose labels fully match the template labels:
if templateLabel.Matches(labels.Set(pod.Labels)) {
fullyLabeledReplicasCount++
}
Count the replicas whose Pods are ready:
if podutil.IsPodReady(pod) {
readyReplicasCount++
}
// PodReady means the pod is able to service requests and should be added to the
// load balancing pools of all matching services.
PodReady PodConditionType = "Ready"
Count the available Pods:
if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
availableReplicasCount++
}
// IsPodAvailable returns true if a pod is available; false otherwise.
// Precondition for an available pod is that it must be ready. On top
// of that, there are two cases when a pod can be considered available:
// 1. minReadySeconds == 0, or
// 2. LastTransitionTime (is set) + minReadySeconds < current time
func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool {
if !IsPodReady(pod) {
return false
}
c := GetPodReadyCondition(pod.Status)
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time) {
return true
}
return false
}
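The same check as a minimal, dependency-free sketch, with the Pod's Ready condition reduced to a bool plus a timestamp: a ready Pod only counts as available once minReadySeconds have passed since the Ready condition's LastTransitionTime, or immediately if minReadySeconds is 0:
package main

import (
    "fmt"
    "time"
)

func isAvailable(ready bool, lastTransition time.Time, minReadySeconds int32, now time.Time) bool {
    if !ready {
        return false
    }
    minReady := time.Duration(minReadySeconds) * time.Second
    return minReadySeconds == 0 ||
        (!lastTransition.IsZero() && lastTransition.Add(minReady).Before(now))
}

func main() {
    now := time.Now()
    fmt.Println(isAvailable(true, now.Add(-5*time.Second), 10, now))  // false: ready for only 5s
    fmt.Println(isAvailable(true, now.Add(-30*time.Second), 10, now)) // true: ready for 30s
}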
Update the RS status:
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
func updateReplicaSetStatus(c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
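The "single GET/PUT retry" can be sketched without any Kubernetes dependencies; the function and callbacks below are hypothetical stand-ins, not the real client-go API. The idea: attempt the status write, and on a conflict re-GET the object once to refresh its resourceVersion before trying again:
package main

import (
    "errors"
    "fmt"
)

var errConflict = errors.New("conflict: the object has been modified")

// updateStatusWithOneRetry tries the status update at most twice: the original
// attempt plus one retry after refreshing the object on a conflict.
func updateStatusWithOneRetry(update func() error, refetch func() error) error {
    var err error
    for attempt := 0; attempt <= 1; attempt++ {
        if err = update(); err == nil || !errors.Is(err, errConflict) {
            return err
        }
        // Conflict: someone else wrote the object first; re-GET it to pick up
        // the latest resourceVersion, then try once more.
        if refErr := refetch(); refErr != nil {
            return refErr
        }
    }
    return err
}

func main() {
    calls := 0
    update := func() error {
        calls++
        if calls == 1 {
            return errConflict // first PUT loses the race
        }
        return nil
    }
    refetch := func() error { return nil }
    fmt.Println(updateStatusWithOneRetry(update, refetch)) // <nil>: the retry succeeds
}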
To summarize, the key pieces are:
1. The core object interface:
// A TTLCache of pod creates/deletes each rc expects to see.
expectations *controller.UIDTrackingControllerExpectations
2. The slow-start dispatch mechanism
3. Pod ordering rules for deletion
4. Pod operations: creation and deletion