[原始碼分析-kubernetes]7. 搶佔排程
搶佔排程
Pod priority
Pod 有了 priority(優先順序) 後才有優先順序排程、搶佔排程的說法,高優先順序的 pod 可以在排程佇列中排到前面,優先選擇 node;另外當高優先順序的 pod 找不到合適的 node 時,就會看 node 上低優先順序的 pod 驅逐之後是否能夠 run 起來,如果可以,那麼 node 上的一個或多個低優先順序的 pod 會被驅逐,然後高優先順序的 pod 得以成功執行1個 node 上。
今天我們分析 pod 搶佔相關的程式碼。開始之前我們看一下和 priority 相關的2個示例配置檔案:
PriorityClass 例子
apiVersion: scheduling.k8s.io/v1 kind: PriorityClass metadata: name: high-priority value: 1000000 globalDefault: false description: "This priority class should be used for XYZ service pods only."
使用上述 PriorityClass
apiVersion: v1
kind: Pod
metadata:
name: nginx
labels:
env: test
spec:
containers:
- name: nginx
image: nginx
imagePullPolicy: IfNotPresent
priorityClassName: high-priority
這兩個檔案的內容這裡不解釋,Pod priority 相關知識點不熟悉的小夥伴請先查閱官方文件,我們下面看排程器中和 preempt 相關的程式碼邏輯。
preempt 入口
在pkg/scheduler/scheduler.go:513 scheduleOne()
方法中我們上一次關注的是suggestedHost, err := sched.schedule(pod)
這行程式碼,也就是關注通常情況下排程器如何給一個 pod 匹配一個最合適的 node. 今天我們來看如果這一行程式碼返回的 err != nil
情況下,如何開始 preempt 過程。
!FILENAME pkg/scheduler/scheduler.go:529
suggestedHost, err := sched.schedule(pod) if err != nil { if fitError, ok := err.(*core.FitError); ok { preemptionStartTime := time.Now() sched.preempt(pod, fitError) metrics.PreemptionAttempts.Inc() } else { klog.Errorf("error selecting node for pod: %v", err) metrics.PodScheduleErrors.Inc() } return }
當schedule()
函式沒有返回 host,也就是沒有找到合適的 node 的時候,就會出發 preempt 過程。這時候程式碼邏輯進入sched.preempt(pod, fitError)
這一行。我們先看一下這個函式的整體邏輯,然後深入其中涉及的子過程:
!FILENAME pkg/scheduler/scheduler.go:311
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
// 特性沒有開啟就返回 ""
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
return "", nil
}
// 更新 pod 資訊;入參和返回值都是 *v1.Pod 型別
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
// preempt 過程,下文分析
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
var nodeName = ""
if node != nil {
nodeName = node.Name
// 更新佇列中“任命pod”佇列
sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// 設定pod的Status.NominatedNodeName
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
// 如果出錯就從 queue 中移除
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
// 將要驅逐的 pod 驅逐
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
}
// Clearing nominated pods should happen outside of "if node != nil".
// 這個清理過程在上面的if外部,我們回頭從 Preempt() 的實現去理解
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
preempt 實現
上面 preempt()
函式中涉及到了一些值得深入看看的物件,下面我們逐個看一下這些物件的實現。
SchedulingQueue
SchedulingQueue 表示的是一個儲存待排程 pod 的佇列
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:60
type SchedulingQueue interface {
Add(pod *v1.Pod) error
AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod) error
Pop() (*v1.Pod, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveQueue()
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
NominatedPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
Close()
UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
DeleteNominatedPodIfExists(pod *v1.Pod)
NumUnschedulablePods() int
}
在 Scheduler 中 SchedulingQueue 介面對應兩種實現:
- FIFO 先進先出佇列
- PriorityQueue 優先順序佇列
FIFO
FIFO 結構是對 cache.FIFO 的簡單包裝,然後實現了 SchedulingQueue 介面。
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:97
type FIFO struct {
*cache.FIFO
}
cache.FIFO定義在vendor/k8s.io/client-go/tools/cache/fifo.go:93
,這個先進先出佇列的細節先不討論。
PriorityQueue
PriorityQueue 同樣實現了 SchedulingQueue 介面,PriorityQueue 的頂是最高優先順序的 pending pod. 這裡的PriorityQueue 有2個子 queue,activeQ 放的是等待排程的 pod,unschedulableQ 放的是已經嘗試過排程,然後失敗了,被標記為 unschedulable 的 pod.
我們看一下 PriorityQueue 結構的定義:
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:201
type PriorityQueue struct {
stop <-chan struct{}
clock util.Clock
lock sync.RWMutex
cond sync.Cond
// heap 頭節點存的是最高優先順序的 pod
activeQ *Heap
// unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap
// 儲存已經被指定好要跑在某個 node 的 pod
nominatedPods *nominatedPodMap
// 只要將 pod 從 unschedulableQ 移動到 activeQ,就設定為true;從 activeQ 中 pop 出來 pod的時候設定為 false. 這個欄位表明一個 pod 在被排程的過程中是否接收到了佇列 move 操作,如果發生了 move 操作,那麼這個 pod 就算被認定為 unschedulable,也被放回到 activeQ.
receivedMoveRequest bool
closed bool
}
PriorityQueue 的方法比較好理解,我們看幾個吧:
1、func (p *PriorityQueue) Add(pod *v1.Pod) error
//在 active queue 中新增1個pod
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:276
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
// 直接在 activeQ 中新增 pod
err := p.activeQ.Add(pod)
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
// 如果在 unschedulableQ 中找到這個 pod,拋錯誤日誌後移除佇列中該 pod
if p.unschedulableQ.get(pod) != nil {
klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
p.unschedulableQ.delete(pod)
}
// 佇列的 nominatedPods 屬性中標記該 pod 不指定到任何 node
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
}
return err
}
2、func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error
//如果2個佇列中都不存在該 pod,那麼就新增到 active queue 中
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:295
func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
//如果佇列 unschedulableQ 中有 pod,啥也不做
if p.unschedulableQ.get(pod) != nil {
return nil
}
//如果佇列 activeQ 中有 pod,啥也不做
if _, exists, _ := p.activeQ.Get(pod); exists {
return nil
}
// 新增 pod 到 activeQ
err := p.activeQ.Add(pod)
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
}
return err
}
3、func (p *PriorityQueue) flushUnschedulableQLeftover()
//重新整理 unschedulableQ 中的 pod,如果一個 pod 的呆的時間超過了 durationStayUnschedulableQ,就移動到 activeQ 中
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:346
func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*v1.Pod
currentTime := p.clock.Now()
// 遍歷 unschedulableQ 中的 pod
for _, pod := range p.unschedulableQ.pods {
lastScheduleTime := podTimestamp(pod)
// 這裡的預設值是 60s,所以超過 60s 的 pod 將得到進入 activeQ 的機會
if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pod)
}
}
if len(podsToMove) > 0 {
// 全部移到 activeQ 中,又有機會被排程了
p.movePodsToActiveQueue(podsToMove)
}
}
4、func (p *PriorityQueue) Pop() (*v1.Pod, error)
//從 activeQ 中 pop 一個 pod
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:367
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
p.lock.Lock()
defer p.lock.Unlock()
for len(p.activeQ.data.queue) == 0 {
// 當佇列為空的時候會阻塞
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
p.cond.Wait()
}
obj, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
pod := obj.(*v1.Pod)
// 標記 receivedMoveRequest 為 false,表示新的一次排程開始了
p.receivedMoveRequest = false
return pod, err
}
再看個別 PriorityQueue.nominatedPods
屬性相關操作的方法,也就是 preempt()
函式中多次呼叫到的方法:
**5、func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)**
//pod 搶佔的時候,確定一個 node 可以用於跑這個 pod 時,通過呼叫這個方法將 pod nominated 到 指定的 node 上。
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:567
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
p.lock.Lock()
//邏輯在這裡面
p.nominatedPods.add(pod, nodeName)
p.lock.Unlock()
}
先看 nominatedPods 屬性的型別,這個型別用於儲存 pods 被 nominate 到 nodes 的資訊:
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:822
type nominatedPodMap struct {
// key 是 node name,value 是 nominated 到這個 node 上的 pods
nominatedPods map[string][]*v1.Pod
// 和上面結構相反,key 是 pod 資訊,值是 node 資訊
nominatedPodToNode map[ktypes.UID]string
}
在看一下add()
方法的實現:
!FILENAME pkg/scheduler/internal/queue/scheduling_queue.go:832
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
// 不管有沒有,先刪一下,防止重了
npm.delete(p)
nnn := nodeName
// 如果傳入的 nodeName 是 “”
if len(nnn) == 0 {
// 查詢 pod 的 pod.Status.NominatedNodeName
nnn = NominatedNodeName(p)
// 如果 pod.Status.NominatedNodeName 也是 “”,return
if len(nnn) == 0 {
return
}
}
// 邏輯到這裡說明要麼 nodeName 不為空字串,要麼 nodeName 為空字串但是 pod 的 pod.Status.NominatedNodeName 不為空字串,這時候開始下面的賦值
npm.nominatedPodToNode[p.UID] = nnn
for _, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
return
}
}
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}
PodPreemptor
PodPreemptor 用來驅逐 pods 和更新 pod annotations.
!FILENAME pkg/scheduler/factory/factory.go:145
type PodPreemptor interface {
GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
DeletePod(pod *v1.Pod) error
SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
RemoveNominatedNodeName(pod *v1.Pod) error
}
這個 interface 對應的實現型別是:
!FILENAME pkg/scheduler/factory/factory.go:1620
type podPreemptor struct {
Client clientset.Interface
}
這個型別綁定了4個方法:
!FILENAME pkg/scheduler/factory/factory.go:1624
// 新獲取一次 pod 的資訊
func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
}
// 刪除一個 pod
func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
// 設定pod.Status.NominatedNodeName 為指定的 node name
func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
podCopy := pod.DeepCopy()
podCopy.Status.NominatedNodeName = nominatedNodeName
_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
return err
}
// 清空 pod.Status.NominatedNodeName
func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
if len(pod.Status.NominatedNodeName) == 0 {
return nil
}
return p.SetNominatedNodeName(pod, "")
}
xx.Algorithm.Preempt
介面定義
我們回到挺久之前講常規排程過程的時候提過的一個介面:
!FILENAME pkg/scheduler/algorithm/scheduler_interface.go:78
type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
// Preempt 在 pod 排程發生失敗的時候嘗試搶佔低優先順序的 pod.
// 返回發生 preemption 的 node, 被 preempt的 pods 列表,
// nominated node name 需要被移除的 pods 列表,一個 error 資訊.
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
Predicates() map[string]FitPredicate
Prioritizers() []PriorityConfig
}
這個介面上次我們講到的時候關注了Schedule()
、Predicates()
和Prioritizers()
,這次來看Preempt()
是怎麼實現的。
整體流程
Preempt()
同樣由genericScheduler
型別(pkg/scheduler/core/generic_scheduler.go:98
)實現,方法前的一大串英文註釋先來理解一下:
- Preempt 尋找一個在發生搶佔之後能夠成功排程“pod”的node.
- Preempt 選擇一個 node 然後搶佔上面的 pods 資源,返回:
- 這個 node 資訊
- 被搶佔的 pods 資訊
- nominated node name 需要被清理的 node 列表
- 可能有的 error
- Preempt 過程不涉及快照更新(快照的邏輯以後再講)
- 避免出現這種情況:preempt 發現一個不需要驅逐任何 pods 就能夠跑“pod”的 node.
- 當有很多 pending pods 在排程佇列中的時候,a nominated pod 會排到佇列中相同優先順序的 pod 後面.
- The nominated pod 會阻止其他 pods 使用“指定”的資源,哪怕花費了很多時間來等待其他 pending 的 pod.
我們先過整體流程,然後逐個分析子流程呼叫:
!FILENAME pkg/scheduler/core/generic_scheduler.go:251
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// 省略幾行
// 判斷執行驅逐操作是否合適
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
// 所有的 nodes
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
// 計算潛在的執行驅逐後能夠用於跑 pod 的 nodes
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
// 列出 pdb 物件
pdbs, err := g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
// 計算所有 node 需要驅逐的 pods 有哪些等,後面細講
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
}
// 拓展排程的邏輯
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
if err != nil {
return nil, nil, nil, err
}
// 選擇1個 node 用於 schedule
candidateNode := pickOneNodeForPreemption(nodeToVictims)
if candidateNode == nil {
return nil, nil, nil, err
}
// 低優先順序的被 nominate 到這個 node 的 pod 很可能已經不再 fit 這個 node 了,所以
// 需要移除這些 pod 的 nomination,更新這些 pod,挪動到 activeQ 中,讓排程器
// 得以尋找另外一個 node 給這些 pod
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
}
return nil, nil, nil, fmt.Errorf(
"preemption failed: the target node %s has been deleted from scheduler cache",
candidateNode.Name)
}
上面涉及到一些子過程呼叫,我們逐個來看~
podEligibleToPreemptOthers()
// 如何判斷是否適合搶佔?nodesWherePreemptionMightHelp()
// 怎麼尋找能夠用於 preempt 的 nodes?selectNodesForPreemption()
// 這個過程計算的是什麼?pickOneNodeForPreemption()
// 怎麼從選擇最合適被搶佔的 node?
podEligibleToPreemptOthers
podEligibleToPreemptOthers
做的事情是判斷一個 pod 是否應該去搶佔其他 pods. 如果這個 pod 已經搶佔過其他 pods,那些 pods 還在 graceful termination period 中,那就不應該再次發生搶佔。- 如果一個 node 已經被這個 pod nominated,並且這個 node 上有處於 terminating 狀態的 pods,那麼就不考慮驅逐更多的 pods.
這個函式邏輯很簡單,我們直接看原始碼:
!FILENAME pkg/scheduler/core/generic_scheduler.go:1110
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
nomNodeName := pod.Status.NominatedNodeName
// 如果 pod.Status.NominatedNodeName 不是空字串
if len(nomNodeName) > 0 {
// 被 nominate 的 node
if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
for _, p := range nodeInfo.Pods() {
// 有低優先順序的 pod 處於刪除中狀態,就返回 false
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
// There is a terminating pod on the nominated node.
return false
}
}
}
}
return true
}
nodesWherePreemptionMightHelp
nodesWherePreemptionMightHelp
要做的事情是尋找 predicates 階段失敗但是通過搶佔也許能夠排程成功的 nodes.
這個函式也不怎麼長,看下程式碼:
!FILENAME pkg/scheduler/core/generic_scheduler.go:1060
func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
// 潛力 node, 用於儲存返回值的 slice
potentialNodes := []*v1.Node{}
for _, node := range nodes {
// 這個為 true 表示一個 node 驅逐 pod 也不一定能適合當前 pod 執行
unresolvableReasonExist := false
// 一個 node 對應的所有失敗的 predicates
failedPredicates, _ := failedPredicatesMap[node.Name]
// 遍歷,看是不是再下面指定的這些原因中,如果在,就標記 unresolvableReasonExist = true
for _, failedPredicate := range failedPredicates {
switch failedPredicate {
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodAffinityRulesNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrTaintsTolerationsNotMatch,
predicates.ErrNodeLabelPresenceViolated,
predicates.ErrNodeNotReady,
predicates.ErrNodeNetworkUnavailable,
predicates.ErrNodeUnderDiskPressure,
predicates.ErrNodeUnderPIDPressure,
predicates.ErrNodeUnderMemoryPressure,
predicates.ErrNodeOutOfDisk,
predicates.ErrNodeUnschedulable,
predicates.ErrNodeUnknownCondition,
predicates.ErrVolumeZoneConflict,
predicates.ErrVolumeNodeConflict,
predicates.ErrVolumeBindConflict:
unresolvableReasonExist = true
// 如果找到一個上述失敗原因,說明這個 node 已經可以排除了,break 後繼續下一個 node 的計算
break
}
}
// false 的時候,也就是這個 node 也許驅逐 pods 後有用,那就新增到 potentialNodes 中
if !unresolvableReasonExist {
klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
potentialNodes = append(potentialNodes, node)
}
}
return potentialNodes
}
selectNodesForPreemption
這個函式會併發計算所有的 nodes 是否通過驅逐實現 pod 搶佔。
看這個函式內容之前我們先看一下返回值的型別:
map[*v1.Node]*schedulerapi.Victims
的 key 很好理解,value 是啥呢:
type Victims struct {
Pods []*v1.Pod
NumPDBViolations int
}
這裡的 Pods 是被選中準備要驅逐的;NumPDBViolations 表示的是要破壞多少個 PDB 限制。這裡肯定也就是要儘量符合 PDB 要求,能不和 PDB 衝突就不衝突。
然後看一下這個函式的整體過程:
!FILENAME pkg/scheduler/core/generic_scheduler.go:895
func selectNodesForPreemption(pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
potentialNodes []*v1.Node, // 上一個函式計算出來的 nodes
predicates map[string]algorithm.FitPredicate,
metadataProducer algorithm.PredicateMetadataProducer,
queue internalqueue.SchedulingQueue, // 這裡其實是前面講的優先順序佇列 PriorityQueue
pdbs []*policy.PodDisruptionBudget, // pdb 列表
) (map[*v1.Node]*schedulerapi.Victims, error) {
nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
var resultLock sync.Mutex
// We can use the same metadata producer for all nodes.
meta := metadataProducer(pod, nodeNameToInfo)
// 這種形式的併發已經不陌生了,前面遇到過幾次了
checkNode := func(i int) {
nodeName := potentialNodes[i].Name
var metaCopy algorithm.PredicateMetadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
// 這裡有一個子過程呼叫,下面單獨介紹
pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
if fits {
resultLock.Lock()
victims := schedulerapi.Victims{
Pods: pods,
NumPDBViolations: numPDBViolations,
}
// 如果 fit,就新增到 nodeToVictims 中,也就是最後的返回值
nodeToVictims[potentialNodes[i]] = &victims
resultLock.Unlock()
}
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
return nodeToVictims, nil
}
上面這個函式的核心邏輯在 selectVictimsOnNode 中,這個函式嘗試在給定的 node 中尋找最少數量的需要被驅逐的 pods,同時需要保證驅逐了這些 pods 之後,這個 noode 能夠滿足“pod”執行需求。
這些被驅逐的 pods 計算同時需要滿足一個約束,就是能夠刪除低優先順序的 pod 絕不先刪高優先順序的 pod.
這個演算法首選計算當這個 node 上所有的低優先順序 pods 被驅逐之後能否排程“pod”. 如果可以,那就按照優先順序排序,根據 PDB 是否破壞分成兩組,一組是影響 PDB 限制的,另外一組是不影響 PDB. 兩組各自按照優先順序排序。然後開始逐漸釋放影響 PDB 的 group 中的 pod,然後逐漸釋放不影響 PDB 的 group 中的 pod,在這個過程中要保持“pod”能夠 fit 這個 node. 也就是說一旦放過某一個 pod 導致“pod”不 fit 這個 node 了,那就說明這個 pod 不能放過,也就是意味著已經找到了最少 pods 集。
看一下具體的實現吧:
FILENAME pkg/scheduler/core/generic_scheduler.go:983
func selectVictimsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo,
fitPredicates map[string]algorithm.FitPredicate,
queue internalqueue.SchedulingQueue,
pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
if nodeInfo == nil {
return nil, 0, false
}
// 排個序
potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
nodeInfoCopy := nodeInfo.Clone()
// 定義刪除 pod 函式
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
if meta != nil {
meta.RemovePod(rp)
}
}
// 定義新增 pod 函式
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
if meta != nil {
meta.AddPod(ap, nodeInfoCopy)
}
}
// 刪除所有的低優先順序 pod 看是不是能夠滿足排程需求了
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods() {
if util.GetPodPriority(p) < podPriority {
// 刪除的意思其實就是新增元素到 potentialVictims.Items
potentialVictims.Items = append(potentialVictims.Items, p)
removePod(p)
}
}
// 排個序
potentialVictims.Sort()
// 如果刪除了所有的低優先順序 pods 之後還不能跑這個新 pod,那麼差不多就可以判斷這個 node 不適合 preemption 了,還有一點點需要考慮的是這個“pod”的不 fit 的原因是由於 pod affinity 不滿足了。
// 後續可能會增加當前 pod 和低優先順序 pod 之間的 優先順序檢查。
// 這個函式呼叫其實就是之前講到過的預選函式的呼叫邏輯,判斷這個 pod 是否合適跑在這個 node 上。
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
return nil, 0, false
}
var victims []*v1.Pod
numViolatingVictim := 0
// 嘗試儘量多地釋放這些 pods,也就是說能少殺就少殺;這裡先從 PDB violating victims 中釋放,再從 PDB non-violating victims 中釋放;兩個組都是從高優先順序的 pod 開始釋放。
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
// 釋放 pods 的函式,來一個放一個
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
if !fits {
removePod(p)
victims = append(victims, p)
klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
}
return fits
}
// 釋放 violatingVictims 中元素的同時會記錄放了多少個
for _, p := range violatingVictims {
if !reprievePod(p) {
numViolatingVictim++
}
}
// 開始釋放 non-violating victims.
for _, p := range nonViolatingVictims {
reprievePod(p)
}
return victims, numViolatingVictim, true
}
pickOneNodeForPreemption
pickOneNodeForPreemption
要從給定的 nodes 中選擇一個 node,這個函式假設給定的 map 中 value 部分是以 priority 降序排列的。這裡選擇 node 的標準是:
- 最少的 PDB violations
- 最少的高優先順序 victim
- 優先順序總數字最小
- victim 總數最小
- 直接返回第一個
!FILENAME pkg/scheduler/core/generic_scheduler.go:788
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
if len(nodesToVictims) == 0 {
return nil
}
// 初始化為最大值
minNumPDBViolatingPods := math.MaxInt32
var minNodes1 []*v1.Node
lenNodes1 := 0
// 這個迴圈要找到 PDBViolatingPods 最少的 node,如果有多個,就全部存在 minNodes1 中
for node, victims := range nodesToVictims {
if len(victims.Pods) == 0 {
// 如果發現一個不需要驅逐 pod 的 node,馬上返回
return node
}
numPDBViolatingPods := victims.NumPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
minNumPDBViolatingPods = numPDBViolatingPods
minNodes1 = nil
lenNodes1 = 0
}
if numPDBViolatingPods == minNumPDBViolatingPods {
minNodes1 = append(minNodes1, node)
lenNodes1++
}
}
// 如果只找到1個 PDB violations 最少的 node,那就直接返回這個 node 就 ok 了
if lenNodes1 == 1 {
return minNodes1[0]
}
// 還剩下多個 node,那就尋找 highest priority victim 最小的 node
minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
lenNodes2 := 0
// 這個迴圈要做的事情是看2個 node 上 victims 中最高優先順序的 pod 哪個優先順序更高
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
victims := nodesToVictims[node]
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := util.GetPodPriority(victims.Pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
lenNodes2 = 0
}
if highestPodPriority == minHighestPriority {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// 發現只有1個,那就直接返回
if lenNodes2 == 1 {
return minNodes2[0]
}
// 這時候還沒有抉擇出一個 node,那就開始計算優先順序總和了,看哪個更低
minSumPriorities := int64(math.MaxInt64)
lenNodes1 = 0
for i := 0; i < lenNodes2; i++ {
var sumPriorities int64
node := minNodes2[i]
for _, pod := range nodesToVictims[node].Pods {
// 這裡的累加考慮到了先把優先順序搞成正數。不然會出現1個 node 上有1優先順序為 -3 的 pod,另外一個 node 上有2個優先順序為 -3 的 pod,結果 -3>-6,有2個 pod 的 node 反而被認為總優先順序更低!
sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
lenNodes1 = 0
}
if sumPriorities == minSumPriorities {
minNodes1[lenNodes1] = node
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// 還是沒有分出勝負,於是開始用 pod 總數做比較
minNumPods := math.MaxInt32
lenNodes2 = 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
numPods := len(nodesToVictims[node].Pods)
if numPods < minNumPods {
minNumPods = numPods
lenNodes2 = 0
}
if numPods == minNumPods {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
// 還是沒有區分出來1個 node 的話,只能放棄區分了,直接返回第一個結果
if lenNodes2 > 0 {
return minNodes2[0]
}
klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
return nil
}
小結
咋個說呢,此處應該有總結的,搶佔過程的邏輯比我想象中的複雜,設計很巧妙,行雲流水,大快人心!preemption 可以簡單說成再預選->再優選吧;還是不多說了,一天寫這麼多有點坐不住了,下回再繼續聊排程器~
引用連結:
gitbook:https://farmer-hutao.github.io/k8s-source-code-analysis/
github:https://hub.fastgit.org/daniel-hutao/k8s-source-code-analysis