1. 程式人生 > 其它 >Kubernetes Job Controller 原理和原始碼分析(三)

Kubernetes Job Controller 原理和原始碼分析(三)

概述 · Job controller 的啟動 · processNextWorkItem() · 核心調諧邏輯入口 - syncJob() · Pod 數量管理 - manageJob() · 小結

概述

原始碼版本:kubernetes master 分支 commit-fe62fc(2021年10月14日)

Job 是主要的 Kubernetes 原生 Workload 資源之一,是在 Kubernetes 之上執行批處理任務最簡單的方式,在 AI 模型訓練等場景下最基礎的實現版本就是拉起一個 Job 來完成一次訓練任務,然後才是各種自定義 “Job” 實現進階處理,比如分散式訓練需要一個 “Job” 同時拉起多個 Pod,但是每個 Pod 的啟動引數會有差異。所以深入理解 Job 的功能和實現細節是進一步開發自定義 “Job” 型別工作負載的基礎。

我們在《Kubernetes Job Controller 原理和原始碼分析(一)》中詳細介紹了 Job 的特性,在《Kubernetes Job Controller 原理和原始碼分析(二)》 中一路從 Job 控制器原始碼入口跟到所有 EventHandler 的實現,今天我們繼續從 workqueue 的另外一端看下任務出隊後的主要調諧邏輯實現。

注意:閱讀 Job 原始碼需要有一定的自定義控制器工作原理基礎,裡面涉及到了 Informer 工作機制、workqueue(延時工作佇列)、ResourceEventHandler 等等邏輯,沒有相關知識儲備直接看本文會有一定挑戰,建議先閱讀

《深入理解 K8S 原理與實現》系列目錄裡列的相關文章。


《Kubernetes Job Controller 原理和原始碼分析》分為三講:

Job controller 的啟動

繼續來看 Run() 方法

  • pkg/controller/job/job_controller.go
 1func(jm*Controller)Run(workersint,stopCh<-chanstruct{}){
2deferutilruntime.HandleCrash()
3deferjm.queue.ShutDown()
4deferjm.orphanQueue.ShutDown()
5
6klog.Infof("Startingjobcontroller")
7deferklog.Infof("Shuttingdownjobcontroller")
8
9if!cache.WaitForNamedCacheSync("job",stopCh,jm.podStoreSynced,jm.jobStoreSynced){
10return
11}
12
13fori:=0;i<workers;i++{
14gowait.Until(jm.worker,time.Second,stopCh)
15}
16
17gowait.Until(jm.orphanWorker,time.Second,stopCh)
18
19<-stopCh
20}

可以看到這裡的邏輯在兩個 worker 裡,繼續看 jm.worker 和 jm.orphanWorker 分別是什麼邏輯

1func(jm*Controller)worker(){
2forjm.processNextWorkItem(){
3}
4}

worker() 方法裡簡單呼叫了 processNextWorkItem() 方法,另外一個 orphanWorker() 也是類似邏輯

1func(jm*Controller)orphanWorker(){
2forjm.processNextOrphanPod(){
3}
4}

繼續看

processNextWorkItem()

 1func(jm*Controller)processNextWorkItem()bool{
2key,quit:=jm.queue.Get()
3ifquit{
4returnfalse
5}
6deferjm.queue.Done(key)
7
8forget,err:=jm.syncHandler(key.(string))//核心邏輯
9iferr==nil{
10ifforget{
11jm.queue.Forget(key)
12}
13returntrue
14}
15
16utilruntime.HandleError(fmt.Errorf("syncingjob:%w",err))
17if!apierrors.IsConflict(err){
18jm.queue.AddRateLimited(key)
19}
20
21returntrue
22}

這裡的核心邏輯只有一行,就是 jm.syncHandler(key.(string)) 呼叫。我們往裡跟這個 syncHandler 方法可以看到其實現是 func (jm *Controller) syncJob(key string) (forget bool, rErr error) 方法,這個方法裡就是主要的調諧邏輯了。看下具體邏輯:

核心調諧邏輯入口 - syncJob()

從 workqueue 出來任務之後主要的業務邏輯從這裡開始,這個方法很長……

  • pkg/controller/job/job_controller.go:588
  1func(jm*Controller)syncJob(keystring)(forgetbool,rErrerror){
2startTime:=time.Now()
3deferfunc(){
4klog.V(4).Infof("Finishedsyncingjob%q(%v)",key,time.Since(startTime))
5}()
6//key的結構一般是namespace/name的格式
7ns,name,err:=cache.SplitMetaNamespaceKey(key)
8iferr!=nil{
9returnfalse,err
10}
11iflen(ns)==0||len(name)==0{
12returnfalse,fmt.Errorf("invalidjobkey%q:eithernamespaceornameismissing",key)
13}
14//這裡叫sharedJob是因為取的是本地cache的job,通過Indexer提供的能力
15sharedJob,err:=jm.jobLister.Jobs(ns).Get(name)
16iferr!=nil{
17//如果找不到,說明被其他goroutine刪掉了,忽略
18ifapierrors.IsNotFound(err){
19klog.V(4).Infof("Jobhasbeendeleted:%v",key)
20jm.expectations.DeleteExpectations(key)
21jm.finalizerExpectations.deleteExpectations(key)
22returntrue,nil
23}
24returnfalse,err
25}
26//拷貝一份避免修改
27job:=*sharedJob.DeepCopy()
28
29//通過JobCondition的Type是否為"Complete"/"Failed"來判斷job是否已經完成了
30ifIsJobFinished(&job){
31returntrue,nil
32}
33
34//這個feature是1.22版本進入beta的,如果兩邊配置不一致,則無法繼續處理
35//本質通過.Spec.CompletionMode=="Indexed"來判斷
36if!feature.DefaultFeatureGate.Enabled(features.IndexedJob)&&isIndexedJob(&job){
37jm.recorder.Event(&job,v1.EventTypeWarning,"IndexedJobDisabled","SkippedIndexedJobsyncbecausefeatureisdisabled.")
38returnfalse,nil
39}
40//CompletionMode為"NonIndexed"/"Indexed",如果是其他值則不識別
41ifjob.Spec.CompletionMode!=nil&&*job.Spec.CompletionMode!=batch.NonIndexedCompletion&&*job.Spec.CompletionMode!=batch.IndexedCompletion{
42jm.recorder.Event(&job,v1.EventTypeWarning,"UnknownCompletionMode","SkippedJobsyncbecausecompletionmodeisunknown")
43returnfalse,nil
44}
45
46//配置當前的completionMode,預設為"NonIndexed"
47completionMode:=string(batch.NonIndexedCompletion)
48ifisIndexedJob(&job){
49completionMode=string(batch.IndexedCompletion)
50}
51//"reconciling"
52action:=metrics.JobSyncActionReconciling
53
54//metrics邏輯
55deferfunc(){
56result:="success"
57ifrErr!=nil{
58result="error"
59}
60
61metrics.JobSyncDurationSeconds.WithLabelValues(completionMode,result,action).Observe(time.Since(startTime).Seconds())
62metrics.JobSyncNum.WithLabelValues(completionMode,result,action).Inc()
63}()
64
65varexpectedRmFinalizerssets.String
66varuncounted*uncountedTerminatedPods
67//處理podfinalizer,1.22版本alpha的特性
68iftrackingUncountedPods(&job){
69klog.V(4).InfoS("TrackinguncountedPodswithpodfinalizers","job",klog.KObj(&job))
70ifjob.Status.UncountedTerminatedPods==nil{
71job.Status.UncountedTerminatedPods=&batch.UncountedTerminatedPods{}
72}
73uncounted=newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
74expectedRmFinalizers=jm.finalizerExpectations.getExpectedUIDs(key)
75//刪除job的"batch.kubernetes.io/job-tracking"註解
76}elseifpatch:=removeTrackingAnnotationPatch(&job);patch!=nil{
77iferr:=jm.patchJobHandler(&job,patch);err!=nil{
78returnfalse,fmt.Errorf("removingtrackingfinalizerfromjob%s:%w",key,err)
79}
80}
81
82jobNeedsSync:=jm.expectations.SatisfiedExpectations(key)
83
84//提取相關pods
85pods,err:=jm.getPodsForJob(&job,uncounted!=nil)
86iferr!=nil{
87returnfalse,err
88}
89//判斷依據是PodPhase不為"Succeeded"和"Failed"兩個結果態
90activePods:=controller.FilterActivePods(pods)
91active:=int32(len(activePods))
92//計算"Succeeded"和"Failed"狀態pod的數量
93succeeded,failed:=getStatus(&job,pods,uncounted,expectedRmFinalizers)
94//滿足這個條件說明這個pod是新建立的,這時候需要設定.Status.StartTime
95ifjob.Status.StartTime==nil&&!jobSuspended(&job){
96now:=metav1.Now()
97job.Status.StartTime=&now
98//如果ActiveDeadlineSeconds不為空,則在ActiveDeadlineSeconds時間到後再次調諧
99ifjob.Spec.ActiveDeadlineSeconds!=nil{
100klog.V(4).Infof("Job%shasActiveDeadlineSecondswillsyncafter%dseconds",
101key,*job.Spec.ActiveDeadlineSeconds)
102jm.queue.AddAfter(key,time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
103}
104}
105
106varmanageJobErrerror
107varfinishedCondition*batch.JobCondition
108
109//有新增failed到pod
110jobHasNewFailure:=failed>job.Status.Failed
111
112//有新的failedpod產生,而且active的pod數量不等於併發數,而且已經失敗的pod數量大於重試次數限制
113exceedsBackoffLimit:=jobHasNewFailure&&(active!=*job.Spec.Parallelism)&&
114(failed>*job.Spec.BackoffLimit)
115//pastBackoffLimitOnFailure計算的是當pod重啟策略為OnFailure時重啟次數是否超過限制
116ifexceedsBackoffLimit||pastBackoffLimitOnFailure(&job,pods){
117//重試次數達到上限,Condition更新為"Failed"
118finishedCondition=newCondition(batch.JobFailed,v1.ConditionTrue,"BackoffLimitExceeded","Jobhasreachedthespecifiedbackofflimit")
119//超時了
120}elseifpastActiveDeadline(&job){
121finishedCondition=newCondition(batch.JobFailed,v1.ConditionTrue,"DeadlineExceeded","Jobwasactivelongerthanspecifieddeadline")
122}
123//計算索引
124varprevSucceededIndexes,succeededIndexesorderedIntervals
125ifisIndexedJob(&job){
126prevSucceededIndexes,succeededIndexes=calculateSucceededIndexes(&job,pods)
127succeeded=int32(succeededIndexes.total())
128}
129suspendCondChanged:=false
130//如果job失敗了,這時候在Active狀態的Pod需要直接刪除
131iffinishedCondition!=nil{
132deleted,err:=jm.deleteActivePods(&job,activePods)
133ifuncounted==nil{
134deleted=active
135}elseifdeleted!=active{
136finishedCondition=nil
137}
138active-=deleted
139failed+=deleted
140manageJobErr=err
141}else{
142manageJobCalled:=false
143ifjobNeedsSync&&job.DeletionTimestamp==nil{
144//manageJob()方法是根據Spec管理執行中Pod數量的核心方法
145active,action,manageJobErr=jm.manageJob(&job,activePods,succeeded,succeededIndexes)
146manageJobCalled=true
147}
148//判斷job已經完成
149complete:=false
150ifjob.Spec.Completions==nil{
151complete=succeeded>0&&active==0
152}else{
153complete=succeeded>=*job.Spec.Completions&&active==0
154}
155ifcomplete{
156finishedCondition=newCondition(batch.JobComplete,v1.ConditionTrue,"","")
157//Job掛起是1.22版本beta的新特性
158}elseiffeature.DefaultFeatureGate.Enabled(features.SuspendJob)&&manageJobCalled{
159//如果配置了掛起
160ifjob.Spec.Suspend!=nil&&*job.Spec.Suspend{
161//只有沒完成的Job可以被掛起
162varisUpdatedbool
163job.Status.Conditions,isUpdated=ensureJobConditionStatus(job.Status.Conditions,batch.JobSuspended,v1.ConditionTrue,"JobSuspended","Jobsuspended")
164ifisUpdated{
165suspendCondChanged=true
166jm.recorder.Event(&job,v1.EventTypeNormal,"Suspended","Jobsuspended")
167}
168}else{
169//掛起狀態喚醒
170varisUpdatedbool
171job.Status.Conditions,isUpdated=ensureJobConditionStatus(job.Status.Conditions,batch.JobSuspended,v1.ConditionFalse,"JobResumed","Jobresumed")
172ifisUpdated{
173suspendCondChanged=true
174jm.recorder.Event(&job,v1.EventTypeNormal,"Resumed","Jobresumed")
175now:=metav1.Now()
176//重置StartTime
177job.Status.StartTime=&now
178}
179}
180}
181}
182
183forget=false
184//檢查成功的pod是否多了
185ifjob.Status.Succeeded<succeeded{
186forget=true
187}
188
189ifuncounted!=nil{
190//掛起狀態變更或者activepod數量變更
191needsStatusUpdate:=suspendCondChanged||active!=job.Status.Active
192job.Status.Active=active
193//Finalizer相關邏輯
194err=jm.trackJobStatusAndRemoveFinalizers(&job,pods,prevSucceededIndexes,*uncounted,expectedRmFinalizers,finishedCondition,needsStatusUpdate)
195iferr!=nil{
196returnfalse,fmt.Errorf("trackingstatus:%w",err)
197}
198jobFinished:=IsJobFinished(&job)
199ifjobHasNewFailure&&!jobFinished{
200returnforget,fmt.Errorf("failedpod(s)detectedforjobkey%q",key)
201}
202forget=true
203returnforget,manageJobErr
204}
205//移除所有Finalizer
206iferr:=jm.removeTrackingFinalizersFromAllPods(pods);err!=nil{
207returnfalse,fmt.Errorf("removingdisabledfinalizersfromjobpods%s:%w",key,err)
208}
209
210//判斷狀態是否需要更新
211ifjob.Status.Active!=active||job.Status.Succeeded!=succeeded||job.Status.Failed!=failed||suspendCondChanged||finishedCondition!=nil{
212job.Status.Active=active
213job.Status.Succeeded=succeeded
214job.Status.Failed=failed
215ifisIndexedJob(&job){
216job.Status.CompletedIndexes=succeededIndexes.String()
217}
218job.Status.UncountedTerminatedPods=nil
219jm.enactJobFinished(&job,finishedCondition)
220
221if_,err:=jm.updateStatusHandler(&job);err!=nil{
222returnforget,err
223}
224
225ifjobHasNewFailure&&!IsJobFinished(&job){
226//returninganerrorwillre-enqueueJobafterthebackoffperiod
227returnforget,fmt.Errorf("failedpod(s)detectedforjobkey%q",key)
228}
229
230forget=true
231}
232
233returnforget,manageJobErr
234}

Pod 數量管理 - manageJob()

上面 syncJob() 中有一個 manageJob() 方法呼叫,manageJob() 具體控制一個 Job 下應該有多少個 Active 的 Pod,執行“多刪少建”工作。這個方法也很長……

  • pkg/controller/job/job_controller.go:1245
  1func(jm*Controller)manageJob(job*batch.Job,activePods[]*v1.Pod,succeededint32,succeededIndexes[]interval)(int32,string,error){
2//執行中的pod數量
3active:=int32(len(activePods))
4//併發度
5parallelism:=*job.Spec.Parallelism
6jobKey,err:=controller.KeyFunc(job)
7iferr!=nil{
8utilruntime.HandleError(fmt.Errorf("Couldn'tgetkeyforjob%#v:%v",job,err))
9return0,metrics.JobSyncActionTracking,nil
10}
11//掛起狀態
12ifjobSuspended(job){
13klog.V(4).InfoS("Deletingallactivepodsinsuspendedjob","job",klog.KObj(job),"active",active)
14podsToDelete:=activePodsForRemoval(job,activePods,int(active))
15jm.expectations.ExpectDeletions(jobKey,len(podsToDelete))
16//掛起需要直接刪除所有Active的Pod
17removed,err:=jm.deleteJobPods(job,jobKey,podsToDelete)
18active-=removed
19returnactive,metrics.JobSyncActionPodsDeleted,err
20}
21
22wantActive:=int32(0)
23ifjob.Spec.Completions==nil{
24//對應沒有配置Completions的場景,這時候執行中的Pod需要和併發數相等,有一個Pod成功時判斷Job狀態為成功
25ifsucceeded>0{
26wantActive=active
27}else{
28wantActive=parallelism
29}
30}else{
31//指定了Completions場景,這時候執行中的Pod數量不應該超過Completions減去已經成功的數量
32wantActive=*job.Spec.Completions-succeeded
33//不能超過併發數
34ifwantActive>parallelism{
35wantActive=parallelism
36}
37ifwantActive<0{
38wantActive=0
39}
40}
41//如果實際active數量大於應該active的數量,就需要刪除幾個
42rmAtLeast:=active-wantActive
43ifrmAtLeast<0{
44rmAtLeast=0
45}
46//計算哪些Pod需要被刪除
47podsToDelete:=activePodsForRemoval(job,activePods,int(rmAtLeast))
48iflen(podsToDelete)>MaxPodCreateDeletePerSync{
49podsToDelete=podsToDelete[:MaxPodCreateDeletePerSync]
50}
51//執行刪除動作
52iflen(podsToDelete)>0{
53jm.expectations.ExpectDeletions(jobKey,len(podsToDelete))
54klog.V(4).InfoS("Toomanypodsrunningforjob","job",klog.KObj(job),"deleted",len(podsToDelete),"target",wantActive)
55removed,err:=jm.deleteJobPods(job,jobKey,podsToDelete)
56active-=removed
57returnactive,metrics.JobSyncActionPodsDeleted,err
58}
59//實際執行的pod數量不夠場景
60ifactive<wantActive{
61diff:=wantActive-active
62//如果大於上限500,則設定為500
63ifdiff>int32(MaxPodCreateDeletePerSync){
64diff=int32(MaxPodCreateDeletePerSync)
65}
66
67jm.expectations.ExpectCreations(jobKey,int(diff))
68errCh:=make(chanerror,diff)
69klog.V(4).Infof("Toofewpodsrunningjob%q,need%d,creating%d",jobKey,wantActive,diff)
70
71wait:=sync.WaitGroup{}
72
73varindexesToAdd[]int
74ifisIndexedJob(job){
75indexesToAdd=firstPendingIndexes(activePods,succeededIndexes,int(diff),int(*job.Spec.Completions))
76diff=int32(len(indexesToAdd))
77}
78active+=diff
79//提取pod模板
80podTemplate:=job.Spec.Template.DeepCopy()
81ifisIndexedJob(job){
82addCompletionIndexEnvVariables(podTemplate)
83}
84iftrackingUncountedPods(job){
85podTemplate.Finalizers=appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
86}
87//batchSize從1開始,然後指數遞增,2、4、8……的方式,這是一種“慢啟動”過程,防止一下子嘗試建立大量Pod,但是由於相同的原因批量失敗
88forbatchSize:=int32(integer.IntMin(int(diff),controller.SlowStartInitialBatchSize));diff>0;batchSize=integer.Int32Min(2*batchSize,diff){
89errorCount:=len(errCh)
90wait.Add(int(batchSize))
91fori:=int32(0);i<batchSize;i++{
92//-1
93completionIndex:=unknownCompletionIndex
94iflen(indexesToAdd)>0{
95completionIndex=indexesToAdd[0]
96indexesToAdd=indexesToAdd[1:]
97}
98gofunc(){
99template:=podTemplate
100generateName:=""
101ifcompletionIndex!=unknownCompletionIndex{
102template=podTemplate.DeepCopy()
103addCompletionIndexAnnotation(template,completionIndex)
104//設定Hostname
105template.Spec.Hostname=fmt.Sprintf("%s-%d",job.Name,completionIndex)
106generateName=podGenerateNameWithIndex(job.Name,completionIndex)
107}
108deferwait.Done()
109//建立pod
110err:=jm.podControl.CreatePodsWithGenerateName(job.Namespace,template,job,metav1.NewControllerRef(job,controllerKind),generateName)
111iferr!=nil{
112ifapierrors.HasStatusCause(err,v1.NamespaceTerminatingCause){
113return
114}
115}
116iferr!=nil{
117deferutilruntime.HandleError(err)
118klog.V(2).Infof("Failedcreation,decrementingexpectationsforjob%q/%q",job.Namespace,job.Name)
119jm.expectations.CreationObserved(jobKey)
120atomic.AddInt32(&active,-1)
121errCh<-err
122}
123}()
124}
125wait.Wait()
126skippedPods:=diff-batchSize
127iferrorCount<len(errCh)&&skippedPods>0{
128klog.V(2).Infof("Slow-startfailure.Skippingcreationof%dpods,decrementingexpectationsforjob%q/%q",skippedPods,job.Namespace,job.Name)
129active-=skippedPods
130fori:=int32(0);i<skippedPods;i++{
131jm.expectations.CreationObserved(jobKey)
132}
133//忽略的pod在下次調諧過程中繼續嘗試“慢啟動”
134break
135}
136//成功處理的數量減掉
137diff-=batchSize
138}
139returnactive,metrics.JobSyncActionPodsCreated,errorFromChannel(errCh)
140}
141
142returnactive,metrics.JobSyncActionTracking,nil
143}

小結

Job 控制器實現的邏輯並不複雜,就像我們一開始在《Kubernetes Job Controller 原理和原始碼分析(一)》中介紹的那些特性一樣,Job 要支援的功能並不多。但是要完整理解 Job 控制器全部原始碼並不簡單,一方面需要對控制器模式本身有一定的理解,知道控制器的整體工作流;另外一方面 Job 控制器的實現中有大量健壯性程式碼,在實現功能的基礎上程式碼量大了很多,要理清所有的細節還是有一定挑戰。最後這個原始碼組織結構著實看著不舒服,幾百行程式碼“懟”到一個函式裡,對“閱讀者”不太友好。

(轉載請保留本文原始連結 https://www.danielhu.cn)