
Kubernetes Scheduler Source Code Analysis

This article dissects the Scheduler of Kubernetes 1.5 at the source-code level, covering the source directory layout, how kube-scheduler runs, the overall code flowchart, and a walkthrough of the core code. Before reading, it helps to first understand the principles of the Kubernetes scheduler.

Kubernetes Scheduler Source Directory Structure

The Kubernetes Scheduler is designed as a plugin to Kubernetes. This pluggable design makes it easy to substitute custom scheduling algorithms: scheduling requirements differ from company to company, so custom schedulers are common.

The Scheduler source lives mainly under k8s.io/kubernetes/plugin/. Two directories, cmd/kube-scheduler and pkg/scheduler, hold respectively the kube-scheduler command's flag handling and app startup, and the scheduler's internal implementation. The directory layout breaks down as follows.

k8s.io/kubernetes/plugin/
.
├── cmd
│   └── kube-scheduler          // code for the kube-scheduler command
│       ├── app                 // startup of the kube-scheduler app
│       │   ├── options
│       │   │   └── options.go  // wraps the SchedulerServer object and its AddFlags method
│       │   └── server.go       // SchedulerServer config wrapping and the Run method
│       └── scheduler.go        // kube-scheduler main entry point
└── pkg
    ├── scheduler               // core scheduler code
    │   ├── algorithm
    │   │   ├── doc.go
    │   │   ├── listers.go      // NodeLister, PodLister and other interfaces
    │   │   ├── predicates      // built-in Predicates Policy implementations
    │   │   │   ├── error.go
    │   │   │   ├── metadata.go
    │   │   │   ├── predicates.go   // main implementations of the built-in Predicates Policies
    │   │   │   ├── predicates_test.go
    │   │   │   ├── utils.go
    │   │   │   └── utils_test.go
    │   │   ├── priorities      // built-in Priorities Policy implementations
    │   │   │   ├── balanced_resource_allocation.go  // defaultProvider - BalancedResourceAllocation
    │   │   │   ├── balanced_resource_allocation_test.go
    │   │   │   ├── image_locality.go                // defaultProvider - ImageLocalityPriority
    │   │   │   ├── image_locality_test.go
    │   │   │   ├── interpod_affinity.go             // defaultProvider - InterPodAffinityPriority
    │   │   │   ├── interpod_affinity_test.go
    │   │   │   ├── least_requested.go               // defaultProvider - LeastRequestedPriority
    │   │   │   ├── least_requested_test.go
    │   │   │   ├── metadata.go                      // priorityMetadata definition
    │   │   │   ├── most_requested.go                // defaultProvider - MostRequestedPriority
    │   │   │   ├── most_requested_test.go
    │   │   │   ├── node_affinity.go                 // defaultProvider - NodeAffinityPriority
    │   │   │   ├── node_affinity_test.go
    │   │   │   ├── node_label.go                    // registered when policy.Argument.LabelPreference != nil
    │   │   │   ├── node_label_test.go
    │   │   │   ├── node_prefer_avoid_pods.go        // defaultProvider - NodePreferAvoidPodsPriority
    │   │   │   ├── node_prefer_avoid_pods_test.go
    │   │   │   ├── selector_spreading.go            // defaultProvider - SelectorSpreadPriority
    │   │   │   ├── selector_spreading_test.go
    │   │   │   ├── taint_toleration.go              // defaultProvider - TaintTolerationPriority
    │   │   │   ├── taint_toleration_test.go
    │   │   │   ├── test_util.go
    │   │   │   └── util                             // utilities
    │   │   │       ├── non_zero.go
    │   │   │       ├── topologies.go
    │   │   │       └── util.go
    │   │   ├── scheduler_interface.go  // SchedulerExtender and ScheduleAlgorithm interfaces
    │   │   ├── scheduler_interface_test.go
    │   │   └── types.go        // function types that Predicates and Priorities algorithms implement (FitPredicate, PriorityMapFunction)
    │   ├── algorithmprovider   // entries selectable via the algorithm-provider flag
    │   │   ├── defaults
    │   │   │   ├── compatibility_test.go
    │   │   │   └── defaults.go // the "DefaultProvider" implementation
    │   │   ├── plugins.go      // empty, reserved for custom providers
    │   │   └── plugins_test.go
    │   ├── api                 // Scheduler API objects, used by SchedulerExtender to handle requests from an HTTPExtender
    │   │   ├── latest
    │   │   │   └── latest.go
    │   │   ├── register.go
    │   │   ├── types.go        // Policy, PredicatePolicy, PriorityPolicy, etc.
    │   │   ├── v1
    │   │   │   ├── register.go
    │   │   │   └── types.go
    │   │   └── validation
    │   │       ├── validation.go   // validates that a Policy definition is legal
    │   │       └── validation_test.go
    │   ├── equivalence_cache.go
    │   ├── extender.go         // HTTPExtender construction, plus its Filter and Prioritize hooks into predicates and priorities
    │   ├── extender_test.go
    │   ├── factory             // matches configured Policies to predicate (FitPredicateFactory) and priority (PriorityFunctionFactory2) functions
    │   │   ├── factory.go      // defines ConfigFactory, which assembles the scheduler from configuration; CreateFromConfig and CreateFromKeys are the key methods
    │   │   ├── factory_test.go
    │   │   ├── plugins.go      // methods for registering custom predicate and priority Policies
    │   │   └── plugins_test.go
    │   ├── generic_scheduler.go    // defines genericScheduler; its Schedule(...) method is where scheduling really begins
    │   ├── generic_scheduler_test.go
    │   ├── metrics             // registers metrics with Prometheus
    │   │   └── metrics.go
    │   ├── scheduler.go        // defines Scheduler and Run(); the core scheduleOne() also lives here — one complete scheduling pass: fetch the pending Pod, schedule, Bind
    │   ├── scheduler_test.go
    │   ├── schedulercache
    │   │   ├── cache.go        // schedulerCache CRUD for Pods, Nodes and Binds, plus expiry maintenance
    │   │   ├── cache_test.go
    │   │   ├── interface.go    // the interface that schedulerCache implements
    │   │   ├── node_info.go    // NodeInfo and related operations
    │   │   └── util.go
    │   └── testing
    │       ├── fake_cache.go
    │       └── pods_to_cache.go

How Kube-scheduler Runs

  1. kube-scheduler runs as a standalone process on the Kubernetes master and provides the scheduling service. The --master flag points it at kube-apiserver, which it uses to watch Pods and Nodes and to call the API server's bind interface to bind a Pod to a Node.

  2. kube-scheduler maintains a FIFO PodQueue cache. Newly created Pods are watched by the ConfigFactory and added to this PodQueue; each scheduling pass calls getNextPod on the queue to obtain the next Pod to schedule.

  3. With a pending Pod in hand, it runs the Schedule method of the Algorithm configured by the AlgorithmProvider. A scheduling pass has two key phases, Predicates and Priorities, and returns the single Node best suited to host the Pod.

  4. It updates the Pod's state in the SchedulerCache (AssumePod), marking the Pod as scheduled and updating the corresponding NodeInfo.

  5. It calls the API server's Bind interface to bind the Pod to the Node. If the Bind fails, the Pod assumed in the previous step is removed from the SchedulerCache.
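
The queue-driven loop in steps 2 and 3 above can be sketched with toy types; Pod, PodQueue and the helper names here are illustrative stand-ins, not the real kube-scheduler types:

```go
package main

import "fmt"

// Pod is a toy stand-in for *v1.Pod.
type Pod struct{ Name string }

// PodQueue is a minimal FIFO queue like the cache described in step 2.
type PodQueue struct{ items []Pod }

func (q *PodQueue) Add(p Pod) { q.items = append(q.items, p) }

// NextPod pops the oldest Pod; ok is false when the queue is empty.
func (q *PodQueue) NextPod() (Pod, bool) {
	if len(q.items) == 0 {
		return Pod{}, false
	}
	p := q.items[0]
	q.items = q.items[1:]
	return p, true
}

func main() {
	q := &PodQueue{}
	q.Add(Pod{Name: "nginx-1"})
	q.Add(Pod{Name: "nginx-2"})
	for {
		pod, ok := q.NextPod() // FIFO: oldest pending Pod first
		if !ok {
			break
		}
		// a real pass would now run predicates/priorities, AssumePod, and Bind
		fmt.Println("scheduling", pod.Name)
	}
}
```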

Kubernetes Scheduler Code Flowchart

The flowchart image is large; the original article suggests downloading it and zooming in locally. (Image not reproduced here.)

Kubernetes Scheduler Core Code Walkthrough

The Scheduler's main entry point, shown below, creates the SchedulerServer and starts it.

plugin/cmd/kube-scheduler/scheduler.go

func main() {
    s := options.NewSchedulerServer()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if err := app.Run(s); err != nil {
        glog.Fatalf("scheduler app failed to run: %v", err)
    }
}

kube-scheduler's flags are defined in options as follows:

plugin/cmd/kube-scheduler/app/options/options.go

// AddFlags adds flags for a specific SchedulerServer to the specified FlagSet
func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
    fs.Int32Var(&s.Port, "port", s.Port, "The port that the scheduler's http service runs on")
    fs.StringVar(&s.Address, "address", s.Address, "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
    fs.StringVar(&s.AlgorithmProvider, "algorithm-provider", s.AlgorithmProvider, "The scheduling algorithm provider to use, one of: "+factory.ListAlgorithmProviders())
    fs.StringVar(&s.PolicyConfigFile, "policy-config-file", s.PolicyConfigFile, "File with scheduler policy configuration")
    fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
    fs.BoolVar(&s.EnableContentionProfiling, "contention-profiling", false, "Enable lock contention profiling, if profiling is enabled")
    fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
    fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
    fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.")
    fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
    fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
    fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
    fs.IntVar(&s.HardPodAffinitySymmetricWeight, "hard-pod-affinity-symmetric-weight", api.DefaultHardPodAffinitySymmetricWeight,
        "RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule corresponding "+
            "to every RequiredDuringScheduling affinity rule. --hard-pod-affinity-symmetric-weight represents the weight of implicit PreferredDuringScheduling affinity rule.")
    fs.StringVar(&s.FailureDomains, "failure-domains", api.DefaultFailureDomains, "Indicate the \"all topologies\" set for an empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.")
    leaderelection.BindFlags(&s.LeaderElection, fs)
    config.DefaultFeatureGate.AddFlag(fs)
}

server.Run is the most important method in cmd/kube-scheduler. It:

  • generates the config;
  • creates the scheduler object from that config;
  • starts the HTTP service, exposing /debug/pprof for performance profiling and tuning, and /metrics for Prometheus to collect monitoring data;
  • once kube-scheduler wins leader election, immediately starts looping on scheduler.Run to perform scheduling.
plugin/cmd/kube-scheduler/app/server.go:75

// Run runs the specified SchedulerServer.  This should never exit.
func Run(s *options.SchedulerServer) error {
    ...
    config, err := createConfig(s, kubecli)
    ...
    sched := scheduler.New(config)

    go startHTTP(s)

    run := func(_ <-chan struct{}) {
        sched.Run()
        select {}
    }

    ...
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                glog.Fatalf("lost master")
            },
        },
    })
    ...
}

Scheduler.Run starts a goroutine that repeatedly calls Scheduler.scheduleOne, until the scheduler receives the shut-down signal.

Scheduler.scheduleOne performs the actual scheduling logic, one Pod per invocation:

  • get a Pod from the PodQueue;
  • run the configured Algorithm's Schedule to do predicates and priorities;
  • AssumePod;
  • Bind the Pod; if the Bind fails, ForgetPod.
plugin/pkg/scheduler/scheduler.go:86

// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
    go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
}

func (s *Scheduler) scheduleOne() {
    pod := s.config.NextPod()
    ...
    dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)
    ...
    assumed := *pod
    assumed.Spec.NodeName = dest
    if err := s.config.SchedulerCache.AssumePod(&assumed); err != nil {
        ...
        return
    }

    go func() {
        ...

        b := &v1.Binding{
            ObjectMeta: v1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: dest,
            },
        }

        ...
        err := s.config.Binder.Bind(b)
        if err != nil {
            glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name)
            if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil {
                ...
            }
            return
        }

    }()
}

Below is the Schedule interface that a scheduling Algorithm must implement:

plugin/pkg/scheduler/algorithm/scheduler_interface.go:41

// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods onto machines.
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
}
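
A minimal, hypothetical implementation of a Schedule-style interface might look like the sketch below; the types are simplified stand-ins for *v1.Pod and algorithm.NodeLister, and firstFitScheduler is purely illustrative:

```go
package main

import (
	"errors"
	"fmt"
)

// Pod is a toy stand-in for *v1.Pod.
type Pod struct{ Name string }

// NodeLister is a simplified stand-in for algorithm.NodeLister.
type NodeLister interface{ List() ([]string, error) }

// ScheduleAlgorithm mirrors the interface shown above, with toy types.
type ScheduleAlgorithm interface {
	Schedule(pod *Pod, lister NodeLister) (selectedMachine string, err error)
}

type fakeLister struct{ nodes []string }

func (f fakeLister) List() ([]string, error) { return f.nodes, nil }

// firstFitScheduler picks the first listed node; a real implementation,
// like genericScheduler, would run predicates and priorities instead.
type firstFitScheduler struct{}

func (firstFitScheduler) Schedule(pod *Pod, lister NodeLister) (string, error) {
	nodes, err := lister.List()
	if err != nil {
		return "", err
	}
	if len(nodes) == 0 {
		return "", errors.New("no nodes available to schedule " + pod.Name)
	}
	return nodes[0], nil
}

func main() {
	var alg ScheduleAlgorithm = firstFitScheduler{}
	host, _ := alg.Schedule(&Pod{Name: "web-0"}, fakeLister{nodes: []string{"node-a", "node-b"}})
	fmt.Println(host) // node-a
}
```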

genericScheduler is the default scheduler, so naturally it must implement this interface:

plugin/pkg/scheduler/generic_scheduler.go:89

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {

    // Get the schedulable Nodes from the cache
    ...
    nodes, err := nodeLister.List()
    ...

    // Run the predicates (filtering) phase
    trace.Step("Computing predicates")
    filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)

    ...

    // Run the priorities (scoring) phase
    trace.Step("Prioritizing")
    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    ...

    // If priorities yield multiple Nodes, pick one of them as the best Node to return
    trace.Step("Selecting host")
    return g.selectHost(priorityList)
}


// findNodesThatFit is the entry point of the predicates phase
func findNodesThatFit(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    nodes []*v1.Node,
    predicateFuncs map[string]algorithm.FitPredicate,
    extenders []algorithm.SchedulerExtender,
    metadataProducer algorithm.MetadataProducer,
) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    if len(predicateFuncs) == 0 {
        filtered = nodes
    } else {
        ...
        // checkNode calls podFitsOnNode to run every configured Predicates Policy against the Node.
        checkNode := func(i int) {
            nodeName := nodes[i].Name
            fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
            ...
        }

        // Run checkNode across at most 16 goroutine workers, sized by the number of nodes
        workqueue.Parallelize(16, len(nodes), checkNode)
        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs)
        }
    }

    // If Extenders are configured, run each Extender's Filter logic to filter the list further.
    if len(filtered) > 0 && len(extenders) != 0 {
        for _, extender := range extenders {
            filteredList, failedMap, err := extender.Filter(pod, filtered)
            ...
        }
    }
    return filtered, failedPredicateMap, nil
}

// Run the predicateFunc of every configured Predicates Policy in turn.
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
    var failedPredicates []algorithm.PredicateFailureReason
    for _, predicate := range predicateFuncs {
        fit, reasons, err := predicate(pod, meta, info)
        ...
    }
    return len(failedPredicates) == 0, failedPredicates, nil
}


// Score all Nodes that survived the predicates phase, using every configured Priorities Policy.
// Each Priorities Policy scores each node from 0 to 10; higher means more suitable.
func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {

    ...
    // For each node, run all Priorities Policies, producing a 2-D array of per-policy, per-node scores
    processNode := func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        var err error
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                return
            }
        }
    }

    // Run processNode across at most 16 goroutine workers, sized by the number of nodes
    workqueue.Parallelize(16, len(nodes), processNode)

    // For every configured Priorities Policy that defines a Reduce, run it to update results[policy][node]
    for i, priorityConfig := range priorityConfigs {
        if priorityConfig.Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int, config algorithm.PriorityConfig) {
            defer wg.Done()
            if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
        }(i, priorityConfig)
    }

    // Wait for all computations to be finished.
    wg.Wait()
    ...

    // A weighted sum of the scores gives the final score
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    // TODO: Consider parallelizing it.
    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    // If Extenders are configured, also run each Extender's scoring method, Extender.Prioritize
    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for _, extender := range extenders {
            wg.Add(1)
            go func(ext algorithm.SchedulerExtender) {
                defer wg.Done()
                prioritizedList, weight, err := ext.Prioritize(pod, nodes)
                ...
            }(extender)
        }


        // wait for all go routines to finish
        wg.Wait()

        // Fold in combinedScores: add the Extenders' scores onto each node's non-Extender score
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }

    ...
}

The PredicateFunc for each Predicate Policy is defined in plugin/pkg/scheduler/algorithm/predicates/predicates.go. Below is the definition of CheckNodeMemoryPressurePredicate.

plugin/pkg/scheduler/algorithm/predicates/predicates.go:1202

// CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node
// reporting memory pressure condition.
func CheckNodeMemoryPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    var podBestEffort bool
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        podBestEffort = predicateMeta.podBestEffort
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podBestEffort = isPodBestEffort(pod)
    }
    // pod is not BestEffort pod
    if !podBestEffort {
        return true, nil, nil
    }

    // is the node under pressure?
    if nodeInfo.MemoryPressureCondition() == v1.ConditionTrue {
        return false, []algorithm.PredicateFailureReason{ErrNodeUnderMemoryPressure}, nil
    }
    return true, nil, nil
}

The PriorityFunc for each Priorities Policy is defined in plugin/pkg/scheduler/algorithm/priorities/*.go. Below is the definition of MostRequestedPriority.

plugin/pkg/scheduler/algorithm/priorities/most_requested.go:33

// MostRequestedPriority is a priority function that favors nodes with most requested resources.
// It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes
// based on the maximum of the average of the fraction of requested to capacity.
// Details: (cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
func MostRequestedPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
    var nonZeroRequest *schedulercache.Resource
    if priorityMeta, ok := meta.(*priorityMetadata); ok {
        nonZeroRequest = priorityMeta.nonZeroRequest
    } else {
        // We couldn't parse metadata - fallback to computing it.
        nonZeroRequest = getNonZeroRequests(pod)
    }
    return calculateUsedPriority(pod, nonZeroRequest, nodeInfo)
}
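
As a worked example of the formula quoted in the comment above, here is a toy computation on integer resource amounts; the function names are hypothetical, and the real code operates on resource quantities via calculateUsedPriority:

```go
package main

import "fmt"

// mostRequestedScore computes 10 * requested / capacity for one resource,
// per the formula in the comment; over-capacity or zero-capacity scores 0.
func mostRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 || requested > capacity {
		return 0
	}
	return requested * 10 / capacity
}

// nodeScore averages the CPU and memory scores:
// (cpu(10 * sum(requested) / capacity) + memory(10 * sum(requested) / capacity)) / 2
func nodeScore(cpuReq, cpuCap, memReq, memCap int64) int64 {
	return (mostRequestedScore(cpuReq, cpuCap) + mostRequestedScore(memReq, memCap)) / 2
}

func main() {
	// 8000m of 10000m CPU requested, 4GiB of 8GiB memory requested
	fmt.Println(nodeScore(8000, 10000, 4<<30, 8<<30)) // (8 + 5) / 2 = 6
}
```

Because higher request ratios score higher, MostRequestedPriority packs pods onto already-busy nodes, the opposite of LeastRequestedPriority's spreading behavior.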

Kubernetes configures kube-scheduler with the DefaultProvider by default. Which Predicates and Priorities Policies does the DefaultProvider include? They are defined in plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go, as follows:

plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go:205

// Default Predicates Policies configured by the DefaultProvider
func defaultPredicates() sets.String {
    return sets.NewString(
        // Fit is determined by volume zone requirements.
        factory.RegisterFitPredicateFactory(
            "NoVolumeZoneConflict",
            func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
                return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo)
            },
        ),
        ...
        // Fit is determined by non-conflicting disk volumes.
        factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict),

        // GeneralPredicates are the predicates that are enforced by all Kubernetes components
        // (e.g. kubelet and all schedulers)
        factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),

        // Fit is determined based on whether a pod can tolerate all of the node's taints
        factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),

        // Fit is determined by node memory pressure condition.
        factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),

        // Fit is determined by node disk pressure condition.
        factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
    )
}

// Default Priorities Policies configured by the DefaultProvider
func defaultPriorities() sets.String {
    return sets.NewString(
        // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
        factory.RegisterPriorityConfigFactory(
            "SelectorSpreadPriority",
            factory.PriorityConfigFactory{
                Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
                    return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister)
                },
                Weight: 1,
            },
        ),
        ...

        // TODO: explain what it does.
        factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
    )
}
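
The registration pattern above — each Register* call both stores a function in a registry and returns its name, so defaultPredicates()/defaultPriorities() can collect the names into a string set — can be sketched with toy types. Everything here except the names RegisterFitPredicate and defaultPredicates is a simplified stand-in for the factory package:

```go
package main

import "fmt"

// FitPredicate is a toy stand-in for algorithm.FitPredicate.
type FitPredicate func(pod, node string) bool

// fitPredicateMap plays the role of the factory package's internal registry.
var fitPredicateMap = map[string]FitPredicate{}

// RegisterFitPredicate stores the predicate under its name and returns the
// name, which is what lets defaultPredicates() build a set of names inline.
func RegisterFitPredicate(name string, p FitPredicate) string {
	fitPredicateMap[name] = p
	return name
}

// defaultPredicates registers two toy predicates and returns their names,
// mirroring the sets.NewString(factory.RegisterFitPredicate(...)) shape above.
func defaultPredicates() map[string]struct{} {
	names := []string{
		RegisterFitPredicate("NoDiskConflict", func(pod, node string) bool { return true }),
		RegisterFitPredicate("CheckNodeMemoryPressure", func(pod, node string) bool { return true }),
	}
	set := map[string]struct{}{}
	for _, n := range names {
		set[n] = struct{}{}
	}
	return set
}

func main() {
	fmt.Println(len(defaultPredicates()), len(fitPredicateMap)) // 2 2
}
```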

Read the walkthrough above alongside the Kubernetes Scheduler code flowchart from the earlier section. By this point you should have a solid grasp of the scheduler code.

Summary

  • kube-scheduler runs as a standalone process on the Kubernetes master and provides the scheduling service. The --master flag points it at kube-apiserver, which it uses to watch Pods and Nodes and to call the API server's bind interface to bind a Pod to a Node.

  • kube-scheduler maintains a FIFO PodQueue cache. Newly created Pods are watched by the ConfigFactory and added to this PodQueue; each scheduling pass calls getNextPod on the queue to obtain the next Pod to schedule.

  • With a pending Pod in hand, it runs the Schedule method of the Algorithm configured by the AlgorithmProvider. A scheduling pass has two key phases, Predicates and Priorities, and returns the single Node best suited to host the Pod.

  • It updates the Pod's state in the SchedulerCache (AssumePod), marking the Pod as scheduled and updating the corresponding NodeInfo.

  • It calls the API server's Bind interface to bind the Pod to the Node. If the Bind fails, the Pod assumed in the previous step is removed from the SchedulerCache.
