An In-Depth Analysis of the Kubernetes DaemonSet Controller
Author: [email protected] | Version: Kubernetes 1.13
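Abstract: DaemonSet is one of the most commonly used Kubernetes objects. We use it to deploy node-level daemon applications such as log collectors and node monitoring agents. From the user's point of view DaemonSet looks simple, but it involves many details: what conditions a DaemonSet Pod must meet to run on a Node, whether it can run when the Node reports MemoryPressure or another abnormal Condition, how scheduling works, how rolling updates work, and so on. This article starts from the DaemonSet Controller source code and analyzes its key logic.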
DaemonSet Controller
DaemonSet Controller Struct
The core fields of the DaemonSet Controller include:
- burstReplicas int: the upper limit on the number of Pods created and deleted in each sync, hard-coded to 250.
- queue workqueue.RateLimitingInterface: a delaying queue that holds the keys (namespace/name) of the DaemonSets waiting to be synced.
- syncHandler func(dsKey string) error: syncs the DaemonSets taken from the queue, including replica management, UpdateStrategy upgrades, and DaemonSet Status updates. It is the core logic of the DaemonSet Controller.
- expectations controller.ControllerExpectationsInterface: a TTL cache that records, for each DaemonSet, how many Pod creations and deletions each sync expects to observe.
- suspendedDaemonPods map[string]sets.String: the key is a NodeName and the value is a set of DaemonSets, namely those that have a Pod on that Node with 'wantToRun & !shouldSchedule'.
  - wantToRun: when the DaemonSet Controller simulates scheduling, it runs the Predicates (mainly GeneralPredicates and PodToleratesNodeTaints). wantToRun is true if the Predicates pass once the following PredicateFailureErrors (all resource-related) are ignored, and false if any other PredicateFailureError occurs. If the DaemonSet's Spec specifies a NodeName, wantToRun is decided by whether that name matches node.Name.
    - ErrDiskConflict;
    - ErrVolumeZoneConflict;
    - ErrMaxVolumeCountExceeded;
    - ErrNodeUnderMemoryPressure;
    - ErrNodeUnderDiskPressure;
    - InsufficientResourceError;
  - shouldSchedule:
    - If the DaemonSet's Spec specifies a NodeName, shouldSchedule is decided by whether that name matches node.Name.
    - If any kind of PredicateFailureError occurs during the Predicates, shouldSchedule is false.
    - If an InsufficientResourceError occurs, shouldSchedule is also false.
- failedPodsBackoff *flowcontrol.Backoff: each time syncDaemonSet handles Failed Pods that should be deleted, it delays the deletion according to a backoff of 1s, 2s, 4s, 8s, ... up to 15min, which acts as flow control. Without it, a DaemonSet Pod rejected by the kubelet would be recreated and rejected again immediately, producing many useless loops. When the Controller runs, it also starts a goroutine that garbage-collects these backoff records; a record is dropped once it has gone 2*MaxDuration (2*15min) without an update.
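The 1s, 2s, 4s, ... 15min progression described for failedPodsBackoff can be sketched as follows. This is a minimal illustration of a capped exponential backoff, not the flowcontrol.Backoff implementation; the helper name backoffDelays is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelays returns the first n delays of an exponential backoff that
// starts at initial, doubles after every step, and is capped at max.
// Hypothetical helper for illustration only.
func backoffDelays(initial, max time.Duration, n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	d := initial
	for i := 0; i < n; i++ {
		delays = append(delays, d)
		d *= 2
		if d > max {
			d = max
		}
	}
	return delays
}

func main() {
	// 1s initial delay and 15min cap, the values quoted in the text above.
	for i, d := range backoffDelays(time.Second, 15*time.Minute, 12) {
		fmt.Printf("failure %d: wait %v before deleting the failed pod again\n", i+1, d)
	}
}
```

With these values the cap is reached on the 11th consecutive failure, after which every retry waits the full 15 minutes.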
Creating and starting the DaemonSet Controller
NewDaemonSetsController creates the Controller. One of its most important jobs is registering EventHandlers on the following Informers:
- daemonSetInformer: AddFunc/DeleteFunc/UpdateFunc all essentially just enqueue the DaemonSet;
- historyInformer:
- AddFunc: addHistory;
- UpdateFunc: updateHistory;
- DeleteFunc: deleteHistory;
- podInformer:
- AddFunc: addPod;
- UpdateFunc: updatePod;
- DeleteFunc: deletePod;
- nodeInformer:
- AddFunc: addNode;
- UpdateFunc: updateNode;
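When the DaemonSet Controller's Run starts, it mainly does two things:
- Start 2 worker goroutines. Each worker takes DaemonSet keys from the queue and syncs them.
- Start 1 failedPodsBackoff GC goroutine, which runs once every minute and cleans up stale failed-Pod backoff records for all DaemonSet/Node pairs in the cluster.
requeueSuspendedDaemonPods is called only from deletePod. Why? A likely reason is that deleting a scheduled Pod frees resources on its Node, so the DaemonSets suspended on that Node (wantToRun but !shouldSchedule) may now fit and are worth syncing again.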
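The worker model can be sketched with plain channels. This is a simplified illustration under stated assumptions: the real controller uses a rate-limited workqueue with de-duplication, while runWorkers, errSync, and the sample keys below are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// errSync is a placeholder error for the demo.
var errSync = errors.New("sync failed")

// runWorkers starts the given number of worker goroutines (workers must be
// at least 1), feeds them the keys, and returns the sorted keys whose sync
// returned an error. A real controller would requeue those keys with rate limiting.
func runWorkers(keys []string, workers int, syncHandler func(key string) error) []string {
	queue := make(chan string)
	failed := make(chan string, len(keys)) // buffered so workers never block on reporting
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range queue {
				if err := syncHandler(key); err != nil {
					failed <- key
				}
			}
		}()
	}
	for _, k := range keys {
		queue <- k
	}
	close(queue)
	wg.Wait()
	close(failed)

	var retry []string
	for k := range failed {
		retry = append(retry, k)
	}
	sort.Strings(retry)
	return retry
}

func main() {
	keys := []string{"kube-system/fluentd", "kube-system/node-exporter", "default/broken"}
	retry := runWorkers(keys, 2, func(key string) error {
		if key == "default/broken" {
			return errSync
		}
		return nil
	})
	fmt.Println("keys to requeue:", retry)
}
```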
Syncing a DaemonSet
A worker takes a pending DaemonSet key from the queue and calls syncDaemonSet to manage it automatically. syncDaemonSet is the core entry point of DaemonSet management.
pkg/controller/daemon/daemon_controller.go:1208
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
...
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(3).Infof("daemon set has been deleted %v", key)
dsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
}
everything := metav1.LabelSelector{}
if reflect.DeepEqual(ds.Spec.Selector, &everything) {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
return nil
}
// Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
// then we do not want to call manage on foo until the daemon pods have been created.
...
if ds.DeletionTimestamp != nil {
return nil
}
// Construct histories of the DaemonSet, and get the hash of current history
cur, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
if !dsc.expectations.SatisfiedExpectations(dsKey) {
// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
return dsc.updateDaemonSetStatus(ds, hash, false)
}
err = dsc.manage(ds, hash)
if err != nil {
return err
}
// Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(dsKey) {
switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds, hash)
}
if err != nil {
return err
}
}
err = dsc.cleanupHistory(ds, old)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
}
return dsc.updateDaemonSetStatus(ds, hash, true)
}
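The core flow is as follows:
- First check whether the DaemonSet has been deleted from the local Store. If so, delete its data from expectations.
- Check whether the DaemonSet's LabelSelector is empty. If so, syncDaemonSet returns without syncing, and no Pods are created for the DaemonSet.
- If its DeletionTimestamp is non-nil, the user has triggered deletion. syncDaemonSet returns without syncing, and the DaemonSet's Pods are left for the GC Controller to delete.
- constructHistory then fetches the DaemonSet's current ControllerRevision and all old ControllerRevisions, makes sure every ControllerRevision carries the label "controller-revision-hash: ControllerRevision.Name", and updates the current ControllerRevision so that Revision = maxRevision(old) + 1.
- Check whether the current expectations are satisfied. If they are not, only the DaemonSet Status is updated and the sync ends.
  - If neither add nor del in expectations is greater than 0, the Controller's expectations have been fulfilled, so they are satisfied.
  - If the expectations have expired, a sync is needed. The timeout is 5min and is not configurable.
  - If expectations holds no record for the DaemonSet yet, they also count as satisfied and a sync is triggered.
  - Here updateDaemonSetStatus updates the following fields of DaemonSet.Status. Note that it does not update ObservedGeneration (which has not changed either).
    - DesiredNumberScheduled: the number of DaemonSet Pods the user expects to be scheduled, corresponding to the Pods for which the wantToRun described earlier is true.
    - CurrentNumberScheduled: the number of Pods the user expects to be scheduled that are already running on a Node.
    - NumberMisscheduled: the number of Pods the user does not expect to be scheduled (wantToRun is false) that are nevertheless running on the Node, i.e. wrongly scheduled Pods.
    - NumberReady: among CurrentNumberScheduled, the number of Pods whose Ready Condition is true.
    - UpdatedNumberScheduled: among CurrentNumberScheduled, the number of Pods whose controller-revision-hash label equals the hash of the current ControllerRevision, i.e. Pods whose Pod Template has been updated.
    - NumberAvailable: among CurrentNumberScheduled, the number of Pods whose Ready Condition is true and which are Available (Ready for longer than minReadySeconds).
    - NumberUnavailable: desiredNumberScheduled - numberAvailable.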
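- Call manage to manage the DaemonSet's Pods: compute the lists of Pods to delete and to create, then call syncNodes to create them in batches (1, 2, 4, 8, ...) and delete them. If some Node was found to hold a Failed Pod of this DaemonSet before syncNodes ran, manage returns an error after syncNodes. The add/del counters that syncNodes sets in expectations must drop to zero (or below) before a later syncDaemonSet calls manage again.
- If manage returns an error, syncDaemonSet ends. Otherwise the flow continues.
- Check whether the current expectations are satisfied. If so, trigger the DaemonSet update according to UpdateStrategy:
  - If UpdateStrategy is OnDelete, wait for the user to delete a Pod. That enqueues the DaemonSet, and syncNodes then creates the new Pod from the latest Pod Template.
  - If UpdateStrategy is RollingUpdate, call rollingUpdate to perform a rolling update, analyzed in detail later.
- If the DaemonSet update succeeds, clean up the oldest ControllerRevisions as needed, that is, when the number of old ControllerRevisions exceeds Spec.RevisionHistoryLimit (default 10).
- updateDaemonSetStatus updates DaemonSet.Status. Unlike the earlier call, this one also updates Status.ObservedGeneration.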
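The three rules for "expectations satisfied" in the flow above can be sketched as a small predicate. This is an illustration only, not the ControllerExpectations code: the expectation type, its age field, and the sample keys are hypothetical, and the 5min timeout is the value quoted in the text.

```go
package main

import (
	"fmt"
	"time"
)

// expectationsTimeout is the 5min expiry mentioned above.
const expectationsTimeout = 5 * time.Minute

// expectation is a hypothetical, simplified record for one DaemonSet.
type expectation struct {
	add, del int           // creations/deletions still expected to be observed
	age      time.Duration // time elapsed since the expectation was set
}

// satisfied reports whether a sync may proceed for the given DaemonSet key.
func satisfied(store map[string]expectation, key string) bool {
	e, found := store[key]
	switch {
	case !found:
		return true // no record yet: nothing is pending
	case e.add <= 0 && e.del <= 0:
		return true // every expected create/delete has been observed
	case e.age > expectationsTimeout:
		return true // expired: stop waiting and sync anyway
	}
	return false
}

func main() {
	store := map[string]expectation{
		"kube-system/pending":   {add: 3, del: 0, age: 10 * time.Second},
		"kube-system/fulfilled": {add: 0, del: -1, age: time.Minute},
		"kube-system/expired":   {add: 2, del: 1, age: 6 * time.Minute},
	}
	keys := []string{"kube-system/pending", "kube-system/fulfilled", "kube-system/expired", "kube-system/unknown"}
	for _, key := range keys {
		fmt.Printf("%-22s satisfied=%v\n", key, satisfied(store, key))
	}
}
```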
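DaemonSet Pod scheduling
In versions before Kubernetes 1.12, the DaemonSet Controller schedules Daemon Pods by default: it sets spec.nodeName on the Pod to be scheduled, the kubelet on that Node watches the event, and the kubelet then creates the DaemonSet Pod on its own node. In Kubernetes 1.12+, the ScheduleDaemonSetPods FeatureGate is enabled by default, and DaemonSet scheduling is handed over to the default scheduler.
DaemonSet Pods Should Be On Node
During manage, podsShouldBeOnNode is called to compute the DaemonSet Pods that should be started on the Node (nodesNeedingDaemonPods), the DaemonSet Pods that should be deleted from the Node (podsToDelete), and the number of DaemonSet Pods that have already Failed on the Node. syncNodes then creates and deletes the corresponding Pods based on this information.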
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, hash string) error {
// Find out the pods which are created for the nodes by DaemonSet.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
...
for _, node := range nodeList {
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, failedPodsObservedOnNode, err := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds)
if err != nil {
continue
}
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
failedPodsObserved += failedPodsObservedOnNode
}
// Label new pods using the hash label value of the current history when creating them
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err
}
...
return nil
}
How does podsShouldBeOnNode compute nodesNeedingDaemonPods, podsToDelete, and failedPodsObserved? It calls nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) to compute the following three state values:
- wantToRun: when the DaemonSet Controller simulates scheduling, it runs the Predicates (mainly GeneralPredicates and PodToleratesNodeTaints). wantToRun is true if the Predicates pass once the following PredicateFailureErrors (all resource-related) are ignored, and false if any other PredicateFailureError occurs. If the DaemonSet's Spec specifies a NodeName, wantToRun is decided by whether that name matches node.Name.
  - ErrDiskConflict;
  - ErrVolumeZoneConflict;
  - ErrMaxVolumeCountExceeded;
  - ErrNodeUnderMemoryPressure;
  - ErrNodeUnderDiskPressure;
  - InsufficientResourceError;
- shouldSchedule:
  - If the DaemonSet's Spec specifies a NodeName, shouldSchedule is decided by whether that name matches node.Name.
  - If any kind of PredicateFailureError occurs during the Predicates, shouldSchedule is false.
  - If an InsufficientResourceError occurs, shouldSchedule is also false.
- shouldContinueRunning: false if any of the following occurs, true otherwise.
  - ErrNodeSelectorNotMatch,
  - ErrPodNotMatchHostName,
  - ErrNodeLabelPresenceViolated,
  - ErrPodNotFitsHostPorts,
  - ErrTaintsTolerationsNotMatch: the value stays true if the Pod tolerates the Node's NoExecute Taints and becomes false otherwise. In other words, only NoExecute Taints stop a running Daemon Pod; an untolerated NoSchedule Taint only blocks new scheduling.
  - ErrPodAffinityNotMatch,
  - ErrServiceAffinityViolated,
  - unknown predicate failure reason
These three state values then yield nodesNeedingDaemonPods []string, podsToDelete []string, and failedPodsObserved int.
// podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
func (dsc *DaemonSetsController) podsShouldBeOnNode(
node *v1.Node,
nodeToDaemonPods map[string][]*v1.Pod,
ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, failedPodsObserved int, err error) {
wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return
}
daemonPods, exists := nodeToDaemonPods[node.Name]
dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
dsc.removeSuspendedDaemonPods(node.Name, dsKey)
switch {
case wantToRun && !shouldSchedule:
// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
dsc.addSuspendedDaemonPods(node.Name, dsKey)
case shouldSchedule && !exists:
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
case shouldContinueRunning:
// If a daemon pod failed, delete it
// If there's non-daemon pods left on this node, we will create it in the next sync loop
var daemonPodsRunning []*v1.Pod
for _, pod := range daemonPods {
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodFailed {
failedPodsObserved++
// This is a critical place where DS is often fighting with kubelet that rejects pods.
// We need to avoid hot looping and backoff.
backoffKey := failedPodsBackoffKey(ds, node.Name)
now := dsc.failedPodsBackoff.Clock.Now()
inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
if inBackoff {
delay := dsc.failedPodsBackoff.Get(backoffKey)
klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
pod.Namespace, pod.Name, node.Name, delay)
dsc.enqueueDaemonSetAfter(ds, delay)
continue
}
dsc.failedPodsBackoff.Next(backoffKey, now)
msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
klog.V(2).Infof(msg)
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
} else {
daemonPodsRunning = append(daemonPodsRunning, pod)
}
}
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
// Sort the daemon pods by creation time, so the oldest is preserved.
if len(daemonPodsRunning) > 1 {
sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
for i := 1; i < len(daemonPodsRunning); i++ {
podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
}
}
case !shouldContinueRunning && exists:
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
for _, pod := range daemonPods {
podsToDelete = append(podsToDelete, pod.Name)
}
}
return nodesNeedingDaemonPods, podsToDelete, failedPodsObserved, nil
}
// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a summary.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
newPod := NewPod(ds, node.Name)
// Because these bools require an && of all their required conditions, we start
// with all bools set to true and set a bool to false if a condition is not met.
// A bool should probably not be set to true after this line.
wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
// If the daemon set specifies a node name, check that it matches with node.Name.
if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
return false, false, false, nil
}
reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
if err != nil {
klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
return false, false, false, err
}
// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
// into one result, e.g. selectedNode.
var insufficientResourceErr error
for _, r := range reasons {
klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
switch reason := r.(type) {
case *predicates.InsufficientResourceError:
insufficientResourceErr = reason
case *predicates.PredicateFailureError:
var emitEvent bool
// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
switch reason {
// intentional
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrNodeLabelPresenceViolated,
// this one is probably intentional since it's a workaround for not having
// pod hard anti affinity.
predicates.ErrPodNotFitsHostPorts:
return false, false, false, nil
case predicates.ErrTaintsTolerationsNotMatch:
// DaemonSet is expected to respect taints and tolerations
fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
if err != nil {
return false, false, false, err
}
if !fitsNoExecute {
return false, false, false, nil
}
wantToRun, shouldSchedule = false, false
// unintentional
case
predicates.ErrDiskConflict,
predicates.ErrVolumeZoneConflict,
predicates.ErrMaxVolumeCountExceeded,
predicates.ErrNodeUnderMemoryPressure,
predicates.ErrNodeUnderDiskPressure:
// wantToRun and shouldContinueRunning are likely true here. They are
// absolutely true at the time of writing the comment. See first comment
// of this method.
shouldSchedule = false
emitEvent = true
// unexpected
case
predicates.ErrPodAffinityNotMatch,
predicates.ErrServiceAffinityViolated:
klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
default:
klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
emitEvent = true
}
if emitEvent {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
}
}
}
// only emit this event if insufficient resource is the only thing
// preventing the daemon pod from scheduling
if shouldSchedule && insufficientResourceErr != nil {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
shouldSchedule = false
}
return
}
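- If shouldSchedule && !exists, the Node's name is added to nodesNeedingDaemonPods.
- If shouldContinueRunning && pod.DeletionTimestamp == nil && pod.Status.Phase == v1.PodFailed, check whether the Pod is still inside its backoff window (the delay doubles from 1s and is capped at a hard-coded 15min). If the window has passed, the Pod is added to podsToDelete; otherwise the DaemonSet is enqueued again after the remaining delay.
- If shouldContinueRunning && pod.DeletionTimestamp == nil && pod.Status.Phase != v1.PodFailed, the Pod is added to daemonPodsRunning, which records the non-Failed Pods of this DaemonSet on this Node. If daemonPodsRunning holds more than one Pod, they are sorted by creation time and every Pod except the earliest is added to podsToDelete.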
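The "keep the oldest, delete the rest" rule for surplus daemon Pods can be sketched as follows. This is a simplification: the controller's podByCreationTimestampAndPhase ordering also takes the Pod's phase into account, which this sketch ignores, and the daemonPod type and excessPods helper are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// daemonPod is a hypothetical, trimmed-down stand-in for *v1.Pod.
type daemonPod struct {
	name    string
	created int64 // creation time as Unix seconds
}

// excessPods returns the names of all pods except the oldest one,
// ordered from older to newer. It returns nil when there is nothing to delete.
func excessPods(running []daemonPod) []string {
	if len(running) <= 1 {
		return nil
	}
	// Sort a copy so the caller's slice keeps its order.
	sorted := append([]daemonPod(nil), running...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].created < sorted[j].created
	})
	var toDelete []string
	for _, p := range sorted[1:] {
		toDelete = append(toDelete, p.name)
	}
	return toDelete
}

func main() {
	pods := []daemonPod{{"ds-b", 200}, {"ds-a", 100}, {"ds-c", 300}}
	fmt.Println(excessPods(pods)) // prints [ds-b ds-c]
}
```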
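nodeShouldRunDaemonPod calls simulate to simulate scheduling and obtain how well the Pod matches the Node. The values of wantToRun, shouldSchedule, and shouldContinueRunning follow from the returned algorithm.PredicateFailureReason list. Let's look at the scheduling logic used by simulate.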
// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
// and PodToleratesNodeTaints predicate
func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var predicateFails []algorithm.PredicateFailureReason
// If ScheduleDaemonSetPods is enabled, only check nodeSelector, nodeAffinity and toleration/taint match.
if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
critical := kubelettypes.IsCriticalPod(pod)
fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
if critical {
// If the pod is marked as critical and support for critical pod annotations is enabled,
// check predicates for critical pods only.
fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
} else {
fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
}
if err != nil {
return false, predicateFails, err
}
if !fit {
predicateFails = append(predicateFails, reasons...)
}
return len(predicateFails) == 0, predicateFails, nil
}
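- If the ScheduleDaemonSetPods FeatureGate is enabled, the Predicate logic is as follows. No real scheduling happens here; only three predicate checks run, and the final scheduling is left to the default scheduler. How the default scheduler controls the binding between a DaemonSet Pod and its Node is left for later.
  - PodFitsHost: when Pod.spec.nodeName is non-empty, check that it matches the Node name;
  - PodMatchNodeSelector: check whether the Pod's NodeSelector and NodeAffinity match the Node;
  - PodToleratesNodeTaints: check whether the Pod's Tolerations match the Node's NoExecute and NoSchedule Taints.
- If the ScheduleDaemonSetPods FeatureGate is not enabled, the Predicate logic is as follows. No real scheduling happens here either; only a few predicate checks run, and the final scheduling is left to the DaemonSet Controller.
  - PodToleratesNodeTaints: check whether the Pod's Tolerations match the Node's NoExecute and NoSchedule Taints.
  - For a Critical DaemonSet Pod, EssentialPredicates run next, including:
    - PodFitsHost: when Pod.spec.nodeName is non-empty, check that it matches the Node name;
    - PodFitsHostPorts: check whether the protocol and host ports requested by the DaemonSet Pod are already in use;
    - PodMatchNodeSelector: check whether the Pod's NodeSelector and NodeAffinity match the Node;
  - For a non-Critical DaemonSet Pod, GeneralPredicates run next:
    - PodFitsResources: check whether the Node's remaining allocatable resources can satisfy the Pod's requests;
    - PodFitsHost: when Pod.spec.nodeName is non-empty, check that it matches the Node name;
    - PodFitsHostPorts: check whether the protocol and host ports requested by the DaemonSet Pod are already in use;
    - PodMatchNodeSelector: check whether the Pod's NodeSelector and NodeAffinity match the Node;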
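The branching above can be summarized as a lookup from the two inputs (feature gate on or off, Critical Pod or not) to the checks that run. The sketch below only restates the list as data; predicateChecks is a hypothetical helper and does not evaluate any predicate.

```go
package main

import "fmt"

// predicateChecks lists the predicate names that the simulation runs,
// following the description in the text. Hypothetical helper for illustration.
func predicateChecks(scheduleDaemonSetPods, critical bool) []string {
	if scheduleDaemonSetPods {
		// The default scheduler does the real work; only node fitness is checked.
		return []string{"PodFitsHost", "PodMatchNodeSelector", "PodToleratesNodeTaints"}
	}
	checks := []string{"PodToleratesNodeTaints"}
	if critical {
		// EssentialPredicates: no resource check for critical pods.
		return append(checks, "PodFitsHost", "PodFitsHostPorts", "PodMatchNodeSelector")
	}
	// GeneralPredicates: the essential checks plus the resource fit.
	return append(checks, "PodFitsResources", "PodFitsHost", "PodFitsHostPorts", "PodMatchNodeSelector")
}

func main() {
	fmt.Println("gate on:            ", predicateChecks(true, false))
	fmt.Println("gate off, critical: ", predicateChecks(false, true))
	fmt.Println("gate off, regular:  ", predicateChecks(false, false))
}
```

The only difference between the two gate-off branches is PodFitsResources, which is why a Critical DaemonSet Pod is not held back by insufficient resources in that mode.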
Sync Nodes
podsShouldBeOnNode has produced nodesNeedingDaemonPods []string, podsToDelete []string, and failedPodsObserved int. The next step is to create and delete the corresponding Pods.
// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with erros if any
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// We need to set expectations before creating/deleting pods to avoid race conditions.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}
createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)
if createDiff > dsc.burstReplicas {
createDiff = dsc.burstReplicas
}
if deleteDiff > dsc.burstReplicas {
deleteDiff = dsc.burstReplicas
}
dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
// error channel to communicate back failures. make the buffer big enough to avoid any blocking
errCh := make(chan error, createDiff+deleteDiff)
klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
createWait := sync.WaitGroup{}
// If the returned error is not nil we have a parse error.
// The controller handles this via the hash.
generation, err := util.GetTemplateGeneration(ds)
if err != nil {
generation = nil
}
template := util.CreatePodTemplate(ds.Namespace, ds.Spec.Template, generation, hash)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()
var err error
podTemplate := &template
if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
podTemplate = template.DeepCopy()
// The pod's NodeAffinity will be updated to make sure the Pod is bound
// to the target node by default scheduler. It is safe to do so because there
// should be no conflicting node affinity with the target node.
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))
} else {
err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))
}
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
createWait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := createDiff - batchSize
if errorCount < len(errCh) && skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
for i := 0; i < skippedPods; i++ {
dsc.expectations.CreationObserved(dsKey)
}
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
}
klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.DeletionObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
deleteWait.Wait()
// collect errors if any for proper reporting/retry logic in the controller
errors := []error{}
close(errCh)
for err := range errCh {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}
- At most 250 Pods are deleted and at most 250 Pods are created in each sync.
- Build the Pod Template from the DaemonSet object, adding or updating the following Tolerations:
  Key | Operator | Effect
  node.kubernetes.io/not-ready | Exists | NoExecute
  node.kubernetes.io/unreachable | Exists | NoExecute
  node.kubernetes.io/disk-pressure | Exists | NoSchedule
  node.kubernetes.io/memory-pressure | Exists | NoSchedule
  node.kubernetes.io/unschedulable | Exists | NoSchedule
  node.kubernetes.io/network-unavailable | Exists | NoSchedule
  - For a Critical Pod, the following Tolerations are added as well:
  node.kubernetes.io/out-of-disk | Exists | NoExecute
  node.kubernetes.io/out-of-disk | Exists | NoSchedule
- Add the Label controller-revision-hash=$DaemonSetControllerHash to the Pod.
- Create the DaemonSet Pods in batches (batch sizes 1, 2, 4, 8, ...), so that creating a large number of DaemonSet Pods in one shot does not fail en masse on the same error. For each Pod that fails to be created, expectations.adds is decremented by 1.
  - If the ScheduleDaemonSetPods FeatureGate is enabled, add or update a NodeAffinity of metadata.name=$NodeName in the Pod Template. This is how DaemonSet Pods come to be scheduled by the default scheduler.
- Delete all Pods in podsToDelete in one go.
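The slow-start batching can be sketched as below. This is a simplified illustration: syncNodes runs each batch concurrently and adjusts expectations, while this sketch runs sequentially and only shows how batch sizes grow and how a failure stops later batches. The names slowStart and errQuota are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errQuota is a placeholder error for the demo.
var errQuota = errors.New("quota exceeded")

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// slowStart calls create for indexes 0..total-1 in batches of size
// initial, 2*initial, 4*initial, ... It finishes the batch in which the first
// error occurs and then stops. It returns how many creations were attempted
// and the first error seen.
func slowStart(total, initial int, create func(index int) error) (attempted int, err error) {
	pos := 0
	for batch := minInt(total, initial); batch > 0; batch = minInt(2*batch, total-pos) {
		for i := pos; i < pos+batch; i++ {
			if e := create(i); e != nil && err == nil {
				err = e
			}
		}
		pos += batch
		if err != nil {
			break // skip the remaining pods; a later sync retries them
		}
	}
	return pos, err
}

func main() {
	attempted, err := slowStart(10, 1, func(i int) error {
		if i == 3 {
			return errQuota
		}
		return nil
	})
	fmt.Printf("attempted %d of 10 creations, error: %v\n", attempted, err)
}
```

In this run the batches are 1, 2, and 4 Pods. The failure falls in the third batch, so 7 creations are attempted and the last 3 Pods are skipped rather than sent to the API server.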
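DaemonSet rolling updates
A DaemonSet rolling update differs slightly from a Deployment rolling update: in this version, DaemonSet RollingUpdate has only one setting, MaxUnavailable, with no counterpart to the Deployment's MaxSurge.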
// rollingUpdate deletes old daemon set pods making sure that no more than
// ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, hash string) error {
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
_, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods)
if err != nil {
return fmt.Errorf("Couldn't get unavailable numbers: %v", err)
}
oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)
// for oldPods delete all not running pods
var oldPodsToDelete []string
klog.V(4).Infof("Marking all unavailable old pods for deletion")
for _, pod := range oldUnavailablePods {
// Skip terminating pods. We won't delete them again
if pod.DeletionTimestamp != nil {
continue
}
klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
}
klog.V(4).Infof("Marking old pods for deletion")
for _, pod := range oldAvailablePods {
if numUnavailable >= maxUnavailable {
klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
break
}
klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
numUnavailable++
}
return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}
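- Select all old Pods based on the latest hash;
- Compute numUnavailable as the number of Pods that are !available plus the number of Pods that are expected to be scheduled but are not running yet.
- Split the old Pods into oldAvailablePods and oldUnavailablePods, and add the oldUnavailablePods whose DeletionTimestamp is nil to the list of Pods to delete (oldPodsToDelete).
- Iterate over oldAvailablePods, adding them one by one to oldPodsToDelete until numUnavailable reaches maxUnavailable. At most (maxUnavailable - numUnavailable) Pods move from oldAvailablePods to oldPodsToDelete, where numUnavailable is the value computed in the earlier step.
- oldPodsToDelete therefore contains every oldUnavailablePod whose DeletionTimestamp is nil plus at most (maxUnavailable - numUnavailable) oldAvailablePods.
- Finally call syncNodes to delete the DaemonSet Pods in oldPodsToDelete.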
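The selection steps above can be sketched with a simplified Pod type. The oldPod struct and selectOldPodsToDelete are hypothetical names; availability and termination are reduced to booleans, and numUnavailable is passed in as already computed.

```go
package main

import "fmt"

// oldPod is a hypothetical stand-in for a pod running an outdated template.
type oldPod struct {
	name        string
	available   bool // Ready for at least minReadySeconds
	terminating bool // DeletionTimestamp is set
}

// selectOldPodsToDelete picks every unavailable old pod that is not already
// terminating, then as many available old pods as the maxUnavailable budget allows.
func selectOldPodsToDelete(oldPods []oldPod, numUnavailable, maxUnavailable int) []string {
	var toDelete []string
	// Unavailable pods cost nothing against the budget: they are already counted.
	for _, p := range oldPods {
		if !p.available && !p.terminating {
			toDelete = append(toDelete, p.name)
		}
	}
	// Each available pod that gets deleted makes one more pod unavailable.
	for _, p := range oldPods {
		if !p.available {
			continue
		}
		if numUnavailable >= maxUnavailable {
			break
		}
		toDelete = append(toDelete, p.name)
		numUnavailable++
	}
	return toDelete
}

func main() {
	pods := []oldPod{
		{"a", true, false},
		{"b", true, false},
		{"c", false, false},
		{"d", false, true},
		{"e", true, false},
	}
	// Two pods (c and d) are already unavailable and the budget is three.
	fmt.Println(selectOldPodsToDelete(pods, 2, 3)) // prints [c a]
}
```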
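Node updates
The Node Add event is simple: iterate over all DaemonSets and call nodeShouldRunDaemonPod to work out whether each DaemonSet should start a Pod on the Node. If so, the DaemonSet is added to the queue and handled by syncDaemonSet.
For a Node Update event, the Controller has to examine which fields changed and then decide whether to add DaemonSets to the queue for syncDaemonSet.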
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
oldNode := old.(*v1.Node)
curNode := cur.(*v1.Node)
if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
return
}
dsList, err := dsc.dsLister.List(labels.Everything())
if err != nil {
klog.V(4).Infof("Error listing daemon sets: %v", err)
return
}
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
for _, ds := range dsList {
_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
if err != nil {
continue
}
_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
if err != nil {
continue
}
if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
dsc.enqueueDaemonSet(ds)
}
}
}
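- If the Node's Conditions have changed, the Node update event cannot be ignored.
- Apart from the Node Conditions and ResourceVersion, if the old and new Node objects differ in any other way, the update event cannot be ignored either.
- For an update that cannot be ignored, call nodeShouldRunDaemonPod on both oldNode and currentNode and compare ShouldSchedule and ShouldContinueRunning. If either one changed, the DaemonSet object is enqueued and handled by syncDaemonSet.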
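The filtering rules above can be sketched with a simplified Node type. This is an illustration under stated assumptions: the node struct, its fields, and both helper names are hypothetical, and Conditions are reduced to a map from condition type to whether its status is True.

```go
package main

import (
	"fmt"
	"reflect"
)

// node is a hypothetical, trimmed-down stand-in for v1.Node.
type node struct {
	resourceVersion string
	conditions      map[string]bool // condition type -> status is True
	labels          map[string]string
}

// sameTrueConditions reports whether both nodes have the same set of
// conditions whose status is True.
func sameTrueConditions(a, b map[string]bool) bool {
	for t, on := range a {
		if on && !b[t] {
			return false
		}
	}
	for t, on := range b {
		if on && !a[t] {
			return false
		}
	}
	return true
}

// shouldIgnoreUpdate returns true only when nothing relevant changed:
// the True conditions are the same and all other fields are equal.
func shouldIgnoreUpdate(old, cur node) bool {
	if !sameTrueConditions(old.conditions, cur.conditions) {
		return false
	}
	// old is a copy, so these fields can be overwritten to exclude them
	// from the comparison below.
	old.resourceVersion = cur.resourceVersion
	old.conditions = cur.conditions
	return reflect.DeepEqual(old, cur)
}

func main() {
	base := node{"1", map[string]bool{"Ready": true}, map[string]string{"zone": "a"}}
	heartbeat := node{"2", map[string]bool{"Ready": true}, map[string]string{"zone": "a"}}
	pressure := node{"3", map[string]bool{"Ready": true, "MemoryPressure": true}, map[string]string{"zone": "a"}}
	relabeled := node{"4", map[string]bool{"Ready": true}, map[string]string{"zone": "b"}}

	fmt.Println("heartbeat only: ignore =", shouldIgnoreUpdate(base, heartbeat))
	fmt.Println("new condition:  ignore =", shouldIgnoreUpdate(base, pressure))
	fmt.Println("label change:   ignore =", shouldIgnoreUpdate(base, relabeled))
}
```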
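DaemonSet Controller main logic
Summary
This article analyzed the DaemonSet source code from several angles: structure, creation, syncing, scheduling, and rolling updates. Understanding these details helps before using DaemonSet for large-scale deployments in production. In the next post I will start from some practical problems and analyze several DaemonSet behaviors from the user's perspective, for example how a DaemonSet behaves after a Node Taint changes, why a DaemonSet can hang on an error during deletion and how to resolve it, and, when a Node is NotReady, whether DaemonSet Pods will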