Kubernetes Scheduler 排程- Node資訊管理
原始碼為k8s v1.6.1版本,github上對應的commit id為b0b7a323cc5a4a2019b2e9520c21c7830b7f708e
本文將對Scheduler的排程中Node資訊的管理流程進行介紹,主要介紹Scheduler模組中node資訊如何初始化,node資訊如何與pod的更新資訊同步,node資訊如何用到排程演算法中
一、Kubernetes Scheduler中Node資訊建立
在Kubernetes的Scheduler中,Node節點的資訊通過list-watch機制從api-server獲取,同時每次排程完成後,又將結果同步到schedulerCache物件的nodeinfos中。在排程演算法進行時,再將schedulerCache物件nodeinfos中的資訊同步到genericScheduler物件的cachedNodeInfoMap中。
Node資訊的建立包括如下幾個關鍵步驟:
(1) 建立Informer和Lister
在createScheduler -> factory.NewConfigFactory函式中,建立了nodeInformer和nodeLister物件,具體的程式碼如下:
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs {
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
0,
)
c.nodeLister = nodeInformer.Lister()
(2) 在schedulercache物件中建立nodeinfo物件
// schedulerCache is the scheduler's internal, mutex-guarded view of
// pods and nodes. It is populated by informer callbacks (AddNode etc.)
// and by AssumePod when a scheduling decision is made optimistically.
type schedulerCache struct {
// stop signals background goroutines of the cache to exit.
stop <-chan struct{}
// ttl is how long an assumed pod is kept before it expires.
ttl time.Duration
// period is the interval of the cache's periodic cleanup.
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.Mutex
// a set of assumed pod keys.
// The key could further be used to get an entry in podStates.
assumedPods map[string]bool
// a map from pod key to podState.
podStates map[string]*podState
// nodes maps node name to its aggregated NodeInfo (pods, requested
// resources, generation counter used for incremental snapshotting).
nodes map[string]*NodeInfo
}
(3) 在genericScheduler物件中建立cachenodeinfo物件
// genericScheduler is the default scheduling algorithm implementation.
// It filters nodes with predicates, ranks them with prioritizers, and
// keeps a per-scheduler snapshot of node info in cachedNodeInfoMap.
type genericScheduler struct {
// cache is the shared scheduler cache the snapshot is taken from.
cache schedulercache.Cache
// predicates are the node-filtering functions, keyed by name.
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.MetadataProducer
// prioritizers rank the nodes that passed all predicates.
prioritizers []algorithm.PriorityConfig
// extenders are external scheduler extensions consulted after the
// built-in predicates/priorities.
extenders []algorithm.SchedulerExtender
pods algorithm.PodLister
// lastNodeIndexLock guards lastNodeIndex, used for round-robin
// tie-breaking among equally scored nodes.
lastNodeIndexLock sync.Mutex
lastNodeIndex uint64
// cachedNodeInfoMap is this scheduler's snapshot of cache.nodes,
// refreshed by UpdateNodeNameToInfoMap before each scheduling cycle.
cachedNodeInfoMap map[string]*schedulercache.NodeInfo
equivalenceCache *EquivalenceCache
}
二、Kubernetes Scheduler中Node資訊同步
在scheduleOne中每排程一個pod,則呼叫AssumePod函式,將Pod的資訊同步到schedulerCache中,在同步pod資訊的同時,同步修改對應的node資訊。
// AssumePod records pod in the cache as an assumed (optimistically
// scheduled) pod and folds its resource requests into the NodeInfo of
// the node it was bound to. Returns an error if the pod cannot be
// keyed or has already been assumed.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := getPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	// A pod may be assumed only once; an existing state entry means the
	// caller violated that contract.
	if _, exists := cache.podStates[key]; exists {
		return fmt.Errorf("pod %v state wasn't initial but get assumed", key)
	}

	cache.addPod(pod)
	cache.podStates[key] = &podState{pod: pod}
	cache.assumedPods[key] = true
	return nil
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.addPod(pod)
}
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *v1.Pod) {
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory
n.requestedResource.NvidiaGPU += res.NvidiaGPU
if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 {
n.requestedResource.OpaqueIntResources = map[v1.ResourceName]int64{}
}
for rName, rQuant := range res.OpaqueIntResources {
n.requestedResource.OpaqueIntResources[rName] += rQuant
}
n.nonzeroRequest.MilliCPU += non0_cpu
n.nonzeroRequest.Memory += non0_mem
n.pods = append(n.pods, pod)
if hasPodAffinityConstraints(pod) {
n.podsWithAffinity = append(n.podsWithAffinity, pod)
}
n.generation++
}
在Informer監聽到有node資訊變化時,會呼叫在初始化時傳入的對應的函式,具體的函式如下:
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
// addNodeToCache is the informer AddFunc: it forwards a newly observed
// node into the scheduler cache, logging (not failing) on errors.
func (c *ConfigFactory) addNodeToCache(obj interface{}) {
	node, isNode := obj.(*v1.Node)
	if !isNode {
		glog.Errorf("cannot convert to *v1.Node: %v", obj)
		return
	}
	err := c.schedulerCache.AddNode(node)
	if err != nil {
		glog.Errorf("scheduler cache AddNode failed: %v", err)
	}
}
// AddNode registers (or refreshes) a node in the cache, attaching the
// node object to its NodeInfo entry. The entry may already exist if
// pods were assumed onto the node before the node event arrived.
func (cache *schedulerCache) AddNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	info, known := cache.nodes[node.Name]
	if !known {
		info = NewNodeInfo()
		cache.nodes[node.Name] = info
	}
	return info.SetNode(node)
}
三、Kubernetes Scheduler中Node資訊在排程演算法中的使用
Kubernetes Scheduler中每次排程時,輸入pod的資訊和node節點的資訊。其中node節點的資訊來源於兩部分:一部分來源於nodeLister,另外一部分來源於schedulerCache中快取的nodeinfo資訊。
具體的流程如下
(1)通過nodeLister獲取node的名稱資訊
dest, err := s.config.Algorithm.Schedule(pod, s.config.NodeLister)
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
if err != nil {
return "", err
}
fmt.Printf("[%s] is fit, Unschedulable:%v ,\n",nodeName,nodeNameToInfo[nodeName].Node().Spec.Unschedulable)
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
(2)通過同步schedulerCache中快取的nodeinfo資訊作為排程演算法中node資訊的輸入
同步schedulerCache中快取的nodeinfo資訊,在genericScheduler物件的Schedule函式中:
// Used for all fit and priority funcs.
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
if err != nil {
return "", err
}
// UpdateNodeNameToInfoMap brings the caller-owned snapshot map in line
// with the cache: entries that are missing or whose generation is
// stale are re-cloned, and entries for nodes no longer in the cache
// are removed.
func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	// Refresh missing or out-of-date entries; the generation counter
	// lets unchanged nodes skip the Clone.
	for name, info := range cache.nodes {
		existing, present := nodeNameToInfo[name]
		if !present || existing.generation != info.generation {
			nodeNameToInfo[name] = info.Clone()
		}
	}

	// Evict snapshot entries for nodes that were deleted.
	for name := range nodeNameToInfo {
		if _, stillCached := cache.nodes[name]; !stillCached {
			delete(nodeNameToInfo, name)
		}
	}
	return nil
}
在排程演算法中使用:
trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
if err != nil {
return "", err
}
fmt.Printf("[%s] is fit, Unschedulable:%v ,\n",nodeName,nodeNameToInfo[nodeName].Node().Spec.Unschedulable)
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
if err != nil {
predicateResultLock.Lock()
errs = append(errs, err)
predicateResultLock.Unlock()
return
}
這就是Kubernetes中Scheduler模組Node資訊管理的基本流程,主要涉及node資訊的同步和node資訊在排程中的使用。