1. 程式人生 > >圖解kubernetes排程器SchedulingQueue核心原始碼實現

圖解kubernetes排程器SchedulingQueue核心原始碼實現

SchedulingQueue是kubernetes scheduler中負責進行等待排程pod儲存的對,Scheduler通過SchedulingQueue來獲取當前系統中等待排程的Pod,本文主要討論SchedulingQueue的設計與實現的各種實現, 瞭解探究其內部實現與底層原始碼,本系列程式碼基於kubernets1.1.6分析而來,圖解主要位於第二部分

SchedulingQueue設計

佇列與優先順序

佇列與場景

型別 描述 通常實現
佇列 普通佇列是一個FIFO的資料結構,根據元素入隊的次序依次出隊 陣列或者連結串列
優先順序佇列 優先順序佇列通常是指根據某些優先順序策略,高優先順序會優先被獲取 陣列或者樹

其實在大多數的排程場景中,大多都是採用優先順序佇列來實現,優先滿足優先順序比較高的任務或者需求,從而減少後續高優先順序對低優先順序的搶佔,scheduler中也是如此

優先順序的選擇

k8s中排程的單元是Pod,scheduler中根據pod的優先順序的高低來進行優先順序佇列的構建, 這個其實是在kubernets的adminission准入外掛中,會為使用者建立的pod根據使用者的設定,進行優先順序欄位的計算

三級佇列

活動佇列

活動佇列儲存當前系統中所有正在等待排程的佇列

不可排程佇列

當pod的資源在當前叢集中不能被滿足時,則會被加入到一個不可排程佇列中,然後等待稍後再進行嘗試

backoff佇列

backoff機制是併發程式設計中常見的一種機制,即如果任務反覆執行依舊失敗,則會按次增長等待排程時間,降低重試效率,從而避免反覆失敗浪費排程資源

針對排程失敗的pod會優先儲存在backoff佇列中,等待後續重試

阻塞與搶佔

阻塞設計

當佇列中不存在等待排程的pod的時候,會阻塞scheduler等待有需要排程的pod的時候再喚醒排程器,獲取pod進行排程

搶佔相關

nominatedPods儲存pod被提議執行的node,主要用於搶佔排程流程中使用,本節先不分析

原始碼分析

資料結構

kubernetes中預設的schedulingQueue實現是PriorityQueue,本章就以該資料結構來分析

type PriorityQueue struct {
    stop  <-chan struct{}
    clock util.Clock
    // 儲存backoff的pod計時器
    podBackoff *PodBackoffMap

    lock sync.RWMutex
    // 用於協調通知因為獲取不到排程pod而阻塞的cond
    cond sync.Cond

    // 活動佇列
    activeQ *util.Heap
    
    // backoff佇列
    podBackoffQ *util.Heap
    
    // 不可排程佇列
    unschedulableQ *UnschedulablePodsMap
    // 儲存pod和被提名的node, 實際上就是儲存pod和建議的node節點
    nominatedPods *nominatedPodMap
    // schedulingCycle是一個排程週期的遞增序號,當pod pop的時候會遞增
    schedulingCycle int64
    // moveRequestCycle快取schedulingCycle, 當未排程的pod重新被新增到activeQueue中
    // 會儲存schedulingCycle到moveRequestCycle中
    moveRequestCycle int64
    closed bool
}

PriorityQueue作為實現SchedulingQueue的實現,其核心資料結構主要包含三個佇列:activeQ、podBackoffQ、unscheduleQ內部通過cond來實現Pop操作的阻塞與通知,接下來先分析核心的排程流程,最後再分析util.Heap裡面的具體實現

activeQ

儲存所有等待排程的Pod的佇列,預設是基於堆來實現,其中元素的優先順序則通過對比pod的建立時間和pod的優先順序來進行排序

    // activeQ is heap structure that scheduler actively looks at to find pods to
    // schedule. Head of heap is the highest priority pod.
    activeQ *util.Heap

優先順序比較函式

// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodInfo.timestamp.
func activeQComp(podInfo1, podInfo2 interface{}) bool {
    pInfo1 := podInfo1.(*framework.PodInfo)
    pInfo2 := podInfo2.(*framework.PodInfo)
    prio1 := util.GetPodPriority(pInfo1.Pod)
    prio2 := util.GetPodPriority(pInfo2.Pod)
    // 首先根據優先順序的高低進行比較,然後根據pod的建立時間,越高優先順序的Pod越被優先排程
    // 越早建立的pod越優先
    return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

podbackOffQ

podBackOffQ主要儲存那些在多個schedulingCycle中依舊排程失敗的情況下,則會通過之前說的backOff機制,延遲等待排程的時間

    // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
    // are popped from this heap before the scheduler looks at activeQ
    podBackoffQ *util.Heap

podBackOff

上面提到podBackOffQ佇列中並沒有儲存pod的backOff的具體資訊,比如backoff的計數器,最後一次更新的時間等,podBackOff則類似一個記分板,記錄這些資訊,供podBackOffQ使用

    // podBackoff tracks backoff for pods attempting to be rescheduled
    podBackoff *PodBackoffMap

// PodBackoffMap is a structure that stores backoff related information for pods
type PodBackoffMap struct {
    // lock for performing actions on this PodBackoffMap
    lock sync.RWMutex
    // initial backoff duration
    initialDuration time.Duration // 當前值是1秒
    // maximal backoff duration
    maxDuration time.Duration // 當前值是1分鐘
    // map for pod -> number of attempts for this pod
    podAttempts map[ktypes.NamespacedName]int
    // map for pod -> lastUpdateTime pod of this pod
    podLastUpdateTime map[ktypes.NamespacedName]time.Time
}

unschedulableQ

儲存已經嘗試排程但是當前叢集資源不滿足的pod的佇列

moveRequestCycle

當因為叢集資源發生變化會嘗試進行unschedulableQ中的pod轉移到activeQ,moveRequestCycle就是儲存資源變更時的schedulingCycle

func (p *PriorityQueue) MoveAllToActiveQueue() {
    // 省略其他程式碼
    p.moveRequestCycle = p.schedulingCycle
}

schedulingCycle

schedulingCycle是一個遞增的序列每次從activeQ中pop出一個pod都會遞增

func (p *PriorityQueue) Pop() (*v1.Pod, error) {
    //省略其他
        p.schedulingCycle++
}

併發活動佇列

併發從活動佇列中獲取pod


SchedulingQueue提供了一個Pop介面用於從獲取當前叢集中等待排程的pod,其內部實現主要通過上面cond與activeQ來實現

當前佇列中沒有可排程的pod的時候,則通過cond.Wait來進行阻塞,然後在忘activeQ中新增pod的時候通過cond.Broadcast來實現通知

func (p *PriorityQueue) Pop() (*v1.Pod, error) {
    p.lock.Lock()
    defer p.lock.Unlock()
    for p.activeQ.Len() == 0 {
        if p.closed {
            return nil, fmt.Errorf(queueClosed)
        }
        // 
        p.cond.Wait()
    }
    obj, err := p.activeQ.Pop()
    if err != nil {
        return nil, err
    }
    pInfo := obj.(*framework.PodInfo)
    p.schedulingCycle++
    return pInfo.Pod, err
}

加入排程pod到活動佇列


當pod加入活動佇列中,除了加入activeQ的優先順序佇列中,還需要從podBackoffQ和unschedulableQ中移除當前的pod,最後進行廣播通知阻塞在Pop操作的scheudler進行最新pod的獲取

func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pInfo := p.newPodInfo(pod)
    // 加入activeQ
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
        return err
    }
    // 從unschedulableQ刪除
    if p.unschedulableQ.get(pod) != nil {
        klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
        p.unschedulableQ.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    // 從podBackoffQ刪除
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
    }
    // 儲存pod和被提名的node
    p.nominatedPods.add(pod, "")
    p.cond.Broadcast()

    return nil
}

schedulingCycle與moveRequestCycle

未排程的佇列的及時重試


導致排程週期schedulingCyclye變更主要因素如下:
1.當叢集資源發生變化的時候:比如新新增pv、node等資源,那之前在unschedulableQ中因為資源不滿足需求的pod就可以進行放入activeQ中或者podBackoffQ中,及時進行排程
2.pod被成功排程: 之前由於親和性不滿足被放入到unschedulableQ中的pod,此時也可以進行嘗試,而不必等到超時之後,再加入

這兩種情況下會分別觸發MoveAllToActiveQueue和movePodsToActiveQueue變更moveRequestCycle使其等於schedulingCycle

對重試機制的影響

當前一個pod失敗的時候,有兩種選擇一是加入podBackoffQ中,二是加入unschedulableQ中,那麼針對一個失敗的pod如何選擇該進入那個佇列中呢

結合上面的moveRequestCycle變更時機,什麼時候moveRequestCycle會大於等於podSchedulingCycle呢?答案就是當前叢集中進行過叢集資源的變更或者pod被成功分配,那這個時候我們如果重試一個失敗的排程則可能會成功,因為叢集資源變更了可能有新的資源加入

    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
    } else {
        p.unschedulableQ.addOrUpdate(pInfo)
    }

失敗處理邏輯的注入

注入排程失敗邏輯處理

在建立scheduler Config的時候會通過MakeDefaultErrorFunc注入一個失敗處理函式, 在scheduler排程的時候會進行呼叫
kubernetes/pkg/scheduler/factory/factory.go: MakeDefaultErrorFunc會將沒有排程到任何一個node的pod重新放回到優先順序佇列中

    podSchedulingCycle := podQueue.SchedulingCycle()
    // 省略非核心程式碼
    if len(pod.Spec.NodeName) == 0 {
        //重新放回佇列
        if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil {
            klog.Error(err)
        }
    }

失敗處理的回撥

當排程pod的失敗的時候, scheduler會同時呼叫sched.Error就是上面注入的失敗處理邏輯,來將排程失敗未分配node的pod節點重新加入到隊裡鍾
kubernetes/pkg/scheduler/scheduler.go

func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
    // 錯誤回撥
    sched.Error(pod, err)
    sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
    if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{
        Type:    v1.PodScheduled,
        Status:  v1.ConditionFalse,
        Reason:  reason,
        Message: err.Error(),
    }); err != nil {
        klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
    }
}

PodBackoffMap

PodBackoffMap主要用於儲存pod的最後一次失敗的更新時間與實現次數,從而根據這些資料來進行pod的backoffTime的計算

資料結構設計

type PodBackoffMap struct {
    // lock for performing actions on this PodBackoffMap
    lock sync.RWMutex
    // 初始化 backoff duration
    initialDuration time.Duration // 當前值是1秒
    // 最大 backoff duration
    maxDuration time.Duration // 當前值是1分鐘
    // 記錄pod重試的次數
    podAttempts map[ktypes.NamespacedName]int
    // 記錄pod的最後一次的更新時間
    podLastUpdateTime map[ktypes.NamespacedName]time.Time
}

backoffTime計算演算法

初始化的時候回設定initialDuration和maxDuration,在當前版本中分別是1s和10s,也就是backoffQ中的pod最長10s就會重新加入activeQ中(需要等待定時任務進行輔助)

在每次失敗回撥的時候,都會進行BackoffPod方法來進行計數更新,在後續獲取pod的backoffTime的時候,只需要獲取次數然後結合initialDuration進行演算法計算,結合pod最後一次的更新時間,就會獲取pod的backoffTime的終止時間

backoffDuration計算

其實最終的計算很簡單就是2的N次冪

func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
    // initialDuration是1s
    backoffDuration := pbm.initialDuration
    if _, found := pbm.podAttempts[nsPod]; found {
        // podAttempts裡面包含pod的嘗試失敗的次數
        for i := 1; i < pbm.podAttempts[nsPod]; i++ {
            backoffDuration = backoffDuration * 2
            // 最大10s
            if backoffDuration > pbm.maxDuration {
                return pbm.maxDuration
            }
        }
    }
    return backoffDuration
}

podBackoffQ

優先順序函式

podBackoffQ實際上會根據pod的backoffTime來進行優先順序排序,所以podBackoffQ的佇列頭部,就是最近一個要過期的pod

func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
    pInfo1 := podInfo1.(*framework.PodInfo)
    pInfo2 := podInfo2.(*framework.PodInfo)
    bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
    bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
    return bo1.Before(bo2)
}

排程失敗加入到podBackoffQ

如果排程失敗,並且moveRequestCycle=podSchedulingCycle的時候就加入podBackfoffQ中

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
    // 省略檢查性程式碼
    // 更新pod的backoff 資訊
    p.backoffPod(pod)

    // moveRequestCycle將pod從unscheduledQ大於pod的排程週期新增到 如果pod的排程週期小於當前的排程週期
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
    } else {
        p.unschedulableQ.addOrUpdate(pInfo)
    }

    p.nominatedPods.add(pod, "")
    return nil

}

從unschedulableQ遷移

在前面介紹的當叢集資源發生變更的時候,會觸發嘗試unschedulabelQ中的pod進行轉移,如果發現當前pod還未到達backoffTime,就加入到podBackoffQ中

        if p.isPodBackingOff(pod) {
            if err := p.podBackoffQ.Add(pInfo); err != nil {
                klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
                addErrorPods = append(addErrorPods, pInfo)
            }
        } else {
            if err := p.activeQ.Add(pInfo); err != nil {
                klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
                addErrorPods = append(addErrorPods, pInfo)
            }
        }

podBackoffQ定時轉移

在建立PriorityQueue的時候,會建立兩個定時任務其中一個就是講backoffQ中的pod到期後的轉移,每秒鐘嘗試一次

func (p *PriorityQueue) run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

因為是一個堆結果,所以只需要獲取堆頂的元素,然後確定是否到期,如果到期後則進行pop處來,加入到activeQ中

func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()

    for {
        // 獲取堆頂元素
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            return
        }
        pod := rawPodInfo.(*framework.PodInfo).Pod
        // 獲取到期時間
        boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
        if !found {
            // 如果當前已經不在podBackoff中,則就pop出來然後放入到activeQ
            klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
            p.podBackoffQ.Pop()
            p.activeQ.Add(rawPodInfo)
            defer p.cond.Broadcast()
            continue
        }

        // 未超時
        if boTime.After(p.clock.Now()) {
            return
        }
        // 超時就pop出來
        _, err := p.podBackoffQ.Pop()
        if err != nil {
            klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
            return
        }
        // 加入到activeQ中
        p.activeQ.Add(rawPodInfo)
        defer p.cond.Broadcast()
    }
}

unschedulableQ

排程失敗

排程失敗後,如果當前叢集資源沒有發生變更,就加入到unschedulable,原因上面說過

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
    // 省略檢查性程式碼
    // 更新pod的backoff 資訊
    p.backoffPod(pod)

    // moveRequestCycle將pod從unscheduledQ大於pod的排程週期新增到 如果pod的排程週期小於當前的排程週期
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
    } else {
        p.unschedulableQ.addOrUpdate(pInfo)
    }

    p.nominatedPods.add(pod, "")
    return nil

}

定時轉移任務

定時任務每30秒執行一次

func (p *PriorityQueue) run() {
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

邏輯其實就非常簡單如果當前時間-pod的最後排程時間大於60s,就重新排程,轉移到podBackoffQ或者activeQ中

func (p *PriorityQueue) flushUnschedulableQLeftover() {
    p.lock.Lock()
    defer p.lock.Unlock()

    var podsToMove []*framework.PodInfo
    currentTime := p.clock.Now()
    for _, pInfo := range p.unschedulableQ.podInfoMap {
        lastScheduleTime := pInfo.Timestamp
        // 如果該pod1分鐘內沒有被排程就加入到podsToMove
        if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
            podsToMove = append(podsToMove, pInfo)
        }
    }

    if len(podsToMove) > 0 {
        // podsToMove將這些pod移動到activeQ
        p.movePodsToActiveQueue(podsToMove)
    }
}

排程佇列總結

資料流設計總結

3.1.1 三佇列與後臺定時任務

從設計上三佇列分別儲存:活動佇列、bakcoff佇列、不可排程佇列,其中backoff中會根據任務的失敗來逐步遞增重試時間(最長10s)、unschedulableQ佇列則延遲60s

通過後臺定時任務分別將backoffQ佇列、unschedulableQ佇列來進行重試,加入到activeQ中,從而加快完成pod的失敗重試排程

cycle與優先排程

schedulingCycle、moveRequestCycle兩個cycle其實本質上也是為了加快失敗任務的重試排程,當叢集資源發生變化的時候,進行立即重試,那些失敗的優先順序比較高、親和性問題的pod都可能會被優先排程

鎖與cond實現執行緒安全pop

內部通過lock保證執行緒安全,並通過cond來實現阻塞等待,從而實現阻塞scheduler worker的通知

今天就分析到這裡,其實參考這個實現,我們也可以從中抽象出一些設計思想,實現自己的一個具有優先順序、快速重試、高可用的任務佇列,先分析到這,下一個分析的元件是SchedulerCache, 感興趣可以加我微信一起交流學習,畢竟三個臭皮匠算計不過諸葛亮

微訊號:baxiaoshi2020
關注公告號閱讀更多原始碼分析文章
更多文章關注 www.sreguide.com
本文由部落格一文多發平臺 OpenWrite 釋出