[原始碼分析-kubernetes]6. 優選過程
優選過程
走近priority過程
!FILENAME pkg/scheduler/core/generic_scheduler.go:186
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
今天的分析從這行程式碼開始。
PrioritizeNodes
要做的事情是給已經通過predicate的nodes賦上一個分值,從而抉出一個最優node用於運行當前pod. 第一次看priority可能會一臉蒙,和predicate中的邏輯不太一樣;大夥得耐下性子多思考,實在有障礙也可以先不求甚解,整體過完後再二刷程式碼,再不行三刷,總會大徹大悟的!
從註釋中可以找到關於PrioritizeNodes的原理(pkg/scheduler/core/generic_scheduler.go:624
):
- PrioritizeNodes通過併發呼叫一個個priority函式來給node排優先順序。每一個priority函式會給一個1-10之間的分值,0最低10最高。
- 每一個priority函式可以有自己的權重,單個函式返回的分值*權重後得到一個加權分值,最終所有的加權分值加在一起就是這個node的最終分值。
然後我們先函式簽名入手:
!FILENAME pkg/scheduler/core/generic_scheduler.go:624
func PrioritizeNodes( pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, meta interface{}, priorityConfigs []algorithm.PriorityConfig, nodes []*v1.Node, extenders []algorithm.SchedulerExtender, ) (schedulerapi.HostPriorityList, error)
形參定義和返回值:
pod *v1.Pod*
// pod就不用說了;*nodeNameToInfo map[string]*schedulercache.NodeInfo
// 這個也不需要講,字面意思代表一切;meta interface{}
// 和predicate裡的meta不太一樣,下面會貼個debug的圖先,具體後面再看;priorityConfigs []algorithm.PriorityConfig
// 包含優選演算法各種資訊,比較重要;nodes []*v1.Node
// node集合,不需要解釋了;extenders []algorithm.SchedulerExtender
meta實參長這個樣子:
返回值只需要看一下schedulerapi.HostPriorityList
型別的含義了,這個型別之前也提過,後面頻繁涉及到操作這個結構,所以這裡再貼一次,大夥得爛熟於心才行!
!FILENAME pkg/scheduler/api/types.go:305
type HostPriority struct {
Host string
Score int
}
type HostPriorityList []HostPriority
著重分析一下這2個type,雖然很簡單,還是有必要囉嗦一下,必須記在心裡。HostPriority這個struct的屬性是Host和Score,一個是string一個是int,所以很明顯HostPriority所能夠儲存的資訊是一個節點的名字和分值,再仔細一點說就是這個結構儲存的是一個node在一個priority演算法計算後所得到的結果;然後看HostPriorityList型別,這個型別是上一個型別的“集合”,集合表達的是一個node多個演算法還是多個node一個演算法呢?稍微思考一下可以知道HostPriorityList中存的是多個Host和Score的組合,所以HostPriorityList這個結構是要儲存一個演算法作用於所有node之後,得到的所有node的Score資訊的。(這裡我們先理解成一個演算法的結果,作為函式返回值這裡肯定是要保留所有演算法作用後的最終node的Score,所以函式後半部分肯定有combine分值的步驟。)
PrioritizeNodes整體流程
前面說到PrioritizeNodes()
函式也就是node優選的具體邏輯,這個函式略長,我們分段講解。
Results
PrioritizeNodes()函式開頭的邏輯很簡單,我們先從第一行看到results定義的這一行。
!FILENAME pkg/scheduler/core/generic_scheduler.go:634
if len(priorityConfigs) == 0 && len(extenders) == 0 {
// 這個if很明顯是處理特殊場景的,就是優選演算法一個都沒有配置(extenders同樣沒有配置)的時候怎麼做;
// 這個result是要當作返回值的,HostPriorityList型別前面嘮叨了很多了,大家得心裡有數;
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
// 這一行程式碼是唯一的“邏輯了”,下面直到for結束都是簡單程式碼;所以我們看一下EqualPriorityMap
// 函式的作用就行了。這裡我不貼程式碼,這個函式很短,作用就是設定每個node的Score相同(都為1)
// hostPriority的型別也就是schedulerapi.HostPriority型別,再次強調這個型別是要爛熟於心的;
hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
if err != nil {
return nil, err
}
// 最終的result也就是設定了每個node的Score為1的schedulerapi.HostPriorityList型別資料;
result = append(result, hostPriority)
}
return result, nil
}
// 這裡只是簡單定義3個變數,一把鎖,一個併發等待相關的wg,一個錯誤集合errs;
var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
errs []error
)
// 這裡定義了一個appendError小函式,邏輯很簡單,併發場景下將錯誤資訊收集到errs中;
appendError := func(err error) {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
// 最後一個變數results也不難理解,型別是[]schedulerapi.HostPriorityList,這裡需要注意這個型別
// 的作用,它儲存的是所有演算法作用所有node之後得到的結果集,相當於一個二維陣列,每個格子是1個演算法
// 作用於1個節點的結果,一行也就是1個演算法作用於所有節點的結果;一行展成一個二維就是所有演算法作用於所有節點;
results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
到這裡要求大家心中能夠想象上面提到的results是什麼樣的,可以藉助紙筆畫一畫。下面的程式碼會往這個二維結構裡面儲存資料。
Old Priority Function
我們既然講到“老式”,後面肯定有對應的“新式”。雖然這種函式已經DEPRECATED了,不過對於我們學習掌握優選流程還是很有幫助的。預設的優選演算法裡其實也只有1個是這在old形式的了:
貼這塊程式碼之前我們先關注一下多次出現的priorityConfigs
這個變數的型別:
函式形參中有寫到:priorityConfigs []algorithm.PriorityConfig
,所以我們直接看PriorityConfig是什麼型別:
!FILENAME pkg/scheduler/algorithm/types.go:62
// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
Name string
Map PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight int
}
PriorityConfig中有一個Name,一個Weight,很好猜到意思,名字和權重嘛。剩下的Map、Reduce和Function目測代表的就是優選函式的新舊兩種表達方式了。我們先看舊的Function屬性的型別PriorityFunction是什麼:
!FILENAME pkg/scheduler/algorithm/types.go:59
type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error)
很明顯這個型別代表了一個priority函式,入參是pod、nodeNameToInfo和nodes,返回值是HostPriorityList,也就是我們前面提到的1個priority函式作用於每個node後得到了Score資訊,存結果的結構就是這個HostPriorityList;看起來很和諧~
然後講回PrioritizeNodes過程:
!FILENAME pkg/scheduler/core/generic_scheduler.go:661
for i := range priorityConfigs {
// 如果第i個優選配置(priorityConfig)定義了老函式,則呼叫之;
if priorityConfigs[i].Function != nil {
wg.Add(1)
// 注意這裡的引數index,這裡傳入的實參是上面的i;
go func(index int) {
defer wg.Done()
var err error
// 所以這裡的results[index]就好理解了;後面priorityConfigs[index]的索引也是index,
// 這裡表達的是第N個優選配置裡有Function,那麼這個Function的計算結果儲存在
// results的第N個格子裡;
results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
if err != nil {
appendError(err)
}
}(i)
} else {
// 如果沒有定義Function,其實也就是使用了Map-Reduce方式的,這裡先存個空的結構佔位;
results[i] = make(schedulerapi.HostPriorityList, len(nodes))
}
}
上面這段程式碼邏輯還算好理解,唯一有點小繞的還是前面強調的HostPriorityList相關型別的操作上。
Map-Reduce
關於map-reduce思想我就不在這裡贅述了,大資料行業很流行的一個詞彙,百度一下(如果你能夠google,自然更好咯)可以找到一大堆介紹的文章。
簡單說map-reduce就是:Map是對映,Reduce是歸約;map是統計一本書中的一頁出現了多少次k8s這個詞,reduce是將這些map結果彙總在一起得到最終結果。(map一般都是將一個演算法作用於一堆資料集的每一個元素,得到一個結果集,reduce有各種形式,可以是累加這些結果,或者是對這個結果集做其他複雜的f(x)操作。
看看在Scheduler裡面是怎麼用Map-Reduce的吧:
// 這個併發邏輯之前介紹過了,我們直接看ParallelizeUntil的最後一個引數就行,這裡直接寫了一個匿名函式;
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
// 這裡的index是[0,len(nodes)-1],相當於遍歷所有的nodes;
nodeInfo := nodeNameToInfo[nodes[index].Name]
// 這個for迴圈遍歷的是所有的優選配置,如果有老Fun就跳過,新邏輯就繼續;
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
// 因為前面old已經執行過了
continue
}
var err error
// 這裡的i和前面老Fun的互補,老Fun中沒有賦值的results中的元素就在這裡賦值了;
// 注意到這裡呼叫了一個Map函式就直接賦值給了results[i][index],這裡的index是第一行這個
// 匿名函式的形參,通過ParallelizeUntil這個併發實現所有node對應一個優選演算法的分值計算;
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
results[i][index].Host = nodes[index].Name
}
}
})
for i := range priorityConfigs {
// 沒有定義Reduce函式就不處理;
if priorityConfigs[i].Reduce == nil {
continue
}
wg.Add(1)
go func(index int) {
defer wg.Done()
// 呼叫Reduce函式
if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
}
}
}(i)
}
// Wait for all computations to be finished.
wg.Wait()
if len(errs) != 0 {
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}
看到這裡我們可以發現老Fun和Map的區別不大,都是優選函式的執行過程。那為什麼會存在兩種形式呢?我們看完PrioritizeNodes整體流程後通過具體的Fun和Map-Reduce實現來看二者的區別。
Combine Scores
這塊的程式碼很簡單,我們先拋開extender的邏輯,剩下的程式碼如下:
// Summarize all scores.
// 這個result和前面的results類似,result用於儲存每個node的Score,到這裡已經沒有必要區分演算法了;
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
// 迴圈執行len(nodes)次
for i := range nodes {
// 先在result中塞滿所有node的Name,Score初始化為0;
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
// 執行了多少個priorityConfig就有多少個Score,所以這裡遍歷len(priorityConfigs)次;
for j := range priorityConfigs {
// 每個演算法對應第i個node的結果分值加權後累加;
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
}
}
return result, nil
這塊邏輯很清晰,要將前面得到的二維結果results壓縮成一維的加權分值集合result,最終返回這個result.
從這裡我們還可以得到一個結論,不管是Fun還是Map-Reduce,處理的結果都是填充results這個二維結構,所以Map-Reduce也沒有什麼神祕的,下面通過具體的演算法來看二者有何異同。
Fun和Map-Reduce例項分析
InterPodAffinityPriority(Function)
這個演算法做的是Pod間親和性優選,也就是親和pod越多的節點分值越高,反親和pod越多分值越低。
我們撇開具體的親和性計算規則,從優選函式的形式上看一下這段程式碼的邏輯:
!FILENAME pkg/scheduler/algorithm/priorities/interpod_affinity.go:119
func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
affinity := pod.Spec.Affinity
// 是否有親和性約束;
hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
// 是否有反親和性約束;
hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
// 這裡有一段根據親和性和反親和性來計算一個node上匹配的pod數量的邏輯,我們先跳過這些邏輯,從優選演算法實現的角度看這個演算法的架子;
// 當遍歷完所有的node之後,可以得到1個最高分和1個最低分,分別記為maxCount和minCount;
for _, node := range nodes {
if pm.counts[node.Name] > maxCount {
maxCount = pm.counts[node.Name]
}
if pm.counts[node.Name] < minCount {
minCount = pm.counts[node.Name]
}
}
// 這個result型別和前面看到的一樣,都是儲存單個演算法的計算結果的;
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for _, node := range nodes {
fScore := float64(0)
// 如果分差大於0,也就是說不是所有的node都一樣的情況,需要對分值做一個處理;
if (maxCount - minCount) > 0 {
// MaxPriority定義的是優選最高分10,第二個因數是當前node的count-最小count,
// 然後除以(maxCount - minCount);舉個例子,當前node的計算結果是5,最大count是20,最小
// count是-3,那麼這裡就是10*[5-(-3)/20-(-3)]
// 這個計算的結果顯然會在[0-10]之間;
fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount))
}
// 如果分差不大於0,這時候int(fScore)也就是0,對於各個node的結果都是0;
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
}
return result, nil
}
如上,我們可以發現最終這個函式計算出了每個node的分值,這個分值在[0-10]之間。所以說到底Function做的事情就是根據一定的規則給每個node賦一個分值,這個分值要求在[0-10]之間,然後把這個HostPriorityList
返回就行。
CalculateNodeAffinityPriorityMap(Map)
這個演算法和上一個類似,上一個是Pod的Affinity,這個是Node的Affinity;我們來看程式碼:
!FILENAME pkg/scheduler/algorithm/priorities/node_affinity.go:34
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
// default is the podspec.
affinity := pod.Spec.Affinity
if priorityMeta, ok := meta.(*priorityMetadata); ok {
// We were able to parse metadata, use affinity from there.
affinity = priorityMeta.affinity
}
var count int32
if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
if preferredSchedulingTerm.Weight == 0 {
continue
}
nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
if err != nil {
return schedulerapi.HostPriority{}, err
}
if nodeSelector.Matches(labels.Set(node.Labels)) {
count += preferredSchedulingTerm.Weight
}
}
}
return schedulerapi.HostPriority{
Host: node.Name,
Score: int(count),
}, nil
}
撇開具體的親和性計算細節,我們可以發現這個的count沒有特定的規則,可能會加到10以上;另外這裡的返回值是HostPriority
型別,前面的Function返回了HostPriorityList
型別。
map函式
!FILENAME pkg/scheduler/algorithm/priorities/selector_spreading.go:221
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
var firstServiceSelector labels.Selector
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
priorityMeta, ok := meta.(*priorityMetadata)
if ok {
firstServiceSelector = priorityMeta.podFirstServiceSelector
} else {
firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
}
// 查詢給定node在給定namespace下符合selector的pod,返回值是[]*v1.Pod
matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)
return schedulerapi.HostPriority{
Host: node.Name,
// 返回值中Score設定成上面找到的pod的數量
Score: int(len(matchedPodsOfNode)),
}, nil
}
這個函式比較短,可以看到在指定node上查詢到匹配selector的pod越多,分值就越高。假設找到了20個,那麼這裡的分值就是20;假設找到的是2,那這裡的分值就是2.
CalculateNodeAffinityPriorityReduce(Reduce)
和上面這個Map對應的Reduce函式其實沒有單獨實現,通過NormalizeReduce
函式做了一個通用的Reduce處理:
!FILENAME pkg/scheduler/algorithm/priorities/node_affinity.go:77
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)
!FILENAME pkg/scheduler/algorithm/priorities/reduce.go:29
func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {
return func(
_ *v1.Pod,
_ interface{},
_ map[string]*schedulercache.NodeInfo,
// 注意到這個result是HostPriorityList,對應1個演算法N個node的結果集
result schedulerapi.HostPriorityList) error {
var maxCount int
// 遍歷result將最高的Score賦值給maxCount;
for i := range result {
if result[i].Score > maxCount {
maxCount = result[i].Score
}
}
if maxCount == 0 {
if reverse {
for i := range result {
result[i].Score = maxPriority
}
}
return nil
}
for i := range result {
score := result[i].Score
// 舉個例子:10*(5/20)
score = maxPriority * score / maxCount
if reverse {
// 如果score是3,得到7;如果score是4,得到6,結果反轉;
score = maxPriority - score
}
result[i].Score = score
}
return nil
}
}
小結
- Function:一個演算法一次性計算出所有node的Score,這個Score的範圍是規定的[0-10];
- Map-Reduce:一個Map演算法計算1個node的Score,這個Score可以靈活處理,可能是20,可能是-3;Map過程併發進行;最終得到的結果result通過Reduce歸約,將這個演算法對應的所有node的分值歸約為[0-10];
本節有幾張圖是goland debug的截圖,我們目前還沒有提到如何debug;不過本節內容的閱讀基本是不影響的。下一節原始碼分析內容發出來前我會在“環境準備”這一章中增加如何開始debug的內容,大家可以選擇開始debug的時機。
引用連結:
gitbook:https://farmer-hutao.github.io/k8s-source-code-analysis/
github:https://hub.fastgit.org/daniel-hutao/k8s-source-code-analysis