kube-scheduler原始碼分析
背景知識
簡介
kubernetes原生排程器與其說是一個排程器,不如說是一個pod的放置器,如同我們整理家務一樣,把各種東西放進收納盒內。而偶爾遇到收納盒放滿了,就拿出(驅逐)一些東西(pod)。
並且我們比較笨,看不到這些東西(pod)彼此的聯絡,如果你要求我所有手辦放一起,所有零食放一起,但又不告訴我所有手辦放哪兒(給個眼神自己體會?),我的腦子就轉不過來了。我只能一件一件東西放置,雖然收納的時候我能看到每個盒子裡都有什麼,但我並不知道還有哪些東西等著我整理。
這就是這樣一個笨笨的,但十分高效的家務整理者的自白?
此外,在閱讀kubernetes的原始碼之前,建議先學習client-go的原始碼,尤其是informer的機制。這是kubernetes架構中非常重要的內容,負責各個元件與apiserver的資源同步與事件處理。informer的知識可以參考
排程流程
參考社群中排程器的設計,可以把排程器的執行流程大致分為如下階段:
- 選擇一個pod
- 預選過濾
- 優選打分
- 選擇分數最高的節點
- 繫結
演進
受限於容器基於namespace
和cgroup
隔離資源的機制,整個kubernetes在多租戶方面都無法實現如openstack般的隔離力度。排程器也一樣,在多租戶方面只能依賴於namespace
和quota
等機製做軟隔離。
並且由於排程器的排程粒度是pod
,無法對一批任務做統籌的排程,而很多場景下,如:
- AI訓練任務
- batch任務
- 大資料任務
等時常要求系統對一批任務做統籌的處理,在批處理排程和多租戶上的不足逐漸成為相關行業從業者的痛點。為此,社群推出了
kube-batch is a batch scheduler for Kubernetes,providing mechanisms for applications which would like to run batch jobs leveraging Kubernetes. It builds upon a decade and a half of experience on running batch workloads at scale using several systems,combined with best-of-breed ideas and practices from the open source community.
關於kube-batch,與衍生出volcano等排程器,有空在另一篇文章裡解析。
社群的方案同時推動著upstream方案的迭代。在2018年,社群就propose了一個scheduler enhancement的提案,融合了批處理排程器的設計,將功能抽象plugin
。
The scheduling framework is a new set of "plugin" APIs being added to the existing Kubernetes Scheduler. Plugins are compiled into the scheduler,and these APIs allow many scheduling features to be implemented as plugins,while keeping the scheduling "core" simple and maintainable.
該方案若有後續進展,我也會持續跟進。
原始碼分析
我不會告訴你是因為用1.15版本才不看最新的程式碼的
時間 | 版本 |
---|---|
2019.12 | 1.15 |
原始碼分析不會分析各種通用庫的詳細用法,如cobra,pflag,klog等等。
原始碼分為三個層次:
- cmd/kube-scheduler/scheduler.go:主函式
- pkg/scheduler/scheduler.go:在具體排程演演算法前的事件處理框架
- pkg/scheduler/core/generic_scheduler.go:排程演演算法
scheduler.go與整個排程器框架
從cmd/kube-scheduler/scheduler.go開始,經過一串trivial的pflag,cobra的程式碼,在cmd/kube-scheduler/app/server.go中我們找到runCommand()
函式,這段程式碼只是排程器執行的準備工作,做了如下的事情
- 驗證引數是否合法
- 建立
stopCh
作為排程器終止的訊號 - 呼叫
c,err := opts.Config()
生成排程器配置 - 將未填充的引數填充預設值(其中還與apiserver的鑑權有關)
-
applyFeatureGates()
預設關閉或開啟一些預選與優選演演算法。這一塊目前來說非常凌亂,隨著k8s版本變化而變化,感興趣的童鞋可以參考kube-scheduler reference - 進入本節的主角
Run()
函式。
簡單的說一下一些小細節,在opts.Config()
中c := &schedulerappconfig.Config{}
初始化了config結構體,值得注意的是InformerFactory informers.SharedInformerFactory
和PodInformer coreinformers.PodInformer
是獨立的。在後面的程式碼中可以清晰的看到排程器需要監控如下資源:
- pods
- nodes
- pv/pvc/storageClass
- replicasController/replicasSet/statefulSet/service
- PDB
然後程式碼中做了o.ApplyTo(c)
,即把options中的配置轉換為config。這些程式碼比較trivial,多用到了一些更底層的庫,值得注意的是在parse configfile的時候,kubernetes用到了runtime.DecodeInto(kubeschedulerscheme.Codecs.UniversalDecoder(),data,configObj)
,這個universalDecoder是kubernetes自己封裝的反序列化庫,事實上如果你用json.Unmarshal()
也是可行的,但是畢竟工具都提供了,為啥不用呢。
之後createClients
建立了三個客戶端:leaderElectionClient和eventClient顧名思義,在建立client的時候,我們能看到用法與之前的略有不同,通常我們只需要clientset.NewForConfig(config)
,這裡多了一步AddUserAgent(kubeConfig,"scheduler")
。查閱註釋可以發現
UserAgent is an optional field that specifies the caller of this request.
好的,說明瞭這個訊息是scheduler發出來的。獲得這些clientset之後,初始化了recorder用來記錄event,之後特意把podInformer和其他的informer分開了,並且把resync設為了0。那麼為什麼要把podInformer單獨列出來呢,說到底這些informer底層都是共享一份threadSafeStore的呀?
c.InformerFactory = informers.NewSharedInformerFactory(client,0)
c.PodInformer = factory.NewPodInformer(client,0)
複製程式碼
看到NewPodInformer內部,原來是建立podInformer的時候額外加了一個selector
,只監聽非succeeded或者failed的pod,故listWatch函式和其他informer有所不同。
然後我們直接跳到Run()
。這個函式是這部分的主角,主要分為以下部分:
- 新建一個排程器物件,其中包括新建各種informer
- 初始化事件廣播、健康檢查
- 啟動各個informer
- 等待informer快取與apiserver同步完成
- 排程器開始執行
有個小細節要留意一下,如果顯示的生成了informer例項,需要在新的協程中執行,而informerFactory可以直接start()
。看一下sched長什麼樣子
sched,err := scheduler.New(cc.Client,cc.InformerFactory.Core().V1().Nodes(),cc.PodInformer,cc.InformerFactory.Core().V1().PersistentVolumes(),cc.InformerFactory.Core().V1().PersistentVolumeClaims(),cc.InformerFactory.Core().V1().ReplicationControllers(),cc.InformerFactory.Apps().V1().ReplicaSets(),cc.InformerFactory.Apps().V1().StatefulSets(),cc.InformerFactory.Core().V1().Services(),cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),cc.InformerFactory.Storage().V1().StorageClasses(),cc.Recorder,cc.ComponentConfig.AlgorithmSource,stopCh,framework.NewRegistry(),cc.ComponentConfig.Plugins,cc.ComponentConfig.PluginConfig,scheduler.WithName(cc.ComponentConfig.SchedulerName),scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
複製程式碼
一大串的初始化的引數,包括這麼幾類
- informer
- recorder記錄event
- 演演算法來源:預設是"Default Provider"
- framework:這個非常重要,之後再說
- plugin和pluginConfig:這個對應著之前提到的scheduler的plugin的改進
- options:大多是一些輔助函式,這裡面需要注意的有
PercentageOfNodesToScore
,其motivation是隨著叢集規模增大,排程器每輪排程的時延線性增加,為了加速預選,故預選中只要有百分之多少的節點可用,就結束預選。
接下來看一段和context包有關的程式碼:
// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx,cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context,it should be used here
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
複製程式碼
先看run函式,呼叫了sched.Run()
,這個函式將不停在協程中執行scheduleOne
,知道接收到stopChannel。此同時,用ctx.Done()阻塞住run的執行,直到ctx.Done()可以讀取,run才會返回。一旦run()結束,排程器也停止運作,接下來就可以進行cobra中定義的一些後處理了。那麼關鍵就在於context的邏輯了。ctx,cancel := context.WithCancel(context.TODO())
,根據註釋
WithCancel returns a copy of parent with a new Done channel.
得知這個parent context就是context.TODO()
,再看TODO的用法,
TODO returns a non-nil,empty Context. Code should use context.TODO when it's unclear which Context to use or it is not yet available (because the surrounding function has not yet been extended to accept a Context parameter).
結合上面的run :=
函式的賦值,即可以理解:由於run函式剛剛完成賦值,還沒有真正接受context引數,故這裡先建一個TODO的context。回到WithCancel
The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed,whichever happens first.
Canceling this context releases resources associated with it,so code should call cancel as soon as the operations running in this Context complete.
它返回的ctx是這個context.TODO()
的子context,之後在協程中select,如果stopCh可以讀取,呼叫cancel()
函式。這個cancel函式在WithCancel()
中返回,它的作用是把context所有的children關閉並移除。如果ctx.Done()可以讀取,將結束run()
的執行,從而結束排程器。看一下cancel()的用法,在func (c *cancelCtx) cancel
中,關鍵程式碼是
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
複製程式碼
c.done是什麼呢,看Done()的細節,如果c.done不為空,則返回c.done。也就是說我們可以粗略的認為這裡面關閉了Done()要返回的channel。所以呼叫cancel()之後,由於從一個關閉中的channel中讀取資料會出錯,故同樣會結束run()函式。故run(ctx)
就不停的執行sched.scheduleOne
,進行一輪一輪的排程。
總結一下到現在為止,我們直到了排程器的工作方式:通過informer獲取一系列的資源,包括所有正在執行或pending的pod。然後不停的執行一輪一輪的排程(每一輪之間沒有空隙時間)。接下來看一下排程器究竟長什麼樣。
scheduler各部分解析
在New()
中初始化了一個排程器的結構體type Scheduler
,裡面只包含一個config *factory.Config
,這就為我們魔改與拓展提供了便利。config包含的內容裡,我們要關注cache,algorithm,framework以及schedulingQueue。
// Config is an implementation of the Scheduler's configured input data.
// TODO over time we should make this struct a hidden implementation detail of the scheduler.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache internalcache.Cache
NodeLister algorithm.NodeLister
Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling,PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
PodPreemptor PodPreemptor
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this,because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful,false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question,and the error
Error func(*v1.Pod,error)
// Recorder is the EventRecorder to use
Recorder record.EventRecorder
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder
// Disable pod preemption or not.
DisablePreemption bool
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}
複製程式碼
回到New()
,跳過一些預設值的設定,首先看到生成了一個configurator
,傳進去的引數都是New()
的引數,在內部我們能看到它初始化了排程器的快取、framework、queue,相當於在config的基礎上增加了一個更具體的執行配置。
之後,根據algorithm provider和policy,進入CreateFromProvider
與CreateFromConfig
。要重點注意這兩個函式最後都呼叫了CreateFromKeys
,它最終返回一個config,這裡很容易忽略Error
欄位,事實上這個欄位看起來只是一個錯誤處理函式,實際上在排程器機制中佔了重要的部分。後面我們會額外的解析這個Error
欄位。
return &Config{
SchedulerCache: c.schedulerCache,// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodeLister{c.nodeLister},Algorithm: algo,GetBinder: getBinderFunc(c.client,extenders),PodConditionUpdater: &podConditionUpdater{c.client},PodPreemptor: &podPreemptor{c.client},Framework: c.framework,WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(c.StopEverything,c.scheduledPodsHasSynced)
},NextPod: internalqueue.MakeNextPodFunc(c.podQueue),Error: MakeDefaultErrorFunc(c.client,c.podQueue,c.schedulerCache,c.StopEverything),StopEverything: c.StopEverything,VolumeBinder: c.volumeBinder,SchedulingQueue: c.podQueue,},nil
複製程式碼
獲取這個config之後,就可以呼叫sched := NewFromConfig(config)
生成排程器例項了。當然,還需要註冊回撥函式,見AddAllEventHandlers(sched,options.schedulerName,nodeInformer,podInformer,pvInformer,pvcInformer,serviceInformer,storageClassInformer)
,它負責informer與schedulerCache的聯動。
cache
在進入internalcache.New(30*time.Second,stopEverything)
之前先看一個golang獨特的用法:stopEverything = wait.NeverStop
,而var NeverStop <-chan struct{} = make(chan struct{})
,從一個初始化的channel中讀取資料,即字面意思上的Never stop。在New中幹了兩件事情,首先newSchedulerCache(30s,1s,stop)
,規定30s為assumed pod失效時間。之後cache.run()
。
New returns a Cache implementation. It automatically starts a go routine that manages expiration of assumed pods. "ttl" is how long the assumed pod will get expired.
那麼問題來了,什麼是assumed pod?直接說結論:assumed pod是pod在bind行為真正發生前的狀態,與bind操作非同步,從而讓排程器不用等待耗時的bind操作返回結果,提升排程器的效率。
cache包含nodes和nodeTree。nodes是一個雙向連結串列,每個元素包含節點資訊,pod資訊,埠資訊,資源資訊等等。nodeTree則是樹結構,每個節點的key是zone的名稱,value是每個zone的節點名稱。對zone不瞭解的童鞋可以參考multiple-zones。簡單來說,nodeTree和排程策略有關,對我們理解排程機制其實不影響。
cache.run()
以1s為間隔不停的排程cleanupExpiredAssumedPods
,它的作用是監測cache中每個assumed pod是否完成了繫結。如果沒有完成繫結,則不管,對完成繫結,且已經超時的pod,則從快取中剔除,並更新cache的各個狀態。
framework
framework.NewFramework(args.Registry,args.Plugins,args.PluginConfig)
基本上把kube-batch的那套邏輯搬到了原生排程器中,通過waitingPods
來實現gang scheduling,通過plugins.QueueSort
支援多租戶。
未來可以觀望一下這樣的extension是否被社群廣泛接受。
queue
internalqueue.NewSchedulingQueue(stopEverything,framework)
新建一個排程佇列。我們暫時不考慮framework,所以可以先簡單的看成nil。schedulingQueue實質上是一個priorityQueue
// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has three sub queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ. The third queue holds pods that are moved from
// unschedulable queues and will be moved to active queue when backoff are completed.
type PriorityQueue struct {
stop <-chan struct{}
clock util.Clock
// podBackoff tracks backoff for pods attempting to be rescheduled
podBackoff *PodBackoffMap
lock sync.RWMutex
cond sync.Cond
// activeQ is heap structure that scheduler actively looks at to find pods to
// schedule. Head of heap is the highest priority pod.
activeQ *util.Heap
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
// are popped from this heap before the scheduler looks at activeQ
podBackoffQ *util.Heap
// unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap
// nominatedPods is a structures that stores pods which are nominated to run
// on nodes.
nominatedPods *nominatedPodMap
// schedulingCycle represents sequence number of scheduling cycle and is incremented
// when a pod is popped.
schedulingCycle int64
// moveRequestCycle caches the sequence number of scheduling cycle when we
// received a move request. Unscheduable pods in and before this scheduling
// cycle will be put back to activeQueue if we were trying to schedule them
// when we received move request.
moveRequestCycle int64
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
}
複製程式碼
prioirityQueue通過堆排序實現,在pkg/scheduler/util/heap.go中提供了堆排序的實現。queue裡面有三種佇列
- activeQ:等待排程的pod
- podBackoffQ:如果pod嘗試排程,但是排程失敗,將成為backoff狀態。這一點和unschedulable有點像,但backoff允許pod延遲一段時間後重試,延遲時間與重試次數有關。有點像controller中的workqueue的機制。backoffQ避免了高優先順序但不可排程的的任務不停重試阻塞其他任務的問題。
- unschedulableQ:pod嘗試排程失敗後,被判斷為不可排程的pod
要注意IDE是:activeQ是一個heap,而unschedulableQ是一個UnschedulablePodsMap
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
// podInfoMap is a map key by a pod's full-name and the value is a pointer to the PodInfo.
podInfoMap map[string]*framework.PodInfo
keyFunc func(*v1.Pod) string
// metricRecorder updates the counter when elements of an unschedulablePodsMap
// get added or removed,and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
}
複製程式碼
在NewPriorityQueue
中,能看到這樣的呼叫unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder())
以及pq.podBackoffQ = util.NewHeapWithRecorder(podInfoKeyFunc,pq.podsCompareBackoffCompleted,metrics.NewBackoffPodsRecorder())
。這兩個用法都和prometheus有關係,可以用prometheus記錄這兩個佇列的變化情況。
一個queue之內的pod優先順序的比較演演算法為:以優先順序為準,優先順序相同時,建立時間早的優先順序更高。多個Q之間的關係為:排程時從activeQ中取出pod,backoff的pod和unschedulablePod會在某些時候加入activeQ,具體可見
func (p *PriorityQueue) run() {
go wait.Until(p.flushBackoffQCompleted,1.0*time.Second,p.stop)
go wait.Until(p.flushUnschedulableQLeftover,30*time.Second,p.stop)
}
複製程式碼
queue實現的關鍵方法有:
- Add:將pod加入activeQ中,並檢查它是否已經在unschedulableQ中,同時如果它在backoffQ中,將它從backoffQ中刪除。之後把它加入nominatedPods中。該函式只能在新pod入隊時使用。
- AddIfNotPresent:只有pod不在任何一個Queue中,才將它加入activeQ,否則報錯,不做任何處理。
- AddUnschedulableIfNotPresent:將不在任何佇列的不可排程的pod加入unschedulableQ中
- Pop: 將activeQ中的首個元素取出,當activeQ為空時,阻塞直到其不為空。取出元素後,scheduleCycle值加1
- flushBackoffQCompleted:將所有在backoffQ中的超過該pod該輪次的backoff時間的pod加入activeQ中
- flushUnschedulableQLeftover:將所有在unschedualbleQ中停留時間超過閾值的pod重新加入activeQ中,該閾值寫死為60秒
- Update: 發生在pod有update操作時,如果在activeQ或者backoffQ中,則更新資訊,如果pod在unschedualbeQ中,則加入activeQ
有個很tricky的點在於,如果我們通讀了所有正確的處理邏輯,會發現沒有地方用到了unschedulableQ和backoffQ。那麼究竟是哪兒用到了這兩個Queue呢?答案在錯誤處理裡面。後面我們在通讀了正確的邏輯之後,會詳細的看錯誤處理。
現在我們先關注另一個問題,activeQ中的資料從哪兒來呢?之前已經說到了初始化排程器的時候為各個informer註冊了回撥函式,在pkg/scheduler/eventhandlers.go中,我們重點只關注podInformer,其他的後面我們會再次梳理。
// scheduled pod cache
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return assignedPod(t)
case cache.DeletedFinalStateUnknown:
if pod,ok := t.Obj.(*v1.Pod); ok {
return assignedPod(pod)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T",obj,sched))
return false
default:
utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T",sched,obj))
return false
}
},Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToCache,UpdateFunc: sched.updatePodInCache,DeleteFunc: sched.deletePodFromCache,)
// unscheduled pod queue
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return !assignedPod(t) && responsibleForPod(t,schedulerName)
case cache.DeletedFinalStateUnknown:
if pod,ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod,schedulerName)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T",Handler: cache.ResourceEventHandlerFuncs{
AddFunc: sched.addPodToSchedulingQueue,UpdateFunc: sched.updatePodInSchedulingQueue,DeleteFunc: sched.deletePodFromSchedulingQueue,)
複製程式碼
這裡對pod分為兩類,一類是已經排程了的,限於v1版本的podPhase的限制,所以我們只能統一歸類為running(即使它是Terminating)。這時候會直接與schedulerCache同步資源。如在AddFunc裡呼叫了addPodToCache
,裡面又呼叫了sched.config.SchedulerCache.AddPod(pod)
。對於沒有排程過的pod,如pending狀態,則與schedulingQueue同步。如sched.config.SchedulingQueue.Add
。
留一個小問題(之前已經說明過):succeeded與failed的pod去哪兒了呢?
algorithm
provider和policy都和演演算法有關。provider對我們比較重要,在createFromProvider()
中能看到,它載入了預選函式、優選函式等。這些策略的具體預設配置在pkg/scheduler/algorithmprovider/defaults/defaults.go中,預設的預選函式有
- NoVolumeZoneConflictPred
- MaxEBSVolumeCountPred
- MaxGCEPDVolumeCountPred
- MaxAzureDiskVolumeCountPred
- MaxCSIVolumeCountPred
- MatchInterPodAffinityPred
- NoDiskConflictPred
- GeneralPred
- CheckNodeMemoryPressurePred
- CheckNodeDiskPressurePred
- CheckNodePIDPressurePred
- CheckNodeConditionPred
- PodToleratesNodeTaintsPred
- CheckVolumeBindingPred
預設的優選函式有
- SelectorSpreadPriority
- InterPodAffinityPriority
- LeastRequestedPriority
- BalancedResourceAllocation
- NodePreferAvoidPodsPriority
- NodeAffinityPriority
- TaintTolerationPriority
- ImageLocalityPriority
不考慮錯誤處理的排程流程
首先我們先不考慮錯誤處理,先看最簡單的流程。由於排程是一輪一輪的,每一輪都會呼叫scheduleOne()
函式。如果我們忽略framework,它首先從activeQ中取出一個pod,如果pod的DeletionTimestamp
不為空,則直接返回,因為這個pod已經被下達了刪除命令。之後將呼叫sched.schedule
嘗試模擬放置該pod。如果模擬失敗,則嘗試搶佔sched.preempt
。如果模擬成功,將pod置為assumed狀態,並執行reserve操作。reserve的定義和kube-batch中的reserve有所不同
resources on a node are being reserved for a given Pod. This happens before the scheduler actually binds the pod to the Node,and it exists to prevent race conditions while the scheduler waits for the bind to succeed.
話雖如此,然而目前各個plugin還沒有進行開發,所以我們暫時可以忽略這句reserve。assume之後,開始嘗試非同步繫結。忽略所有的plugin相關的程式碼,核心部分是sched.bind()
。bind的意思是真正將pod交給節點的kubelet進行初始化。繫結結果返回後,將通知cache該pod的繫結已經結束。如果繫結失敗,從快取中清除該pod。
schedule
首先進行一些基本檢查podPassesBasicChecks
,其實檢查的是如果使用了pvc,該pvc是否存在;pvc是否正在刪除。
然後會做一次node的list:nodes,err := nodeLister.List()
,這次操作是從nodeInformer的快取中取出的,這份快取和scheduler cache不同,它通過list watch機制與etcd保持同步;而scheduler cache需要我們手動維護。在獲取所有node之後,做了一次g.snapshot()
。它將遍歷cache的所有node,更新node的資訊。那麼為什麼不直接在Informer的cache中操作呢?因為首先,Informer的cache和etcd有關係,如果直接修改cache,將很難管理整個叢集的節點狀態;其次,由於bind操作是非同步的,在bind結果返回前,我們並不知道節點的最終狀態,所以需要在cache中記錄,以維持排程器的正常運轉;真實的節點資訊則等待informer的同步。
之後做了預選g.findNodesThatFit(pod,nodes)
。整體的策略是對每個node,引用之前定義的預選函式進行篩選,選出可以排程的節點。通過workqueue.ParallelizeUntil(ctx,16,int(allNodes),checkNode)
,兵分16路進行各節點的預選,即每個節點執行一次checkNode
。我們仔細的看一下用法。
// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context,workers,pieces int,doWorkPiece DoWorkPieceFunc) {
var stop <-chan struct{}
if ctx != nil {
stop = ctx.Done()
}
toProcess := make(chan int,pieces)
for i := 0; i < pieces; i++ {
toProcess <- i
}
close(toProcess)
if pieces < workers {
workers = pieces
}
wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer utilruntime.HandleCrash()
defer wg.Done()
for piece := range toProcess {
select {
case <-stop:
return
default:
doWorkPiece(piece)
}
}
}()
}
wg.Wait()
}
複製程式碼
定義stopCh為ctx.Done()。這個ctx在上一層呼叫中定義為ctx,cancel := context.WithCancel(context.Background())
。context包的用法在informer機制中已經講過了,這裡不再贅述。有個問題要說明一下,為什麼要加一個context呢?之前有講到過為了加速預選,選出百分之多少的節點即可,在checkNode中,如果判斷出已經選出這麼多的節點,將執行cancel()
,此時由於接收到stopCh,將返回結果。在ParallelizeUntil
內部,初始化stopCh之後,初始化toProcess,這是一個buffer長度為節點數量的channel,並將channel填滿。之後新建一個waitGroup,等待這16個workers處理完所有的任務或是接收到退出訊號。每個worker會執行checkNode
函式。在checkNode中,我們仔細看一下podFitsOnNode
for i := 0; i < 2; i++ {
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
podsAdded,metaToUse,nodeInfoToUse = addNominatedPods(pod,meta,info,queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
for _,predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []predicates.PredicateFailureReason
err error
)
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate,exist := predicateFuncs[predicateKey]; exist {
fit,reasons,err = predicate(pod,nodeInfoToUse)
if err != nil {
return false,[]predicates.PredicateFailureReason{},err
}
if !fit {
// eCache is available and valid,and predicates result is unfit,record the fail reasons
failedPredicates = append(failedPredicates,reasons...)
// if alwaysCheckAllPredicates is false,short circuit all predicates when one predicate fails.
if !alwaysCheckAllPredicates {
klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set,the predicate " +
"evaluation is short circuited and there are chances " +
"of other predicates failing as well.")
break
}
}
}
}
}
複製程式碼
很神奇的是重複了兩次。區別在於第一次會新增nominated pods,所謂nominated意味著pod被指定一定要排程到某些節點,更具體的說和搶佔有關。註釋裡解釋的很清楚:
We run predicates twice in some cases. If the node has greater or equal priority nominated pods,we run them when those pods are added to meta and nodeInfo. If all predicates succeed in this pass,we run them again when these nominated pods are not added. This second pass is necessary because some predicates such as inter-pod affinity may not pass without the nominated pods. If there are no nominated pods for the node or if the first run of the predicates fail,we don't run the second pass.
之後對預選函式進行排序,排序的目的是讓複雜度低的預選函式優先進行,以減少預選時延。
預選完成後,進行優選PrioritizeNodes()
,在pkg/scheduler/core/generic_scheduler.go。簡單來說,優選的原理是在每個pod的優選中,對第n個節點,第k個策略,計算score(n,k)。最終計算score的加權合,作為每個節點應用所有優選策略後的總得分。
計算的時候用的是mapreduce的模式,map操作為results[i][index],err = priorityConfigs[i].Map(pod,nodeInfo)
,reduce操作為priorityConfigs[index].Reduce(pod,nodeNameToInfo,results[index])
。map和reduce函式在註冊優選函式的時候制定了。預設的函式在pkg/scheduler/algorithmprovider/defaults/register_priorities.go中。有的策略定義了MapReduceFunction
,有的只定義了Function
。所以我們可以在程式碼中看到,priorityConfigs[index].Function
這樣的計算方式。這一段的原始碼就補貼了,邏輯比較簡單。
優選結束之後,選出一個節點進行後續的繫結。目前host,err := g.selectHost(priorityList)
的策略非常簡單,選擇分最高的節。如果有多個節點打分相同,則kubernetes定義了一個選擇的演演算法,這個演演算法也比較trivial,不展開討論了。
preempt
排程器預設不允許搶佔,不過我們仍然簡單的看一下搶佔的機制。搶佔的入口在preempt()
。如果允許搶佔:
- 首先獲取排程失敗的這個pod的最新資料。
- 執行
sched.config.Algorithm.Preempt(preemptor,sched.config.NodeLister,scheduleErr)
獲取要排程到的節點,要剔除的pod的列表,記為victims - 更新該pod的nominatedNodeName到etcd中。
- 刪除victims中所有pod
此時,由於etcd中的pod有變化,pod由informer的eventHandler處理後又加入了activeQueue,pod得以重新排程。需要注意的是,雖然進行了搶佔,驅逐了一些pod,並標記了nominated node,但是並不保證一定排程到nominated node上,還是得看優選結果而定。
assume
assume一個pod意味著在將這個pod的nodeName設為通過預選、優選、節點選擇這些步驟選擇出的節點的名稱,並加入cache中。
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
key,err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
if _,ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache,so can't be assumed",key)
}
cache.addPod(pod)
ps := &podState{
pod: pod,}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}
複製程式碼
assume完畢之後,開始非同步的進行bind。
bind
跳過一堆的plugin,我們直接找到這一段bind的邏輯:
err := sched.bind(assumedPod,&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace,Name: assumedPod.Name,UID: assumedPod.UID},Target: v1.ObjectReference{
Kind: "Node",Name: scheduleResult.SuggestedHost,})
複製程式碼
bind的操作通過err := sched.config.GetBinder(assumed).Bind(b)
完成。如果還記得scheduler初始化的過程的話,有一句GetBinder: getBinderFunc(c.client,extenders)
。而在這個函式裡面我們又能找到defaultBinder := &binder{client}
。也就是說,預設情況下,其實GetBinder就返回了一個clientset,它將向apiserver傳送bind請求。apiserver將處理這個請求,傳送到節點的kubelet,由kubelet進行後續的操作,並返回結果。如果繫結失敗,執行sched.config.SchedulerCache.ForgetPod(assumed)
。forget操作將從cache中移除該pod。
總結
看完了不考慮錯誤處理的排程,我們可以總結出預設排程器的流程其實就是不停的從activeQ中取出pod,做預選、優選後取出分數最高的節點,進行非同步繫結。如果繫結失敗,則從快取中丟棄該pod。
加上錯誤處理呢?
看到這兒我們會發現,unschedulableQ和backoffQ根本沒有用到過。原因在於剛才我們分析的時候只分析了正確的邏輯,沒有管錯誤處理。如果還能想起來初始化scheduler的操作,會發現裡面很不顯眼的註冊了一個MakeDefaultErrorFunc
。
return &Config{
SchedulerCache: c.schedulerCache,nil
複製程式碼
這個Error()在哪兒用到了呢?比如在recordSchedulingFailure
中就呼叫了sched.config.Error(pod,err)
。這個真的有點隱蔽,非常容易不小心就漏過去了。我們再找recordSchedulingFailure
哪兒用到了?在assume,schedule,bind等等都呼叫了!不得不吐槽命名,命名寫的是record,實際上偷偷摸摸的把任務加進backoffQ或者unschedulableQ……
找到pkg/scheduler/factory/factory.go的MakeDefaultErrorFunc
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
func MakeDefaultErrorFunc(client clientset.Interface,podQueue internalqueue.SchedulingQueue,schedulerCache internalcache.Cache,stopEverything <-chan struct{}) func(pod *v1.Pod,err error) {
return func(pod *v1.Pod,err error) {
if err == core.ErrNoNodesAvailable {
klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting",pod.Namespace,pod.Name)
} else {
if _,ok := err.(*core.FitError); ok {
klog.V(4).Infof("Unable to schedule %v/%v: no fit: %v; waiting",pod.Name,err)
} else if errors.IsNotFound(err) {
if errStatus,ok := err.(errors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
nodeName := errStatus.Status().Details.Name
// when node is not found,We do not remove the node right away. Trying again to get
// the node and if the node is still not found,then remove it from the scheduler cache.
_,err := client.CoreV1().Nodes().Get(nodeName,metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
schedulerCache.RemoveNode(&node)
}
}
} else {
klog.Errorf("Error scheduling %v/%v: %v; retrying",err)
}
}
podSchedulingCycle := podQueue.SchedulingCycle()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
go func() {
defer runtime.HandleCrash()
podID := types.NamespacedName{
Namespace: pod.Namespace,Name: pod.Name,}
// An unschedulable pod will be placed in the unschedulable queue.
// This ensures that if the pod is nominated to run on a node,
// scheduler takes the pod into account when running predicates for the node.
// Get the pod again; it may have changed/been scheduled already.
getBackoff := initialGetBackoff
for {
pod,err := client.CoreV1().Pods(podID.Namespace).Get(podID.Name,metav1.GetOptions{})
if err == nil {
if len(pod.Spec.NodeName) == 0 {
if err := podQueue.AddUnschedulableIfNotPresent(pod,podSchedulingCycle); err != nil {
klog.Error(err)
}
}
break
}
if errors.IsNotFound(err) {
klog.Warningf("A pod %v no longer exists",podID)
return
}
klog.Errorf("Error getting pod %v for retry: %v; retrying...",podID,err)
if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
getBackoff = maximalGetBackoff
}
time.Sleep(getBackoff)
}
}()
}
}
複製程式碼
除了一些node相關的錯誤,其他都會非同步的重試這個pod。在一個新的協程裡,首先檢視pod是否還存在,如果不存在,返回即可。如果存在但是nodeName為空,則呼叫AddUnschedulableIfNotPresent
。這一段邏輯有點欺騙性,因為進入到AddUnschedulableIfNotPresent
內部之後,實際上是根據p.moveRequestCycle
決定加入backoffQ還是unschedulableQ。
moveRequestCycle的值將在呼叫movePodsToActiveQueue
與MoveAllToActiveQueue
中被設為schedulingCycle。這些條件的觸發都在informers的event handler中,即如果資源發生變化,排程器認為這時候可能pod將變得可以排程,故需要把不可排程的pod或者backoff的pod再次加入activeQ中。具體的邏輯不再贅述。