1. 程式人生 > >圖解kubernetes排程器預選設計實現學習

圖解kubernetes排程器預選設計實現學習

Scheduler中在進行node選舉的時候會首先進行一輪預選流程,即從當前叢集中選擇一批node節點,本文主要分析k8s在預選流程上一些優秀的篩選設計思想,歡迎大佬們指正

1. 基礎設計

1.1 預選場景

預選顧名思義就是從當前叢集中的所有的node中,選擇出滿足當前pod資源和親和性等需求的node節點,如何在叢集中快速選擇這樣的節點,是個複雜的問題

1.2 平均分佈

平均分佈主要是通過讓一個分配索引來進行即只有當所有的node都在本輪分配週期內分配一次後,才開始從頭進行分配,從而保證叢集的平均分佈

1.3 預選中斷

預選終端即在預選的過程中如果發現node已經不能滿足當前pod資源需求的時候,就進行中斷預選流程,嘗試下一個節點

1.4 並行篩選

在當前k8s版本中,預設會啟動16個goroutine來進行並行的預選,從而提高效能,從而提高預選的效能

1.5 區域性最優解

預選流程需要從當前叢集中選擇一臺符合要求的node隨著叢集規模的增長,如果每次遍歷所有叢集node則會必然導致效能的下降,於是通過區域性最優解的方式,縮小篩選節點的數量

2. 原始碼分析

預選的核心流程是通過findNodesThatFit來完成,其返回預選結果供優選流程使用

2.1 取樣邏輯

取樣是通過當前叢集中的node數量和預設的最小值來決定本次預選階段需要獲取的node節點數量

        // 獲取所有的節點數量,並通過計算百分比,獲取本次選舉選擇的節點數量
        allNodes := int32(g.cache.NodeTree().NumNodes())
        // 確定要查詢node數量
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

2.2 取樣演算法

取樣演算法很簡單從叢集中獲取指定百分比的節點預設是50%,如果50%的節點數量小於minFeasibleNodesToFind則按照minFeasibleNodesToFind(最小取樣節點數量)來取樣,

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
    // 如果當前節點數量小於minFeasibleNodesToFind即小於100臺node
    // 同理百分比如果大於100就是全量取樣
    // 這兩種情況都直接遍歷整個叢集中所有節點
    if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
        return numAllNodes
    }

    adaptivePercentage := g.percentageOfNodesToScore
    if adaptivePercentage <= 0 {
        adaptivePercentage = schedulerapi.DefaultPercentageOfNodesToScore - numAllNodes/125
        if adaptivePercentage < minFeasibleNodesPercentageToFind {
            adaptivePercentage = minFeasibleNodesPercentageToFind
        }
    }

    // 正常取樣計算:比如numAllNodes為5000,而adaptivePercentage為50%
    // 則numNodes=50000*0.5/100=250
    numNodes = numAllNodes * adaptivePercentage / 100
    if numNodes < minFeasibleNodesToFind { // 如果小於最少取樣則按照最少取樣進行取樣
        return minFeasibleNodesToFind
    }

    return numNodes
}

2.3 取樣元資料準備

通過filtered來進行預選結果的儲存,通過filteredLen來進行原子保護協作多個取樣goroutine, 並通過predicateMetaProducer和當前的snapshot來進行元資料構建

        filtered = make([]*v1.Node, numNodesToFind)
        errs := errors.MessageCountMap{}
        var (
            predicateResultLock sync.Mutex
            filteredLen         int32
        )

        ctx, cancel := context.WithCancel(context.Background())

        // We can use the same metadata producer for all nodes.
        meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)

2.4 通過channel協作並行取樣

並行取樣主要通過呼叫下面的函式來啟動16個goroutine來進行並行取樣,並通過ctx來協調退出

workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)


通過channel來構建取樣索引的管道,每個worker會負責從channel獲取的指定索引取樣node的填充

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
    var stop <-chan struct{}
    if ctx != nil {
        stop = ctx.Done()
    }

    // 生成指定數量索引,worker通過索引來進行預選成功節點的儲存
    toProcess := make(chan int, pieces)
    for i := 0; i < pieces; i++ {
        toProcess <- i
    }
    close(toProcess)

    if pieces < workers {
        workers = pieces
    }

    wg := sync.WaitGroup{}
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        // 啟動多個goroutine
        go func() {
            defer utilruntime.HandleCrash()
            defer wg.Done()
            for piece := range toProcess {
                select {
                case <-stop:
                    return
                default:
                    //獲取索引,後續會通過該索引來進行結果的儲存
                    doWorkPiece(piece)
                }
            }
        }()
    }
    // 等待退出
    wg.Wait()
}

2.5 取樣並行函式

        checkNode := func(i int) {
            // 獲取一個節點
            nodeName := g.cache.NodeTree().Next()

            // 取樣核心流程是通過podFitsOnNode來確定
            fits, failedPredicates, status, err := g.podFitsOnNode(
                pluginContext,
                pod,
                meta,
                g.nodeInfoSnapshot.NodeInfoMap[nodeName],
                g.predicates, // 傳遞預選演算法
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            if fits {
                // 如果當前以及查詢到的數量大於預選的數量,就退出
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
                }
            } else {
                // 進行錯誤狀態的儲存 
                predicateResultLock.Lock()
                if !status.IsSuccess() {
                    filteredNodesStatuses[nodeName] = status
                }
                if len(failedPredicates) != 0 {
                    failedPredicateMap[nodeName] = failedPredicates
                }
                predicateResultLock.Unlock()
            }
        }

2.6 面向未來的篩選


在kubernetes中經過排程器排程後的pod結果會放入到SchedulingQueue中進行暫存,這些pod未來可能會經過後續排程流程執行在提議的node上,也可能因為某些原因導致最終沒有執行,而預選流程為了減少後續因為排程衝突(比如pod之間的親和性等問題,並且當前pod不能搶佔這些pod),則會在進行預選的時候,將這部分pod考慮進去

如果在這些pod存在的情況下,node可以滿足當前pod的篩選條件,則可以去除被提議的pod再進行篩選(如果這些提議的pod最終沒有排程到node,則當前node也需要滿足各種親和性的需求)

2.6 取樣核心設計


結合上面說的面向未來的篩選,通過兩輪篩選在無論那些優先順序高的pod是否被排程到當前node上,都可以滿足pod的排程需求,在排程的流程中只需要獲取之前註冊的排程演算法,完成預選檢測,如果發現有條件不通過則不會進行第二輪篩選,繼續選擇下一個節點

func (g *genericScheduler) podFitsOnNode(
    pluginContext *framework.PluginContext,
    pod *v1.Pod,
    meta predicates.PredicateMetadata,
    info *schedulernodeinfo.NodeInfo,
    predicateFuncs map[string]predicates.FitPredicate,
    queue internalqueue.SchedulingQueue,
    alwaysCheckAllPredicates bool,
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
    var failedPredicates []predicates.PredicateFailureReason
    var status *framework.Status

    // podsAdded主要用於標識當前是否有提議的pod如果沒有提議的pod則就不需要再進行一輪篩選了
    podsAdded := false
    
    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            // 首先獲取那些提議的pod進行第一輪篩選, 如果第一輪篩選出錯,則不會進行第二輪篩選
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            // 如果
            break
        }
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []predicates.PredicateFailureReason
                err     error
            )
            //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                // 預選演算法計算
                fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                if err != nil {
                    return false, []predicates.PredicateFailureReason{}, nil, err
                }

                if !fit {
                    // eCache is available and valid, and predicates result is unfit, record the fail reasons
                    failedPredicates = append(failedPredicates, reasons...)
                    // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                    if !alwaysCheckAllPredicates {
                        klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                            "evaluation is short circuited and there are chances " +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }

        status = g.framework.RunFilterPlugins(pluginContext, pod, info.Node().Name)
        if !status.IsSuccess() && !status.IsUnschedulable() {
            return false, failedPredicates, status, status.AsError()
        }
    }

    return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}

微訊號:baxiaoshi2020
關注公告號閱讀更多原始碼分析文章
更多文章關注 www.sreguide.com
本文由部落格一文多發平臺 OpenWrite 釋出