A Deep Dive into the Kubernetes Scheduler and Scheduling Framework Core Source Code
阿新 • Published: 2021-01-10
The core implementation of the kube-scheduler lives under pkg/scheduler:
algorithmprovider: registration and lookup of scheduling algorithms; its core data structure is a map-like registry
apis: interfaces for the resource versions used in the cluster, i.e. apiVersion- and type-related content
core: the scheduler instance's core data structures and interfaces, plus the external extender mechanism
framework: defines the scheduler's internal extension mechanism (the scheduling framework)
internal: internal data structures that the scheduler instance depends on
metrics: metrics instrumentation
profile: a framework-based scheduler configuration used to drive the whole scheduling framework
testing: test code
util: general-purpose utilities
The Scheduler struct is defined in pkg/scheduler/scheduler.go.
The scheduling queue interface, SchedulingQueue, is defined in pkg/scheduler/internal/queue/scheduling_queue.go.
AssignedPodAdded, AssignedPodUpdated, and MoveAllToActiveOrBackoffQueue all call movePodsToActiveOrBackoffQueue underneath; they are registered as callbacks for resource updates (Pod, Node, and so on), so that Pods which previously could not be scheduled get another chance to be retried when a resource changes.
PriorityQueue is the concrete implementation of this interface.
Its core data structure consists of three queues, with higher-priority Pods placed at the front.
(1) activeQ: the queue that holds all Pods waiting to be scheduled.
It is implemented as a heap by default; its elements are ordered by comparing Pod priority and then Pod creation time.
Once kube-scheduler sees that a Pod's nodeName is empty, it treats the Pod as unscheduled and puts it into the scheduling queue.
(2) podBackoffQ: the queue that holds Pods whose scheduling attempts have failed and that are waiting out a backoff period.
The corresponding definitions are shown below:
type Scheduler struct {
    SchedulerCache  internalcache.Cache
    Algorithm       core.ScheduleAlgorithm
    NextPod         func() *framework.QueuedPodInfo
    Error           func(*framework.QueuedPodInfo, error) // default handler for scheduling failures
    StopEverything  <-chan struct{}
    SchedulingQueue internalqueue.SchedulingQueue // queue of Pods waiting to be scheduled
    Profiles        profile.Map                   // scheduler profiles (configuration)
    client          clientset.Interface
}

type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    SchedulingCycle() int64
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event string)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    Close()
    NumUnschedulablePods() int // number of Pods that could not be scheduled
    Run()
}

type PriorityQueue struct {
    framework.PodNominator // the nomination result (the Pod-to-Node mapping)
    stop  chan struct{}    // channel used by the outside world to stop the queue
    clock util.Clock
    podInitialBackoffDuration time.Duration // initial wait before a backoff Pod is rescheduled
    podMaxBackoffDuration     time.Duration // maximum wait before a backoff Pod is rescheduled
    lock sync.RWMutex
    cond sync.Cond // blocks Pop() under concurrency until an element is available
    activeQ        *heap.Heap
    podBackoffQ    *heap.Heap
    unschedulableQ *UnschedulablePodsMap
    schedulingCycle  int64 // counter, incremented each time a Pod is popped
    moveRequestCycle int64
    closed bool
}
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
    pInfo1 := podInfo1.(*framework.QueuedPodInfo)
    pInfo2 := podInfo2.(*framework.QueuedPodInfo)
    bo1 := p.getBackoffTime(pInfo1)
    bo2 := p.getBackoffTime(pInfo2)
    return bo1.Before(bo2)
}

// getBackoffTime returns the time at which podInfo completes its backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration computes the backoff duration, doubling it for
// every previous attempt and capping it at podMaxBackoffDuration
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    duration := p.podInitialBackoffDuration
    for i := 1; i < podInfo.Attempts; i++ {
        duration = duration * 2
        if duration > p.podMaxBackoffDuration {
            return p.podMaxBackoffDuration
        }
    }
    return duration
}
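To illustrate the resulting backoff curve, here is a minimal standalone sketch of the same doubling-with-cap logic. The 1s initial and 10s maximum durations used below correspond to the upstream defaults for podInitialBackoffSeconds and podMaxBackoffSeconds; adjust them for your own configuration.

package main

import (
    "fmt"
    "time"
)

// backoffDuration mirrors calculateBackoffDuration above: start at the
// initial duration, double once per previous attempt, cap at max.
func backoffDuration(attempts int, initial, max time.Duration) time.Duration {
    d := initial
    for i := 1; i < attempts; i++ {
        d *= 2
        if d > max {
            return max
        }
    }
    return d
}

func main() {
    for attempts := 1; attempts <= 6; attempts++ {
        // With 1s/10s this prints 1s, 2s, 4s, 8s, 10s, 10s.
        fmt.Printf("attempt %d -> backoff %v\n", attempts, backoffDuration(attempts, 1*time.Second, 10*time.Second))
    }
}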
(3) unschedulableQ: in fact a map that holds the Pods that cannot be scheduled for the time being.

type UnschedulablePodsMap struct {
    podInfoMap map[string]*framework.QueuedPodInfo
    keyFunc    func(*v1.Pod) string
    metricRecorder metrics.MetricRecorder // incremented whenever a Pod is added to or removed from the map
}

// constructor
func newUnschedulablePodsMap(metricRecorder metrics.MetricRecorder) *UnschedulablePodsMap {
    return &UnschedulablePodsMap{
        podInfoMap:     make(map[string]*framework.QueuedPodInfo),
        keyFunc:        util.GetPodFullName,
        metricRecorder: metricRecorder,
    }
}
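The map is keyed by keyFunc, which here is util.GetPodFullName. The sketch below paraphrases that helper (check pkg/scheduler/util for the real implementation): the key is simply the Pod name joined with its namespace.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// getPodFullName paraphrases util.GetPodFullName: the unschedulable map is
// keyed by "<pod name>_<namespace>"; an underscore is a safe separator
// because it cannot appear in a DNS-subdomain-formatted Pod name.
func getPodFullName(pod *v1.Pod) string {
    return pod.Name + "_" + pod.Namespace
}

func main() {
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx-0", Namespace: "default"}}
    fmt.Println(getPodFullName(pod)) // prints "nginx-0_default"
}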
The function that creates a new Scheduler:

func New(client clientset.Interface,
    informerFactory informers.SharedInformerFactory,
    recorderFactory profile.RecorderFactory,
    stopCh <-chan struct{},
    opts ...Option) (*Scheduler, error) {

    stopEverything := stopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }

    options := defaultSchedulerOptions // default scheduler options, including the default algorithmSourceProvider
    for _, opt := range opts {
        opt(&options)
    }

    schedulerCache := internalcache.New(30*time.Second, stopEverything) // initialize the scheduler cache

    registry := frameworkplugins.NewInTreeRegistry() // registry is a map from plugin name to plugin factory
    if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
        return nil, err
    }

    snapshot := internalcache.NewEmptySnapshot()

    configurator := &Configurator{ // build a Configurator instance from the options
        client:                   client,
        recorderFactory:          recorderFactory,
        informerFactory:          informerFactory,
        schedulerCache:           schedulerCache,
        StopEverything:           stopEverything,
        percentageOfNodesToScore: options.percentageOfNodesToScore,
        podInitialBackoffSeconds: options.podInitialBackoffSeconds,
        podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
        profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
        registry:                 registry,
        nodeInfoSnapshot:         snapshot,
        extenders:                options.extenders,
        frameworkCapturer:        options.frameworkCapturer,
    }

    metrics.Register()

    var sched *Scheduler
    source := options.schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.createFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        sched = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        // Set extenders on the configurator now that we've decoded the policy
        // In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
        // which would have set extenders in the above instantiation of Configurator from CC options)
        configurator.extenders = policy.Extenders
        sc, err := configurator.createFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        sched = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    sched.StopEverything = stopEverything
    sched.client = client

    addAllEventHandlers(sched, informerFactory)
    return sched, nil
}

addAllEventHandlers registers event listeners for all relevant resource objects. For example, when a new Pod is created whose spec.nodeName is empty and whose phase is Pending, kube-scheduler watches that creation event.

The overall flow of kube-scheduler is as follows:

(1) Cobra command-line argument parsing
The options.NewOptions function initializes the default configuration of each module, for example the HTTP or HTTPS services.
The options.Validate function checks that the configuration parameters are legal and usable.
At startup, a configuration file can be passed to kube-scheduler with --config <filename>.
For a scheduler started with the default configuration, --write-config-to can dump that default configuration to a given file.
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
algorithmSource:
  provider: DefaultProvider
percentageOfNodesToScore: 0
schedulerName: default-scheduler
bindTimeoutSeconds: 600
clientConnection:
  acceptContentTypes: ""
  burst: 100
  contentType: application/vnd.kubernetes.protobuf
  kubeconfig: ""
  qps: 50
disablePreemption: false
enableContentionProfiling: false
enableProfiling: false
hardPodAffinitySymmetricWeight: 1
healthzBindAddress: 0.0.0.0:10251
leaderElection:
  leaderElect: true
  leaseDuration: 15s
  lockObjectName: kube-scheduler
  lockObjectNamespace: kube-system
  renewDeadline: 10s
  resourceLock: endpoints
  retryPeriod: 2s
metricsBindAddress: 0.0.0.0:10251
profiles:
  - schedulerName: default-scheduler
  - schedulerName: no-scoring-scheduler
    plugins:
      preScore:
        disabled:
          - name: '*'
      score:
        disabled:
          - name: '*'

algorithmSource: the algorithm source, i.e. the scheduler policy configuration (the format of the filter/scorer configuration); three sources are currently supported: Provider (DefaultProvider, which prefers spreading, and ClusterAutoscalerProvider, which prefers packing), file, and configMap.
percentageOfNodesToScore: controls the Node sampling size.
schedulerName: the scheduler name; the default is default-scheduler.
bindTimeoutSeconds: timeout of the Bind phase.
clientConnection: parameters for talking to kube-apiserver; for example, contentType is the serialization protocol used with kube-apiserver, specified here as protobuf.
disablePreemption: disables preemption.
hardPodAffinitySymmetricWeight: configures the weight given to PodAffinity and NodeAffinity.
profiles: more than one profile can be defined; a Pod selects the scheduler (profile) to use via spec.schedulerName, the default being default-scheduler (a minimal example of selecting the second profile follows below).
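To target the no-scoring-scheduler profile defined above, a workload only needs to set spec.schedulerName. A minimal sketch using client-go follows; the kubeconfig path, namespace, Pod name, and image are made up for illustration.

package main

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from a local kubeconfig (path assumed for illustration).
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
        Spec: v1.PodSpec{
            // Selects the second profile from the configuration above;
            // leaving this empty means "default-scheduler".
            SchedulerName: "no-scoring-scheduler",
            Containers: []v1.Container{
                {Name: "app", Image: "nginx"},
            },
        },
    }
    if _, err := clientset.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
        panic(err)
    }
}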
The cc object (the runtime configuration of the kube-scheduler component) is then passed to the Run function in cmd/kube-scheduler/app/server.go. Run defines the startup logic of the kube-scheduler component and is a long-running process that does not exit on its own.

(1) Configz registration

if cz, err := configz.New("componentconfig"); err == nil {
    cz.Set(cc.ComponentConfig)
} else {
    return fmt.Errorf("unable to register configz: %s", err)
}

(2) Start the EventBroadcaster event manager:

cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

(3) Start the HTTP services

/healthz: used for health checks

var checks []healthz.HealthChecker // set up the health checks
if cc.ComponentConfig.LeaderElection.LeaderElect {
    checks = append(checks, cc.LeaderElection.WatchDog)
}
if cc.InsecureServing != nil {
    separateMetrics := cc.InsecureMetricsServing != nil
    handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
    if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
        return fmt.Errorf("failed to start healthz server: %v", err)
    }
}

/metrics: exposes monitoring metrics, typically scraped by Prometheus

if cc.InsecureMetricsServing != nil {
    handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
    if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
        return fmt.Errorf("failed to start metrics server: %v", err)
    }
}

(4) Start the HTTPS service

if cc.SecureServing != nil {
    handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
    // TODO: handle stoppedCh returned by c.SecureServing.Serve
    if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
        // fail early for secure handlers, removing the old error loop from above
        return fmt.Errorf("failed to start secure server: %v", err)
    }
}

(5) Instantiate all Informers and start them, including those for Pod, Node, PV, PVC, SC, CSINode, PDB, RC, RS, Service, STS, and Deployment
cc.InformerFactory.Start(ctx.Done())
cc.InformerFactory.WaitForCacheSync(ctx.Done()) // wait for all started Informers to sync their data to the local cache

(6) Take part in leader election:

if cc.LeaderElection != nil { // leader election is enabled
    cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) {
            close(waitingForLeader)
            sched.Run(ctx)
        },
        OnStoppedLeading: func() {
            klog.Fatalf("leaderelection lost")
        },
    }
    leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection) // instantiate the LeaderElector object
    if err != nil {
        return fmt.Errorf("couldn't create leader elector: %v", err)
    }
    leaderElector.Run(ctx) // call Run() in client-go's tools/leaderelection/leaderelection.go to take part in leader election
    return fmt.Errorf("lost lease")
}

LeaderCallbacks defines two callbacks:
OnStartedLeading is called once the current node has won the election; it runs the main logic of the kube-scheduler component.
OnStoppedLeading is called when the current node loses leadership; it terminates the current kube-scheduler instance.

(7) Run the scheduler via sched.Run.
sched.Run(ctx)其執行邏輯為:
func (sched *Scheduler) Run(ctx context.Context) { sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }首先呼叫了pkg/scheduler/internal/queue/scheduling_queue.go中PriorityQueue的Run方法:
func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

Its logic is:
Every 1 second, check whether any Pod in podBackoffQ can be moved into activeQ.
Every 30 seconds, check whether any Pod in unschedulableQ can be moved into activeQ (by default a Pod qualifies once it has been waiting for more than 60 seconds).

Run then enters sched.scheduleOne, the main scheduling logic of kube-scheduler. It is driven by the wait.Until timer, which keeps calling sched.scheduleOne; only when the sched.config.StopEverything channel is closed does the timer stop and the loop exit.

kube-scheduler first pops a Pod waiting to be scheduled from activeQ and fetches the relevant Node data from the NodeCache.
The NodeCache is organized with zoneIndex on one axis (Nodes are grouped by zone, so the Nodes handed out are spread across zones) and nodeIndex on the other.
In the Filter phase, each time a node is popped for filtering, zoneIndex advances by one position and a Node is taken from that zone's node list (if the current zone has no Nodes left, the next zone is used); after the Node is taken, nodeIndex also advances by one position.
The sampling ratio then determines whether enough Nodes have passed Filter: once the number of feasible Nodes reaches the configured sampling size, filtering stops.
The sampling ratio is configured via percentageOfNodesToScore (0-100).
When the cluster has fewer than 50 schedulable Nodes, the scheduler still checks all of them.
If no ratio is set, the default percentage shrinks as the number of Nodes grows (down to a minimum of 5%); a sketch of this computation follows.
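The sampling size is computed by the numFeasibleNodesToFind helper in pkg/scheduler/core/generic_scheduler.go. The standalone sketch below paraphrases its logic; the constants are the ones used around the 1.19/1.20 releases, and in particular the floor below which all Nodes are always checked is a version-dependent constant, so verify the values in the release you are reading.

package main

import "fmt"

// Constants as found in generic_scheduler.go around the 1.19/1.20 releases;
// check the exact values in your version.
const (
    minFeasibleNodesToFind           = 100 // below this many Nodes, every Node is checked
    minFeasibleNodesPercentageToFind = 5   // the adaptive percentage never drops below 5%
)

// numFeasibleNodesToFind paraphrases the upstream helper: it returns how many
// feasible Nodes the Filter phase should collect before it stops searching.
func numFeasibleNodesToFind(percentageOfNodesToScore, numAllNodes int32) int32 {
    if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
        return numAllNodes
    }
    adaptivePercentage := percentageOfNodesToScore
    if adaptivePercentage <= 0 {
        // Not set by the user: start at 50% and shrink as the cluster grows.
        adaptivePercentage = 50 - numAllNodes/125
        if adaptivePercentage < minFeasibleNodesPercentageToFind {
            adaptivePercentage = minFeasibleNodesPercentageToFind
        }
    }
    numNodes := numAllNodes * adaptivePercentage / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}

func main() {
    for _, n := range []int32{100, 500, 1000, 5000} {
        // With percentageOfNodesToScore left at 0 this prints 100, 230, 420, 500.
        fmt.Printf("%d nodes -> stop after %d feasible nodes\n", n, numFeasibleNodesToFind(0, n))
    }
}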
The Scheduling Framework is a pluggable architecture that defines a rich set of extension point interfaces in the existing scheduling flow.
Developers can implement the interfaces of these extension points as plugins and thereby integrate their own scheduling logic into the Scheduling Framework.
The built-in plugins are registered in pkg/scheduler/algorithmprovider/registry.go:

func getDefaultConfig() *schedulerapi.Plugins {
    return &schedulerapi.Plugins{
        QueueSort: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: queuesort.Name},
            },
        },
        PreFilter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.FitName},
                {Name: nodeports.Name},
                {Name: podtopologyspread.Name},
                {Name: interpodaffinity.Name},
                {Name: volumebinding.Name},
            },
        },
        Filter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: nodeunschedulable.Name},
                {Name: nodename.Name},
                {Name: tainttoleration.Name},
                {Name: nodeaffinity.Name},
                {Name: nodeports.Name},
                {Name: noderesources.FitName},
                {Name: volumerestrictions.Name},
                {Name: nodevolumelimits.EBSName},
                {Name: nodevolumelimits.GCEPDName},
                {Name: nodevolumelimits.CSIName},
                {Name: nodevolumelimits.AzureDiskName},
                {Name: volumebinding.Name},
                {Name: volumezone.Name},
                {Name: podtopologyspread.Name},
                {Name: interpodaffinity.Name},
            },
        },
        PostFilter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultpreemption.Name},
            },
        },
        PreScore: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: interpodaffinity.Name},
                {Name: podtopologyspread.Name},
                {Name: tainttoleration.Name},
                {Name: nodeaffinity.Name},
            },
        },
        Score: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.BalancedAllocationName, Weight: 1},
                {Name: imagelocality.Name, Weight: 1},
                {Name: interpodaffinity.Name, Weight: 1},
                {Name: noderesources.LeastAllocatedName, Weight: 1},
                {Name: nodeaffinity.Name, Weight: 1},
                {Name: nodepreferavoidpods.Name, Weight: 10000},
                {Name: podtopologyspread.Name, Weight: 2},
                {Name: tainttoleration.Name, Weight: 1},
            },
        },
        Reserve: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: volumebinding.Name},
            },
        },
        PreBind: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: volumebinding.Name},
            },
        },
        Bind: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultbinder.Name},
            },
        },
    }
}

When the scheduling flow reaches an extension point, the Scheduling Framework invokes the plugins registered there, and their results influence the scheduling decision.
The core scheduling flow lives in pkg/scheduler/core/generic_scheduler.go:
func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := g.snapshot(); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if g.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    if len(feasibleNodes) == 0 {
        return result, &FitError{
            Pod:                   pod,
            NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
            FilteredNodesStatuses: filteredNodesStatuses,
        }
    }

    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(filteredNodesStatuses),
            FeasibleNodes:  1,
        }, nil
    }

    priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
    if err != nil {
        return result, err
    }

    host, err := g.selectHost(priorityList)
    trace.Step("Prioritizing done")

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
        FeasibleNodes:  len(feasibleNodes),
    }, err
}

Below is the full Scheduling Framework flow; gray plugins are not enabled by default.

1. scheduling cycle
The scheduling cycle is the core of the scheduling flow: it makes the scheduling decision and picks exactly one node.
The scheduling cycle runs synchronously; there is only one scheduling cycle at a time, so it is thread-safe.

(1) QueueSort
QueueSortPlugin sorts the Pods in the scheduling queue. The interface defines a single function, Less, used for comparison when heap-sorting the Pods waiting to be scheduled:
type QueueSortPlugin interface {
    Plugin
    Less(*PodInfo, *PodInfo) bool
}

Only one comparison function can be active at a time, so only one QueueSort plugin may be enabled; if two are enabled, the scheduler reports an error and exits at startup.
The default comparison function compares priority first and then the timestamp:
type PrioritySort struct{}

func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
    p1 := pod.GetPodPriority(pInfo1.Pod)
    p2 := pod.GetPodPriority(pInfo2.Pod)
    return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

func GetPodPriority(pod *v1.Pod) int32 {
    if pod.Spec.Priority != nil {
        return *pod.Spec.Priority
    }
    return 0
}

In the predicate (filtering) stage, all PreFilter plugins run first; only if every PreFilter plugin returns success does the flow enter the Filter stage, otherwise the Pod is rejected and this scheduling attempt is marked as failed. All Filter plugins then run (in parallel across Nodes); a Node is filtered out as soon as any single Filter plugin considers it unsuitable for the Pod.
To improve efficiency, the execution order can be configured, so users can put strategies that filter out large numbers of nodes (for example a NodeSelector-style Filter) first, reducing how often the later Filter strategies have to run.

(2) PreFilter
PreFilter is pre-processing that happens before the main scheduling flow starts; it can enrich Pod information and check preconditions that the cluster or the Pod must satisfy. The PreFilter and Filter interfaces look roughly like the sketch below, and the plugins of each extension point are listed after it.
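For reference, the two interfaces involved here look roughly as follows (paraphrased from the framework package, pkg/scheduler/framework/v1alpha1 in the 1.19/1.20 code base this article follows; check the exact signatures in your version). Types such as CycleState, Status, and NodeInfo live in the same framework package.

type PreFilterPlugin interface {
    Plugin
    // PreFilter runs once per scheduling cycle, before the per-Node Filter calls.
    PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status
    // PreFilterExtensions exposes optional incremental AddPod/RemovePod hooks
    // used by preemption; it may return nil.
    PreFilterExtensions() PreFilterExtensions
}

type FilterPlugin interface {
    Plugin
    // Filter is called once per candidate Node; returning nil (Success) keeps
    // the Node, any other status removes it from the feasible set.
    Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}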
The PreFilter plugins:
- NodeResourcesFit
- NodePorts
- podtopologyspread
- InterPodAffinity
- volumebinding
- ServiceAffinity
(3) Filter
- nodeunschedulable
- NodeResourcesFit
- nodename
- nodeports
- nodeaffinity
- volumerestrictions
- tainttoleration
- NodeVolumeLimits, EBSLimits, GCEPDLimits, AzureDiskLimits, CinderVolume
- volumebinding
- volumezone
- podtopologyspread
- interpodaffinity
- NodeLabel
- ServiceAffinity
(4) PostFilter
- DefaultPreemption: when a high-priority Pod cannot find a suitable Node, the Preempt (preemption) algorithm is executed.
(5) PreScore
- SelectorSpread
- interpodaffinity
- podtopologyspread
- tainttoleration
- NodeResourceLimits
(6) Score
- SelectorSpread
- NodeResourcesBalancedAllocation (BalancedResourceAllocation): scores Nodes by how balanced their resource usage would be, i.e. a low fragmentation rate
- NodeResourcesLeastAllocated: prefers spreading (Nodes with fewer allocated resources score higher)
- interpodaffinity
- nodeaffinity
- nodepreferavoidpods
- podtopologyspread
- tainttoleration
- NodeResourcesMostAllocated: prefers packing (Nodes with more allocated resources score higher)
- RequestedToCapacityRatio: scores by a user-specified requested-to-capacity ratio
- NodeResourceLimits
- NodeLabel
- ServiceAffinity
(7) Reserve
- volumebinding
2. binding cycle
The binding cycle binds the Pod to the Node chosen by the scheduling cycle; unlike the scheduling cycle, it may run asynchronously.

(1) PreBind
- volumebinding
(2) Bind
- defaultbinder
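Finally, to make the plugin mechanism concrete, here is a minimal sketch of an out-of-tree Filter plugin and how it would be registered. The plugin name (NodePrefixFilter) and its rule are made up for illustration, and the import path and factory signature (framework.Handle here, FrameworkHandle in older releases) differ between Kubernetes versions, so treat it as a template rather than the canonical API.

package main

import (
    "context"
    "strings"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
    framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" // pkg/scheduler/framework in newer releases
)

// NodePrefixFilter is a toy Filter plugin: it only admits Nodes whose name
// starts with a fixed prefix. Name and logic are purely illustrative.
type NodePrefixFilter struct{}

const Name = "NodePrefixFilter"

func (pl *NodePrefixFilter) Name() string { return Name }

// Filter runs at the Filter extension point: returning nil means the Node
// passes; an Unschedulable status removes it from consideration.
func (pl *NodePrefixFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }
    if strings.HasPrefix(node.Name, "edge-") {
        return nil
    }
    return framework.NewStatus(framework.Unschedulable, "node name does not start with edge-")
}

// New is the plugin factory handed to the scheduler at registration time.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
    return &NodePrefixFilter{}, nil
}

func main() {
    // Out-of-tree plugins are compiled into their own scheduler binary and
    // registered through app.WithPlugin, then enabled via a profile in the
    // KubeSchedulerConfiguration.
    command := app.NewSchedulerCommand(app.WithPlugin(Name, New))
    if err := command.Execute(); err != nil {
        panic(err)
    }
}

The resulting binary runs alongside (or instead of) the default kube-scheduler, and the plugin still has to be enabled under plugins.filter in a KubeSchedulerConfiguration profile, just like the profiles section shown earlier.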