1. 程式人生 > 其它 >[原始碼分析-kubernetes]5. 預選過程

[原始碼分析-kubernetes]5. 預選過程

預選過程

預選流程

predicate過程從pkg/scheduler/core/generic_scheduler.go:389 findNodesThatFit()方法就算正式開始了,這個方法根據給定的predicate functions過濾所有的nodes來尋找一堆可以跑pod的node集。老規矩,我們來看主幹程式碼:

!FILENAME pkg/scheduler/core/generic_scheduler.go:389

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
	checkNode := func(i int) {
		fits, failedPredicates, err := podFitsOnNode(
			//……
		)
		if fits {
			length := atomic.AddInt32(&filteredLen, 1)
			filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
		} 
	}
	workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			// Logic of extenders 
		}
	}
	return filtered, failedPredicateMap, nil
}

如上,刪的有點多,大家也可以看一下原函式然後對比一下,看看我為什麼只保留這一點。從上面程式碼中我們可以發現,最重要的是一個子函式呼叫過程fits, failedPredicates, err := podFitsOnNode(),這個函式的引數我沒有貼出來,下面會詳細講;下半部分是一個extender過程,extender不影響對predicate過程的理解,我們後面專門當作一個主題講。所以這裡的關注點是podFitsOnNode()函式。

predicate的併發

進入podFitsOnNode()函式邏輯之前,我們先看一下呼叫到podFitsOnNode()函式的匿名函式變數checkNode

是怎麼被呼叫的:

!FILENAME pkg/scheduler/core/generic_scheduler.go:458

workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

ParallelizeUntil()函式是用於並行執行N個獨立的工作過程的,這個邏輯寫的挺有意思,我們看一下完整的程式碼(這段的分析思路寫到註釋裡哦):

!FILENAME vendor/k8s.io/client-go/util/workqueue/parallelizer.go:38

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
    // 從形參列表看,需要關注的有workers和pieces兩個數字型別的引數,doworkPiece這個函式型別的引數
    // DoWorkPieceFunc型別也就是func(piece int)型別
    // 注意到上面呼叫的時候workers的實參是16,pieces是allNodes,也就是node數量
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}
	// 這裡定義toProcess的容量和pieces相等,也就是和node數量相等
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
        // 假設有100個node,那麼這裡就寫了100個數到toProcess裡
		toProcess <- i
	}
    // 關閉了一個有快取的channel
	close(toProcess)
	// 如果pieces數量比較少,也就是說假設node只有10個,那麼workers就賦值為10個
    // 到這裡差不多可以猜到worker是併發工作數,當node大於16時併發是16,當node小於16時併發數就是node數
	if pieces < workers {
		workers = pieces
	}

	wg := sync.WaitGroup{}
	wg.Add(workers)
    // 要批量開goroutine了
	for i := 0; i < workers; i++ {
        // 如果100個node,這裡時16;如果是10個node,這裡是10
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
                // 從toProcess中拿一個數,舉個例子,假如現在併發是10,那麼toProcess裡面存的資料其實
                // 也是10個,也就是1個goroutine拿到1個數,開始了一個下面的default邏輯;
                // 假設併發數是16,node數是100,這時候toProcess裡面也就是100個數,
                // 這時候就是16個“消費者”在消耗100個數。當然每拿到一個數需要執行到一次下面的default
				select {
				case <-stop:
					return
				default:
                    // 第piece個節點被doWorkPiece了;
                    // 對應呼叫過程也就是checkNode函式傳入了一個整型引數piece
					doWorkPiece(piece)
				}
			}
		}()
	}
	wg.Wait()
}

回想一下前面的checkNode := func(i int){……},上面的doWorkPiece(piece)也就是呼叫到了這裡的這個匿名函式func(i int){……};到這裡就清楚如何實現併發執行多個node的predicate過程了。

一個node的predicate

checkNode的主要邏輯就是上面介紹的併發加上下面這個podFitsOnNode()函式邏輯:

!FILENAME pkg/scheduler/core/generic_scheduler.go:425

fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				g.cachedNodeInfoMap[nodeName],
				g.predicates,
				nodeCache,
				g.schedulingQueue,
				g.alwaysCheckAllPredicates,
				equivClass,
			)

我們從podFitsOnNode()的函式定義入手:

!FILENAME pkg/scheduler/core/generic_scheduler.go:537

func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	nodeCache *equivalence.NodeCache,
	queue internalqueue.SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) 

關於這個函式的邏輯,註釋裡的描述翻譯過來大概是這個意思:

podFitsOnNode()函式檢查一個通過NodeInfo形式給定的node是否滿足指定的predicate functions. 對於給定的一個Pod,podFitsOnNode()函式會檢查是否有某個“等價的pod”存在,然後重用那個等價pod快取的predicate結果。
這個函式的呼叫入口有2處: Schedule and Preempt.

  1. 當從Schedule進入時:這個函式想要測試node上所有已經存在的pod外加被指定將要排程到這個node上的其他所有高優先順序(優先順序不比自己低,也就是>=)的pod後,當前pod是否可以被排程到這個node上。
  2. 當從Preempt進入時:後面講preempt時再詳細分析。

podFitsOnNode()函式的引數有點多,每個跟進去就是一堆知識點。這裡建議大家從字面先過一邊,然後跟進去看一下型別定義,型別的註釋等,瞭解一下功能,先不深究。整體看完一邊排程器程式碼後回過頭深入細節。

我們一起看一下其中這個引數:predicateFuncs map[string]algorithm.FitPredicate;這裡的predicateFuncs是一個map,表示所有的predicate函式。這個map的key是個字串,也就是某種形式的name了;value型別跟進去看一下:

!FILENAME pkg/scheduler/algorithm/types.go:36

// FitPredicate is a function that indicates if a pod fits into an existing node.
// The failure information is given by the error.
type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)

FitPredicate是一個函式型別,3個引數,pod和node都很好理解,meta跟進去簡單看一下可以發現定義的是一些和predicate相關的一些元資料,這些資料是根據pod和node資訊獲取到的,類似pod的埠有哪些,pod親和的pod列表等。返回值是一個表示是否fit的bool值,predicate失敗的原因列表,一個錯誤型別。

也就是說,FitPredicate這個函式型別也就是前面一直說的predicate functions的真面目了。下面看podFitsOnNode()函式的具體邏輯吧:

!FILENAME pkg/scheduler/core/generic_scheduler.go:537

func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	nodeCache *equivalence.NodeCache,
	queue internalqueue.SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
	podsAdded := false
	for i := 0; i < 2; i++ {
		metaToUse := meta
		nodeInfoToUse := info
		if i == 0 {
			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
		} else if !podsAdded || len(failedPredicates) != 0 {
			break
		}
		eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
		// 這裡省略一個for迴圈,下面會單獨講
	}

	return len(failedPredicates) == 0, failedPredicates, nil
}

這裡的邏輯是從一個for迴圈開始的,關於這個2次迴圈的含義程式碼裡有很長的一段註釋,我們先看一下注釋裡怎麼說的(這裡可以多看幾遍體會一下):

引用連結:

gitbook:https://farmer-hutao.github.io/k8s-source-code-analysis/
github:https://hub.fastgit.org/daniel-hutao/k8s-source-code-analysis

  • 出於某些原因考慮我們需要執行兩次predicate. 如果node上有更高或者相同優先順序的“指定pods”(這裡的“指定pods”指的是通過schedule計算後指定要跑在一個node上但是還未真正執行到那個node上的pods),我們將這些pods加入到meta和nodeInfo後執行一次計算過程。
  • 如果這個過程所有的predicates都成功了,我們再假設這些“指定pods”不會跑到node上再執行一次。第二次計算是必須的,因為有一些predicates比如pod親和性,也許在“指定pods”沒有成功跑到node的情況下會不滿足。
  • 如果沒有“指定pods”或者第一次計算過程失敗了,那麼第二次計算不會進行。
  • 我們在第一次排程的時候只考慮相等或者更高優先順序的pods,因為這些pod是當前pod必須“臣服”的,也就是說不能夠從這些pod中搶到資源,這些pod不會被當前pod“搶佔”;這樣當前pod也就能夠安心從低優先順序的pod手裡搶資源了。
  • 新pod在上述2種情況下都可排程基於一個保守的假設:資源和pod反親和性等的predicate在“指定pods”被處理為Running時更容易失敗;pod親和性在“指定pods”被處理為Not Running時更加容易失敗。
  • 我們不能假設“指定pods”是Running的因為它們當前還沒有執行,而且事實上,它們確實有可能最終又被排程到其他node上了。

看了這個註釋後,上面程式碼裡的前幾行就很好理解了,在第一次進入迴圈體和第二次進入時做了不同的處理,具體怎麼做的處理我們暫時不關注。下面看省略的這個for迴圈做了啥:

!FILENAME pkg/scheduler/core/generic_scheduler.go:583

// predicates.Ordering()得到的是一個[]string,predicate名字集合
for predicateID, predicateKey := range predicates.Ordering() {
	var (
		fit     bool
		reasons []algorithm.PredicateFailureReason
		err     error
	)
	// 如果predicateFuncs有這個key,則呼叫這個predicate;也就是說predicateFuncs如果定義了一堆亂七八遭的名字,會被忽略調,因為predicateKey是內建的。
	if predicate, exist := predicateFuncs[predicateKey]; exist {
        // 降低難度,先不看快取情況。
		if eCacheAvailable {
			fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
		} else {
            // 真正呼叫predicate函數了!!!!!!!!!
			fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
		}
		if err != nil {
			return false, []algorithm.PredicateFailureReason{}, err
		}
		if !fit {
			// ……
		}
	}
}

如上,我們看一下2個地方:

  1. predicates.Ordering()
  2. fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)

分兩個小節吧~

predicates的順序

!FILENAME pkg/scheduler/algorithm/predicates/predicates.go:130

var (
   predicatesOrdering = []string{
       CheckNodeConditionPred, 
       CheckNodeUnschedulablePred,
       GeneralPred, 
       HostNamePred, 
       PodFitsHostPortsPred,
       MatchNodeSelectorPred, 
       PodFitsResourcesPred, 
       NoDiskConflictPred,
       PodToleratesNodeTaintsPred, 
       PodToleratesNodeNoExecuteTaintsPred, 
       CheckNodeLabelPresencePred,
       CheckServiceAffinityPred, 
       MaxEBSVolumeCountPred, 
       MaxGCEPDVolumeCountPred, 
       MaxCSIVolumeCountPred,
       MaxAzureDiskVolumeCountPred, 
       CheckVolumeBindingPred, 
       NoVolumeZoneConflictPred,
       CheckNodeMemoryPressurePred, 
       CheckNodePIDPressurePred, 
       CheckNodeDiskPressurePred, 
       MatchInterPodAffinityPred}
)

如上,這裡定義了一個次序,前面的for迴圈遍歷的是這個[]string,這樣也就實現了不管predicateFuncs裡定義了怎樣的順序,影響不了predicate的實際呼叫順序。官網對於這個順序有這樣一個表格解釋:

Position Predicate comments (note, justification...)
1 CheckNodeConditionPredicate we really don’t want to check predicates against unschedulable nodes.
2 PodFitsHost we check the pod.spec.nodeName.
3 PodFitsHostPorts we check ports asked on the spec.
4 PodMatchNodeSelector check node label after narrowing search.
5 PodFitsResources this one comes here since it’s not restrictive enough as we do not try to match values but ranges.
6 NoDiskConflict Following the resource predicate, we check disk
7 PodToleratesNodeTaints check toleration here, as node might have toleration
8 PodToleratesNodeNoExecuteTaints check toleration here, as node might have toleration
9 CheckNodeLabelPresence labels are easy to check, so this one goes before
10 checkServiceAffinity -
11 MaxPDVolumeCountPredicate -
12 VolumeNodePredicate -
13 VolumeZonePredicate -
14 CheckNodeMemoryPressurePredicate doesn’t happen often
15 CheckNodeDiskPressurePredicate doesn’t happen often
16 InterPodAffinityMatches Most expensive predicate to compute

這個表格大家對著字面意思體會一下吧,基本還是可以聯想到意義的。

當然這個順序是可以被配置檔案覆蓋的,使用者可以使用類似這樣的配置:

{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
	{"name" : "PodFitsHostPorts", "order": 2},
	{"name" : "PodFitsResources", "order": 3},
	{"name" : "NoDiskConflict", "order": 5},
	{"name" : "PodToleratesNodeTaints", "order": 4},
	{"name" : "MatchNodeSelector", "order": 6},
	{"name" : "PodFitsHost", "order": 1}
	],
"priorities" : [
	{"name" : "LeastRequestedPriority", "weight" : 1},
	{"name" : "BalancedResourceAllocation", "weight" : 1},
	{"name" : "ServiceSpreadingPriority", "weight" : 1},
	{"name" : "EqualPriority", "weight" : 1}
	],
"hardPodAffinitySymmetricWeight" : 10
}

整體過完原始碼後我們再實際嘗試一下這些特性,這一邊先知道有這回事吧,ok,繼續~

單個predicate執行過程

fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)

這行程式碼其實沒有啥複雜邏輯,不過我們還是重複講一下,清晰理解這一行很有必要。這裡的predicate()來自前幾行的if語句predicate, exist := predicateFuncs[predicateKey],往前跟也就是FitPredicate型別,我們前面提過,型別定義在pkg/scheduler/algorithm/types.go:36,這個型別表示的是一個具體的predicate函式,這裡使用predicate()也就是一個函式呼叫的語法,很和諧了。

具體的predicate函式

一直在講predicate,那麼predicate函式到底長什麼樣子呢,我們從具體的實現函式找一個看一下。開始講design的時候提到過predicate的實現在pkg/scheduler/algorithm/predicates/predicates.go檔案中,先看一眼Structure吧:

這個檔案中predicate函式有點多,這樣看眼花,我們具體點開一個觀察一下:

!FILENAME pkg/scheduler/algorithm/predicates/predicates.go:277

func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	for _, v := range pod.Spec.Volumes {
		for _, ev := range nodeInfo.Pods() {
			if isVolumeConflict(v, ev) {
				return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil
			}
		}
	}
	return true, nil, nil
}

我們知道predicate函式的特點,這樣就很好在這個一千六百多行go檔案中尋找predicate函數了。像上面這個NoDiskConflict()函式,引數是pod、meta和nodeinfo,很明顯是FitPredicate型別的,標準的predicate函式。

這個函式的實現也特別簡單,遍歷pod的Volumes,然後對於pod的每一個Volume,遍歷node上的每個pod,看是否和當前podVolume衝突。如果不fit就返回false加原因;如果fit就返回true,很清晰。