
Kubernetes Scheduler Scheduling: Node Information Management

The source code referenced here is Kubernetes v1.6.1; the corresponding commit id on GitHub is b0b7a323cc5a4a2019b2e9520c21c7830b7f708e.

This article covers how the Scheduler manages node information during scheduling: how node information is initialized inside the Scheduler, how it is kept in sync with pod updates, and how it is used by the scheduling algorithm.

1. Creating Node Information in the Kubernetes Scheduler

In the Kubernetes Scheduler, node information is obtained from the api-server via the list-watch mechanism. After each scheduling pass, the result is also written back into the schedulerCache's node info map (nodes). When the scheduling algorithm runs, the information in the schedulerCache's nodes map is synchronized into the genericScheduler's cachedNodeInfoMap.

Creating node information involves the following key steps:
(1) Create the Informer and Lister
In the createScheduler -> factory.NewConfigFactory function, the nodeInformer and nodeLister objects are created. The relevant code is as follows:

    // Only nodes in the "Ready" condition with status == "True" are schedulable
    nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
        0,
    )
    c.nodeLister = nodeInformer.Lister()
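For context, the following is a minimal, standalone sketch of how a node informer and lister are typically wired up with client-go. It uses a recent client-go API and a placeholder kubeconfig path, so it illustrates the list-watch mechanism mentioned above rather than reproducing the exact v1.6.1 ConfigFactory code:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a clientset; the kubeconfig path here is only a placeholder.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// The shared informer factory drives the list-watch against the api-server.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	nodeInformer := factory.Core().V1().Nodes()

	// Handlers registered here play the role of addNodeToCache / updateNodeInCache /
	// deleteNodeFromCache in the scheduler's ConfigFactory.
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Println("node added:", obj.(*v1.Node).Name)
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("node deleted")
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// The lister serves reads from the informer's local cache, not from the api-server.
	nodes, _ := nodeInformer.Lister().List(labels.Everything())
	fmt.Println("nodes currently cached:", len(nodes))
}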

(2) Create the node info map in the schedulerCache object

type schedulerCache struct {
    stop   <-chan struct{}
    ttl    time.Duration
    period time.Duration

    // This mutex guards all fields within this cache struct.
    mu sync.Mutex
    // a set of assumed pod keys.
    // The key could further be used to get an entry in podStates.
    assumedPods map[string]bool
    // a map from pod key to podState.
    podStates map[string]*podState
    nodes     map[string]*NodeInfo
}

(3) Create the cachedNodeInfoMap in the genericScheduler object

type genericScheduler struct {
    cache                 schedulercache.Cache
    predicates            map[string]algorithm.FitPredicate
    priorityMetaProducer  algorithm.MetadataProducer
    predicateMetaProducer algorithm.MetadataProducer
    prioritizers          []algorithm.PriorityConfig
    extenders             []algorithm.SchedulerExtender
    pods                  algorithm.PodLister
    lastNodeIndexLock     sync.Mutex
    lastNodeIndex         uint64

    cachedNodeInfoMap map[string]*schedulercache.NodeInfo

    equivalenceCache *EquivalenceCache
}

2. Synchronizing Node Information in the Kubernetes Scheduler

In scheduleOne, each time a pod is scheduled, the AssumePod function is called to record the pod in the schedulerCache, and the corresponding node information is updated at the same time. "Assuming" the pod means the scheduler optimistically counts it against its chosen node before the binding to the api-server completes, so subsequent scheduling decisions already see that node's resources as consumed.

func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := getPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()
    if _, ok := cache.podStates[key]; ok {
        return fmt.Errorf("pod %v state wasn't initial but get assumed", key)
    }

    cache.addPod(pod)
    ps := &podState{
        pod: pod,
    }
    cache.podStates[key] = ps
    cache.assumedPods[key] = true
    return nil
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
    n, ok := cache.nodes[pod.Spec.NodeName]
    if !ok {
        n = NewNodeInfo()
        cache.nodes[pod.Spec.NodeName] = n
    }
    n.addPod(pod)
}
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *v1.Pod) {
    res, non0_cpu, non0_mem := calculateResource(pod)
    n.requestedResource.MilliCPU += res.MilliCPU
    n.requestedResource.Memory += res.Memory
    n.requestedResource.NvidiaGPU += res.NvidiaGPU
    if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 {
        n.requestedResource.OpaqueIntResources = map[v1.ResourceName]int64{}
    }
    for rName, rQuant := range res.OpaqueIntResources {
        n.requestedResource.OpaqueIntResources[rName] += rQuant
    }
    n.nonzeroRequest.MilliCPU += non0_cpu
    n.nonzeroRequest.Memory += non0_mem
    n.pods = append(n.pods, pod)
    if hasPodAffinityConstraints(pod) {
        n.podsWithAffinity = append(n.podsWithAffinity, pod)
    }
    n.generation++
}
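calculateResource itself is not reproduced above. As a rough idea of what it computes, the sketch below (hypothetical and simplified, with assumed default values) sums the container requests of a pod and substitutes defaults for the "non-zero" variants, so that pods declaring no requests still carry some scheduling weight:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// Assumed defaults applied when a container declares no request at all.
const (
	defaultMilliCPURequest int64 = 100               // 0.1 core
	defaultMemoryRequest   int64 = 200 * 1024 * 1024 // 200 MiB
)

// calculateResourceSketch is a simplified stand-in for the scheduler's calculateResource.
func calculateResourceSketch(pod *v1.Pod) (milliCPU, memory, non0CPU, non0Mem int64) {
	for _, c := range pod.Spec.Containers {
		cpu := c.Resources.Requests.Cpu().MilliValue()
		mem := c.Resources.Requests.Memory().Value()
		milliCPU += cpu
		memory += mem
		if cpu == 0 {
			cpu = defaultMilliCPURequest
		}
		if mem == 0 {
			mem = defaultMemoryRequest
		}
		non0CPU += cpu
		non0Mem += mem
	}
	return
}

func main() {
	pod := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{
		{Name: "app", Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
			v1.ResourceCPU: resource.MustParse("250m"),
		}}},
		{Name: "sidecar"}, // no requests: falls back to the non-zero defaults
	}}}
	cpu, mem, n0cpu, n0mem := calculateResourceSketch(pod)
	fmt.Println(cpu, mem, n0cpu, n0mem) // 250 0 350 419430400
}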

When the Informer observes a node change, it invokes the corresponding handler functions that were registered during initialization:

            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,

func (c *ConfigFactory) addNodeToCache(obj interface{}) {
    node, ok := obj.(*v1.Node)
    if !ok {
        glog.Errorf("cannot convert to *v1.Node: %v", obj)
        return
    }

    if err := c.schedulerCache.AddNode(node); err != nil {
        glog.Errorf("scheduler cache AddNode failed: %v", err)
    }
}

func (cache *schedulerCache) AddNode(node *v1.Node) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    n, ok := cache.nodes[node.Name]
    if !ok {
        n = NewNodeInfo()
        cache.nodes[node.Name] = n
    }
    return n.SetNode(node)
}
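SetNode is not reproduced here. Conceptually, it stores the *v1.Node on the NodeInfo, caches the node's allocatable capacity for later use by the predicates, and bumps the generation counter so that the next snapshot refresh re-clones this entry (the real v1.6.1 implementation also tracks additional state such as taints and node conditions). The sketch below uses its own simplified, hypothetical types purely to illustrate that idea:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// Hypothetical, trimmed-down stand-in for schedulercache.NodeInfo.
type nodeInfoSketch struct {
	node        *v1.Node
	milliCPU    int64 // allocatable CPU in millicores
	memory      int64 // allocatable memory in bytes
	allowedPods int64
	generation  int64
}

// setNode mirrors what NodeInfo.SetNode is responsible for: remember the Node object,
// cache its allocatable resources, and mark the entry dirty via the generation counter.
func (n *nodeInfoSketch) setNode(node *v1.Node) {
	n.node = node
	n.milliCPU = node.Status.Allocatable.Cpu().MilliValue()
	n.memory = node.Status.Allocatable.Memory().Value()
	n.allowedPods = node.Status.Allocatable.Pods().Value()
	n.generation++
}

func main() {
	node := &v1.Node{Status: v1.NodeStatus{Allocatable: v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("4"),
		v1.ResourceMemory: resource.MustParse("8Gi"),
		v1.ResourcePods:   resource.MustParse("110"),
	}}}
	var info nodeInfoSketch
	info.setNode(node)
	fmt.Println(info.milliCPU, info.memory, info.allowedPods, info.generation) // 4000 8589934592 110 1
}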

3. Using Node Information in the Scheduling Algorithm

Every scheduling pass in the Kubernetes Scheduler takes the pod to be scheduled and the cluster's node information as input. The node information comes from two sources: part of it from the nodeLister, and part of it from the NodeInfo cached in the schedulerCache.

The concrete flow is as follows:
(1) Obtain the node (name) list through the nodeLister

// In scheduleOne, the nodeLister is handed to the scheduling algorithm:
dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)

// Inside genericScheduler.Schedule, the listed nodes are passed to findNodesThatFit
// together with the cached node info:
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
if err != nil {
    return "", err
}

// Inside findNodesThatFit's per-node check (the Printf appears to be the author's
// debug output, not upstream code):
fmt.Printf("[%s] is fit, Unschedulable:%v ,\n", nodeName, nodeNameToInfo[nodeName].Node().Spec.Unschedulable)
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)

(2) Use the NodeInfo cached in the schedulerCache as the node-information input to the scheduling algorithm
The cached NodeInfo is synchronized in the genericScheduler's Schedule function:

// Used for all fit and priority funcs.
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
    return "", err
}

func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()
    for name, info := range cache.nodes {
        if current, ok := nodeNameToInfo[name]; !ok || current.generation != info.generation {
            nodeNameToInfo[name] = info.Clone()
        }
    }
    for name := range nodeNameToInfo {
        if _, ok := cache.nodes[name]; !ok {
            delete(nodeNameToInfo, name)
        }
    }
    return nil
}
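The generation check above means a node's entry in the snapshot map is only re-cloned when something on that node actually changed (addPod, SetNode, and the other mutators all increment generation). The standalone sketch below, with hypothetical types, shows the same clone-on-generation-change idea:

package main

import "fmt"

type nodeInfo struct {
	name       string
	generation int64
	pods       []string
}

func (n *nodeInfo) clone() *nodeInfo {
	c := *n
	c.pods = append([]string(nil), n.pods...)
	return &c
}

// updateSnapshot mirrors the structure of UpdateNodeNameToInfoMap.
func updateSnapshot(cache map[string]*nodeInfo, snapshot map[string]*nodeInfo) {
	for name, info := range cache {
		if cur, ok := snapshot[name]; !ok || cur.generation != info.generation {
			snapshot[name] = info.clone()
		}
	}
	for name := range snapshot {
		if _, ok := cache[name]; !ok {
			delete(snapshot, name) // node was removed from the cluster
		}
	}
}

func main() {
	cache := map[string]*nodeInfo{
		"node-1": {name: "node-1", generation: 1},
		"node-2": {name: "node-2", generation: 1},
	}
	snapshot := map[string]*nodeInfo{}

	updateSnapshot(cache, snapshot)

	// A pod lands on node-1: its generation is bumped, as addPod does.
	cache["node-1"].pods = append(cache["node-1"].pods, "pod-a")
	cache["node-1"].generation++

	updateSnapshot(cache, snapshot)
	fmt.Println(snapshot["node-1"].pods, snapshot["node-2"].pods) // only node-1 was re-cloned
}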

It is then used in the scheduling algorithm:

trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
if err != nil {
    return "", err
}

// Per-node check inside findNodesThatFit (the Printf appears to be the author's debug output):
fmt.Printf("[%s] is fit, Unschedulable:%v ,\n", nodeName, nodeNameToInfo[nodeName].Node().Spec.Unschedulable)

fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
if err != nil {
    predicateResultLock.Lock()
    errs = append(errs, err)
    predicateResultLock.Unlock()
    return
}
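podFitsOnNode runs the registered predicate functions against one node's cached NodeInfo and collects the failure reasons of the predicates that reject the pod. The standalone sketch below uses hypothetical, simplified types (not the real algorithm.FitPredicate signature) to show that shape:

package main

import "fmt"

type podSpec struct {
	name     string
	milliCPU int64
}

type nodeInfo struct {
	name             string
	allocatableMilli int64
	requestedMilli   int64
}

// fitPredicate mirrors the shape of algorithm.FitPredicate: fits, failure reasons, error.
type fitPredicate func(pod podSpec, info *nodeInfo) (bool, []string, error)

func cpuFits(pod podSpec, info *nodeInfo) (bool, []string, error) {
	if info.requestedMilli+pod.milliCPU > info.allocatableMilli {
		return false, []string{"Insufficient cpu"}, nil
	}
	return true, nil, nil
}

// podFitsOnNode evaluates every predicate against one node and gathers failure reasons.
func podFitsOnNode(pod podSpec, info *nodeInfo, predicates map[string]fitPredicate) (bool, []string, error) {
	var failures []string
	for _, pred := range predicates {
		fits, reasons, err := pred(pod, info)
		if err != nil {
			return false, nil, err
		}
		if !fits {
			failures = append(failures, reasons...)
		}
	}
	return len(failures) == 0, failures, nil
}

func main() {
	info := &nodeInfo{name: "node-1", allocatableMilli: 2000, requestedMilli: 1800}
	pod := podSpec{name: "pod-a", milliCPU: 500}
	fits, reasons, _ := podFitsOnNode(pod, info, map[string]fitPredicate{"PodFitsResources": cpuFits})
	fmt.Println(fits, reasons) // false [Insufficient cpu]
}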

This is the basic flow of node information management in the Kubernetes Scheduler, covering how node information is kept in sync and how it is used during scheduling.