1. 程式人生 > 其它 >[原始碼分析-kubernetes]6. 優選過程

[原始碼分析-kubernetes]6. 優選過程

優選過程

走近priority過程

!FILENAME pkg/scheduler/core/generic_scheduler.go:186

priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)

今天的分析從這行程式碼開始。

PrioritizeNodes要做的事情是給已經通過predicate的nodes賦上一個分值,從而抉出一個最優node用於運行當前pod. 第一次看priority可能會一臉蒙,和predicate中的邏輯不太一樣;大夥得耐下性子多思考,實在有障礙也可以先不求甚解,整體過完後再二刷程式碼,再不行三刷,總會大徹大悟的!

從註釋中可以找到關於PrioritizeNodes的原理(pkg/scheduler/core/generic_scheduler.go:624):

  • PrioritizeNodes通過併發呼叫一個個priority函式來給node排優先順序。每一個priority函式會給一個1-10之間的分值,0最低10最高。
  • 每一個priority函式可以有自己的權重,單個函式返回的分值*權重後得到一個加權分值,最終所有的加權分值加在一起就是這個node的最終分值。

然後我們先函式簽名入手:

!FILENAME pkg/scheduler/core/generic_scheduler.go:624

func PrioritizeNodes(
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	meta interface{},
	priorityConfigs []algorithm.PriorityConfig,
	nodes []*v1.Node,
	extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) 

形參定義和返回值:

  • pod *v1.Pod* // pod就不用說了;
  • *nodeNameToInfo map[string]*schedulercache.NodeInfo // 這個也不需要講,字面意思代表一切;
  • meta interface{} // 和predicate裡的meta不太一樣,下面會貼個debug的圖先,具體後面再看;
  • priorityConfigs []algorithm.PriorityConfig // 包含優選演算法各種資訊,比較重要;
  • nodes []*v1.Node // node集合,不需要解釋了;
  • extenders []algorithm.SchedulerExtender
    // extender邏輯放到後面單獨講。

meta實參長這個樣子:

返回值只需要看一下schedulerapi.HostPriorityList型別的含義了,這個型別之前也提過,後面頻繁涉及到操作這個結構,所以這裡再貼一次,大夥得爛熟於心才行!

!FILENAME pkg/scheduler/api/types.go:305

type HostPriority struct {
	Host string
	Score int
}
type HostPriorityList []HostPriority

著重分析一下這2個type,雖然很簡單,還是有必要囉嗦一下,必須記在心裡。HostPriority這個struct的屬性是HostScore,一個是string一個是int,所以很明顯HostPriority所能夠儲存的資訊是一個節點的名字和分值,再仔細一點說就是這個結構儲存的是一個node在一個priority演算法計算後所得到的結果;然後看HostPriorityList型別,這個型別是上一個型別的“集合”,集合表達的是一個node多個演算法還是多個node一個演算法呢?稍微思考一下可以知道HostPriorityList中存的是多個Host和Score的組合,所以HostPriorityList這個結構是要儲存一個演算法作用於所有node之後,得到的所有node的Score資訊的。(這裡我們先理解成一個演算法的結果,作為函式返回值這裡肯定是要保留所有演算法作用後的最終node的Score,所以函式後半部分肯定有combine分值的步驟。)

PrioritizeNodes整體流程

前面說到PrioritizeNodes()函式也就是node優選的具體邏輯,這個函式略長,我們分段講解。

Results

PrioritizeNodes()函式開頭的邏輯很簡單,我們先從第一行看到results定義的這一行。

!FILENAME pkg/scheduler/core/generic_scheduler.go:634

if len(priorityConfigs) == 0 && len(extenders) == 0 {
    // 這個if很明顯是處理特殊場景的,就是優選演算法一個都沒有配置(extenders同樣沒有配置)的時候怎麼做;
    // 這個result是要當作返回值的,HostPriorityList型別前面嘮叨了很多了,大家得心裡有數;
   result := make(schedulerapi.HostPriorityList, 0, len(nodes))
   for i := range nodes {
       // 這一行程式碼是唯一的“邏輯了”,下面直到for結束都是簡單程式碼;所以我們看一下EqualPriorityMap
       // 函式的作用就行了。這裡我不貼程式碼,這個函式很短,作用就是設定每個node的Score相同(都為1)
       // hostPriority的型別也就是schedulerapi.HostPriority型別,再次強調這個型別是要爛熟於心的;
      hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
      if err != nil {
         return nil, err
      }
       // 最終的result也就是設定了每個node的Score為1的schedulerapi.HostPriorityList型別資料;
      result = append(result, hostPriority)
   }
   return result, nil
}
// 這裡只是簡單定義3個變數,一把鎖,一個併發等待相關的wg,一個錯誤集合errs;
var (
   mu   = sync.Mutex{}
   wg   = sync.WaitGroup{}
   errs []error
)
// 這裡定義了一個appendError小函式,邏輯很簡單,併發場景下將錯誤資訊收集到errs中;
appendError := func(err error) {
   mu.Lock()
   defer mu.Unlock()
   errs = append(errs, err)
}
// 最後一個變數results也不難理解,型別是[]schedulerapi.HostPriorityList,這裡需要注意這個型別
// 的作用,它儲存的是所有演算法作用所有node之後得到的結果集,相當於一個二維陣列,每個格子是1個演算法
// 作用於1個節點的結果,一行也就是1個演算法作用於所有節點的結果;一行展成一個二維就是所有演算法作用於所有節點;
results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

到這裡要求大家心中能夠想象上面提到的results是什麼樣的,可以藉助紙筆畫一畫。下面的程式碼會往這個二維結構裡面儲存資料。

Old Priority Function

我們既然講到“老式”,後面肯定有對應的“新式”。雖然這種函式已經DEPRECATED了,不過對於我們學習掌握優選流程還是很有幫助的。預設的優選演算法裡其實也只有1個是這在old形式的了:

貼這塊程式碼之前我們先關注一下多次出現的priorityConfigs這個變數的型別:

函式形參中有寫到:priorityConfigs []algorithm.PriorityConfig,所以我們直接看PriorityConfig是什麼型別:

!FILENAME pkg/scheduler/algorithm/types.go:62

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
   Name   string
   Map    PriorityMapFunction
   Reduce PriorityReduceFunction
   // TODO: Remove it after migrating all functions to
   // Map-Reduce pattern.
   Function PriorityFunction
   Weight   int
}

PriorityConfig中有一個Name,一個Weight,很好猜到意思,名字和權重嘛。剩下的Map、Reduce和Function目測代表的就是優選函式的新舊兩種表達方式了。我們先看舊的Function屬性的型別PriorityFunction是什麼:

!FILENAME pkg/scheduler/algorithm/types.go:59

type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error)

很明顯這個型別代表了一個priority函式,入參是pod、nodeNameToInfo和nodes,返回值是HostPriorityList,也就是我們前面提到的1個priority函式作用於每個node後得到了Score資訊,存結果的結構就是這個HostPriorityList;看起來很和諧~

然後講回PrioritizeNodes過程:

!FILENAME pkg/scheduler/core/generic_scheduler.go:661

for i := range priorityConfigs {
    // 如果第i個優選配置(priorityConfig)定義了老函式,則呼叫之;
	if priorityConfigs[i].Function != nil {
		wg.Add(1)
        // 注意這裡的引數index,這裡傳入的實參是上面的i;
		go func(index int) {
			defer wg.Done()
			var err error
            // 所以這裡的results[index]就好理解了;後面priorityConfigs[index]的索引也是index,
            // 這裡表達的是第N個優選配置裡有Function,那麼這個Function的計算結果儲存在
            // results的第N個格子裡;
			results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
			if err != nil {
				appendError(err)
			}
		}(i)
	} else {
        // 如果沒有定義Function,其實也就是使用了Map-Reduce方式的,這裡先存個空的結構佔位;
		results[i] = make(schedulerapi.HostPriorityList, len(nodes))
	}
}

上面這段程式碼邏輯還算好理解,唯一有點小繞的還是前面強調的HostPriorityList相關型別的操作上。

Map-Reduce

關於map-reduce思想我就不在這裡贅述了,大資料行業很流行的一個詞彙,百度一下(如果你能夠google,自然更好咯)可以找到一大堆介紹的文章。

簡單說map-reduce就是:Map是對映,Reduce是歸約;map是統計一本書中的一頁出現了多少次k8s這個詞,reduce是將這些map結果彙總在一起得到最終結果。(map一般都是將一個演算法作用於一堆資料集的每一個元素,得到一個結果集,reduce有各種形式,可以是累加這些結果,或者是對這個結果集做其他複雜的f(x)操作。

看看在Scheduler裡面是怎麼用Map-Reduce的吧:

// 這個併發邏輯之前介紹過了,我們直接看ParallelizeUntil的最後一個引數就行,這裡直接寫了一個匿名函式;
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
    // 這裡的index是[0,len(nodes)-1],相當於遍歷所有的nodes;
   nodeInfo := nodeNameToInfo[nodes[index].Name]
    // 這個for迴圈遍歷的是所有的優選配置,如果有老Fun就跳過,新邏輯就繼續;
   for i := range priorityConfigs {
      if priorityConfigs[i].Function != nil {
          // 因為前面old已經執行過了
         continue
      }

      var err error
       // 這裡的i和前面老Fun的互補,老Fun中沒有賦值的results中的元素就在這裡賦值了;
       // 注意到這裡呼叫了一個Map函式就直接賦值給了results[i][index],這裡的index是第一行這個
       // 匿名函式的形參,通過ParallelizeUntil這個併發實現所有node對應一個優選演算法的分值計算;
      results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
      if err != nil {
         appendError(err)
         results[i][index].Host = nodes[index].Name
      }
   }
})

for i := range priorityConfigs {
    // 沒有定義Reduce函式就不處理;
   if priorityConfigs[i].Reduce == nil {
      continue
   }
   wg.Add(1)
   go func(index int) {
      defer wg.Done()
       // 呼叫Reduce函式
      if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
         appendError(err)
      }
      if klog.V(10) {
         for _, hostPriority := range results[index] {
            klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
         }
      }
   }(i)
}
// Wait for all computations to be finished.
wg.Wait()
if len(errs) != 0 {
   return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}

看到這裡我們可以發現老Fun和Map的區別不大,都是優選函式的執行過程。那為什麼會存在兩種形式呢?我們看完PrioritizeNodes整體流程後通過具體的Fun和Map-Reduce實現來看二者的區別。

Combine Scores

這塊的程式碼很簡單,我們先拋開extender的邏輯,剩下的程式碼如下:

// Summarize all scores.
// 這個result和前面的results類似,result用於儲存每個node的Score,到這裡已經沒有必要區分演算法了;
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
// 迴圈執行len(nodes)次
for i := range nodes {
    // 先在result中塞滿所有node的Name,Score初始化為0;
   result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
    // 執行了多少個priorityConfig就有多少個Score,所以這裡遍歷len(priorityConfigs)次;
   for j := range priorityConfigs {
       // 每個演算法對應第i個node的結果分值加權後累加;
      result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
   }
}

return result, nil

這塊邏輯很清晰,要將前面得到的二維結果results壓縮成一維的加權分值集合result,最終返回這個result.

從這裡我們還可以得到一個結論,不管是Fun還是Map-Reduce,處理的結果都是填充results這個二維結構,所以Map-Reduce也沒有什麼神祕的,下面通過具體的演算法來看二者有何異同。

Fun和Map-Reduce例項分析

InterPodAffinityPriority(Function)

這個演算法做的是Pod間親和性優選,也就是親和pod越多的節點分值越高,反親和pod越多分值越低。

我們撇開具體的親和性計算規則,從優選函式的形式上看一下這段程式碼的邏輯:

!FILENAME pkg/scheduler/algorithm/priorities/interpod_affinity.go:119

func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
	affinity := pod.Spec.Affinity
    // 是否有親和性約束;
	hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
    // 是否有反親和性約束;
	hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
	
    // 這裡有一段根據親和性和反親和性來計算一個node上匹配的pod數量的邏輯,我們先跳過這些邏輯,從優選演算法實現的角度看這個演算法的架子;
    
    // 當遍歷完所有的node之後,可以得到1個最高分和1個最低分,分別記為maxCount和minCount;
    for _, node := range nodes {
		if pm.counts[node.Name] > maxCount {
			maxCount = pm.counts[node.Name]
		}
		if pm.counts[node.Name] < minCount {
			minCount = pm.counts[node.Name]
		}
	}
	// 這個result型別和前面看到的一樣,都是儲存單個演算法的計算結果的;
	result := make(schedulerapi.HostPriorityList, 0, len(nodes))
	for _, node := range nodes {
		fScore := float64(0)
        // 如果分差大於0,也就是說不是所有的node都一樣的情況,需要對分值做一個處理;
		if (maxCount - minCount) > 0 {
            // MaxPriority定義的是優選最高分10,第二個因數是當前node的count-最小count,
            // 然後除以(maxCount - minCount);舉個例子,當前node的計算結果是5,最大count是20,最小
            // count是-3,那麼這裡就是10*[5-(-3)/20-(-3)]
            // 這個計算的結果顯然會在[0-10]之間;
			fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount))
		}
        // 如果分差不大於0,這時候int(fScore)也就是0,對於各個node的結果都是0;
		result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
	}
	return result, nil
}

如上,我們可以發現最終這個函式計算出了每個node的分值,這個分值在[0-10]之間。所以說到底Function做的事情就是根據一定的規則給每個node賦一個分值,這個分值要求在[0-10]之間,然後把這個HostPriorityList返回就行。

CalculateNodeAffinityPriorityMap(Map)

這個演算法和上一個類似,上一個是Pod的Affinity,這個是Node的Affinity;我們來看程式碼:

!FILENAME pkg/scheduler/algorithm/priorities/node_affinity.go:34

func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
	}

	// default is the podspec.
	affinity := pod.Spec.Affinity
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		// We were able to parse metadata, use affinity from there.
		affinity = priorityMeta.affinity
	}

	var count int32
	if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
		// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
		for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
			preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
			if preferredSchedulingTerm.Weight == 0 {
				continue
			}

			nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
			if err != nil {
				return schedulerapi.HostPriority{}, err
			}
			if nodeSelector.Matches(labels.Set(node.Labels)) {
				count += preferredSchedulingTerm.Weight
			}
		}
	}

	return schedulerapi.HostPriority{
		Host:  node.Name,
		Score: int(count),
	}, nil
}

撇開具體的親和性計算細節,我們可以發現這個的count沒有特定的規則,可能會加到10以上;另外這裡的返回值是HostPriority型別,前面的Function返回了HostPriorityList型別。

map函式

!FILENAME pkg/scheduler/algorithm/priorities/selector_spreading.go:221

func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
	var firstServiceSelector labels.Selector

	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
	}
	priorityMeta, ok := meta.(*priorityMetadata)
	if ok {
		firstServiceSelector = priorityMeta.podFirstServiceSelector
	} else {
		firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
	}
    // 查詢給定node在給定namespace下符合selector的pod,返回值是[]*v1.Pod
	matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)

	return schedulerapi.HostPriority{
		Host:  node.Name,
        // 返回值中Score設定成上面找到的pod的數量
		Score: int(len(matchedPodsOfNode)),
	}, nil
}

這個函式比較短,可以看到在指定node上查詢到匹配selector的pod越多,分值就越高。假設找到了20個,那麼這裡的分值就是20;假設找到的是2,那這裡的分值就是2.

CalculateNodeAffinityPriorityReduce(Reduce)

和上面這個Map對應的Reduce函式其實沒有單獨實現,通過NormalizeReduce函式做了一個通用的Reduce處理:

!FILENAME pkg/scheduler/algorithm/priorities/node_affinity.go:77

var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)

!FILENAME pkg/scheduler/algorithm/priorities/reduce.go:29

func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {
	return func(
		_ *v1.Pod,
		_ interface{},
		_ map[string]*schedulercache.NodeInfo,
        // 注意到這個result是HostPriorityList,對應1個演算法N個node的結果集
		result schedulerapi.HostPriorityList) error {

		var maxCount int
        // 遍歷result將最高的Score賦值給maxCount;
		for i := range result {
			if result[i].Score > maxCount {
				maxCount = result[i].Score
			}
		}

		if maxCount == 0 {
			if reverse {
				for i := range result {
					result[i].Score = maxPriority
				}
			}
			return nil
		}

		for i := range result {
			score := result[i].Score
            // 舉個例子:10*(5/20)
			score = maxPriority * score / maxCount
			if reverse {
                // 如果score是3,得到7;如果score是4,得到6,結果反轉;
				score = maxPriority - score
			}

			result[i].Score = score
		}
		return nil
	}
}

小結

  • Function:一個演算法一次性計算出所有node的Score,這個Score的範圍是規定的[0-10];
  • Map-Reduce:一個Map演算法計算1個node的Score,這個Score可以靈活處理,可能是20,可能是-3;Map過程併發進行;最終得到的結果result通過Reduce歸約,將這個演算法對應的所有node的分值歸約為[0-10];

本節有幾張圖是goland debug的截圖,我們目前還沒有提到如何debug;不過本節內容的閱讀基本是不影響的。下一節原始碼分析內容發出來前我會在“環境準備”這一章中增加如何開始debug的內容,大家可以選擇開始debug的時機。

引用連結:

gitbook:https://farmer-hutao.github.io/k8s-source-code-analysis/
github:https://hub.fastgit.org/daniel-hutao/k8s-source-code-analysis