7. Kubernetes Deep Dive: Running Tasks with Job and CronJob, with Source Code Analysis
Published: 2020-08-23
> Please credit the source when reposting. This article was first published on luozhiyun's blog: https://www.luozhiyun.com
While going through Job usage I will also refer to the source code, so we can see for ourselves how Kubernetes handles the details and get a feel for its design. The source version used is [1.19](https://github.com/kubernetes/kubernetes/tree/release-1.19).
![img](https://img.luozhiyun.com/20200823163612.png)
## Job
### Basic usage of Job
A Job is used to run one-off tasks: it creates one or more Pods and ensures that the specified number of Pods run to successful completion.
Create a Job:
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: pi
spec:
  template:
    spec:
      containers:
      - name: pi
        image: perl
        command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
      restartPolicy: Never
  backoffLimit: 4
```
This Job starts a single container that runs the command above to compute π.
Now create it and inspect the Job:
```shell
$ kubectl create -f job.yaml
$ kubectl describe jobs/pi
Name: pi
Namespace: default
Selector: controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
Labels: controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
job-name=pi
Annotations: Parallelism: 1
Completions: 1
...
Pods Statuses: 0 Running / 1 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
job-name=pi
Containers:
pi:
Image: resouer/ubuntu-bc
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SuccessfulCreate 29m job-controller Created pod: pi-g9fs4
Normal Completed 27m job-controller Job completed
```
We can see that once the object is created, the Pod template is automatically given a label of the form controller-uid=<random string>, and the Job object itself is given a selector for that same label, which guarantees the match between the Job and the Pods it manages. This uid also ensures that Pods belonging to different Job objects never overlap.
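As a minimal standalone illustration of this matching (a sketch only, reusing the uid shown in the describe output above and the labels package from apimachinery), the Job's selector matches exactly the labels stamped onto its Pods:

```go
// Sketch: the Job's selector selects exactly the pods carrying the generated
// controller-uid label, so pods of different Jobs never get mixed up.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Selector carried by the Job object (uid taken from the output above).
	selector := labels.SelectorFromSet(labels.Set{
		"controller-uid": "cf78ebe4-07f9-4234-b8f9-2fe92df352ea",
	})
	// Labels stamped onto the Pod created from the Job's template.
	podLabels := labels.Set{
		"controller-uid": "cf78ebe4-07f9-4234-b8f9-2fe92df352ea",
		"job-name":       "pi",
	}
	fmt.Println(selector.Matches(podLabels)) // true
}
```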
```shell
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
pi-g9fs4 0/1 Completed 0 33m
$ kubectl describe pod pi-g9fs4
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 35m default-scheduler Successfully assigned default/pi-g9fs4 to 192.168.13.130
Normal Pulling 35m kubelet, 192.168.13.130 Pulling image "resouer/ubuntu-bc"
Normal Pulled 35m kubelet, 192.168.13.130 Successfully pulled image "resouer/ubuntu-bc"
Normal Created 35m kubelet, 192.168.13.130 Created container pi
Normal Started 35m kubelet, 192.168.13.130 Started container pi
```
We can see that once the Pod finishes running it enters the Completed state. restartPolicy=Never in the YAML above also guarantees that this Pod is run only once.
If the Pod fails, the Job Controller keeps creating new Pods:
```shell
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
pi-55h89 0/1 ContainerCreating 0 2s
pi-tqbcz 0/1 Error 0 5s
```
#### Parameters
**spec.backoffLimit**
We set this to 4 in the manifest above, meaning the Job is retried at most 4 times.
**restartPolicy**
While a Job is running, all kinds of system problems can cause a Pod to fail. If restartPolicy is set to OnFailure, the Job Controller restarts the containers inside the Pod on failure instead of creating a new Pod.
It can also be set to Never, meaning the containers are not restarted after a failure. For more details see [Pod lifecycle](https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#example-states).
**spec.activeDeadlineSeconds**
The maximum time the Job may stay active, in seconds. For example:
```yaml
spec:
  backoffLimit: 5
  activeDeadlineSeconds: 100
```
With this set, pastActiveDeadline checks whether `job.Spec.ActiveDeadlineSeconds` is nil; if it is set, the Job's running duration is compared against the configured `job.Spec.ActiveDeadlineSeconds`, and if the duration exceeds it, the Job is marked as terminated with the reason DeadlineExceeded.
We can see this logic in the Job Controller source:
The Job Controller first checks whether the number of failures has exceeded the BackoffLimit. If not, and ActiveDeadlineSeconds is set, it then checks whether the Job has been active longer than ActiveDeadlineSeconds; if so, the Job is marked as failed.
```go
...
jobHaveNewFailure := failed > job.Status.Failed
exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
	(int32(previousRetry)+1 > *job.Spec.BackoffLimit)

if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
	// check if the number of pod restart exceeds backoff (for restart OnFailure only)
	// OR if the number of failed jobs increased since the last syncJob
	jobFailed = true
	failureReason = "BackoffLimitExceeded"
	failureMessage = "Job has reached the specified backoff limit"
} else if pastActiveDeadline(&job) {
	jobFailed = true
	failureReason = "DeadlineExceeded"
	failureMessage = "Job was active longer than specified deadline"
}
...

func pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	now := metav1.Now()
	start := job.Status.StartTime.Time
	duration := now.Time.Sub(start)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}
```
### Parallel Jobs
Two parameters in the Job object control parallelism:
1. `spec.parallelism` is the maximum number of Pods the Job may run at any one time;
2. `spec.completions` is the minimum number of Pods that must complete successfully.
For example:
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: pi
spec:
  parallelism: 2
  completions: 4
  template:
    spec:
      containers:
      - name: pi
        image: perl
        command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
      restartPolicy: Never
  backoffLimit: 4
```
After the Job is created we can see that at most two Pods run at the same time:
```shell
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
pi-8fsrn 0/1 ContainerCreating 0 30s
pi-job-67kwg 0/1 Completed 0 14h
pi-wlbm5 0/1 ContainerCreating 0 30s
```
Each time a Pod finishes its computation and enters the Completed state, a new Pod is created and quickly moves from Pending to ContainerCreating.
Eventually the Job's COMPLETIONS column shows that all runs have finished:
```shell
$ kubectl get job
NAME COMPLETIONS DURATION AGE
pi 4/4 2m52s 2m52s
```
Based on the configured parallelism, the Job Controller checks whether the current number of active Pods is reasonable, and adjusts it if it is not.
If the number of active Pods is greater than the Job's configured parallelism `job.Spec.Parallelism`, the extra active Pods are deleted concurrently.
### Job source code analysis
The examples above show that using a Job is quite simple; now let's go through the source to understand how a Job is actually executed.
The core logic lives in the syncJob method of the Controller type in job_controller.go.
syncJob is a long method, so I will split it up and explain it piece by piece.
**Controller#syncJob**
```go
func (jm *Controller) syncJob(key string) (bool, error) {
	...
	job := *sharedJob

	// if job was finished previously, we don't want to redo the termination
	// If the job has already finished, return immediately to avoid re-running it.
	if IsJobFinished(&job) {
		return true, nil
	}

	// retrieve the previous number of retry
	// Number of times this job has been retried so far.
	previousRetry := jm.queue.NumRequeues(key)

	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

	// Get the list of pods belonging to this job.
	pods, err := jm.getPodsForJob(&job)
	if err != nil {
		return false, err
	}

	// Filter out the pods of this job that are still active.
	activePods := controller.FilterActivePods(pods)
	active := int32(len(activePods))
	// Count the pods that succeeded and the pods that failed.
	succeeded, failed := getStatus(pods)
	conditions := len(job.Status.Conditions)

	// job first start
	// Record the job's start time on first sync.
	if job.Status.StartTime == nil {
		now := metav1.Now()
		job.Status.StartTime = &now
		// enqueue a sync to check if job past ActiveDeadlineSeconds
		if job.Spec.ActiveDeadlineSeconds != nil {
			klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",
				key, *job.Spec.ActiveDeadlineSeconds)
			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
		}
	}
	...
}
```
This part of the code first checks whether the job has already finished and returns immediately if so;
it then retrieves the job's retry count and the list of pods associated with the job, and computes the number of active, succeeded, and failed pods;
finally, if this is the job's first sync, its start time is recorded.
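As a minimal sketch (not the actual getStatus implementation from job_controller.go), the succeeded/failed counts used above can be derived from the pod list simply by counting pods by their terminal phase:

```go
// Sketch: count the pods of a job that ended in each terminal phase.
package sketch

import v1 "k8s.io/api/core/v1"

func countByPhase(pods []*v1.Pod) (succeeded, failed int32) {
	for _, p := range pods {
		switch p.Status.Phase {
		case v1.PodSucceeded: // pod ran to completion successfully
			succeeded++
		case v1.PodFailed: // pod terminated with a failure
			failed++
		}
	}
	return succeeded, failed
}
```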
Continuing:
```go
func (jm *Controller) syncJob(key string) (bool, error) {
	...
	var manageJobErr error
	jobFailed := false
	var failureReason string
	var failureMessage string

	// If failed is greater than job.Status.Failed, new pods have failed since the last sync.
	jobHaveNewFailure := failed > job.Status.Failed
	// new failures happen when status does not reflect the failures and active
	// is different than parallelism, otherwise the previous controller loop
	// failed updating status so even if we pick up failure it is not a new one
	// There are new failed pods, the number of active pods differs from Parallelism,
	// and the retry count has exceeded BackoffLimit.
	exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
		(int32(previousRetry)+1 > *job.Spec.BackoffLimit)

	// Has the retry limit been exceeded?
	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
		// OR if the number of failed jobs increased since the last syncJob
		jobFailed = true
		failureReason = "BackoffLimitExceeded"
		failureMessage = "Job has reached the specified backoff limit"
	// Has the job been active longer than ActiveDeadlineSeconds?
	} else if pastActiveDeadline(&job) {
		jobFailed = true
		failureReason = "DeadlineExceeded"
		failureMessage = "Job was active longer than specified deadline"
	}
	...
}
```
This code decides whether the job has failed, based on whether the retry count has exceeded BackoffLimit and whether the job has been active longer than the configured ActiveDeadlineSeconds.
The current failed count is compared with the Failed count from the previous sync; if it has grown, new pods have failed. When the job is deemed failed, the failure reason is recorded and jobFailed is set to true.
The code above calls pastBackoffLimitOnFailure and pastActiveDeadline; let's look at each in turn:
**pastBackoffLimitOnFailure**
```go
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
	// If RestartPolicy is not OnFailure, return false immediately.
	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
		return false
	}
	result := int32(0)
	for i := range pods {
		po := pods[i]
		// For pods that are Running or Pending, add up the restart counts of
		// all containers, including the pod's init containers.
		if po.Status.Phase == v1.PodRunning || po.Status.Phase == v1.PodPending {
			for j := range po.Status.InitContainerStatuses {
				stat := po.Status.InitContainerStatuses[j]
				result += stat.RestartCount
			}
			for j := range po.Status.ContainerStatuses {
				stat := po.Status.ContainerStatuses[j]
				result += stat.RestartCount
			}
		}
	}
	// If BackoffLimit is 0, a single restart is already too many.
	if *job.Spec.BackoffLimit == 0 {
		return result > 0
	}
	// Compare the accumulated restart count against BackoffLimit.
	return result >= *job.Spec.BackoffLimit
}
```
This method checks the Job's RestartPolicy and only continues when it is OnFailure; otherwise it returns false right away. It then walks the pod list, sums up the container restart counts, and returns true if the total has reached BackoffLimit.
**pastActiveDeadline**
```go
func pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	now := metav1.Now()
	start := job.Status.StartTime.Time
	duration := now.Time.Sub(start)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}
```
This method computes the job's running duration and compares it against ActiveDeadlineSeconds, returning true if the deadline has been exceeded.
Back in syncJob, we continue:
```go
func (jm *Controller) syncJob(key string) (bool, error) {
	...
	// The job has failed.
	if jobFailed {
		errCh := make(chan error, active)
		// Delete all of the job's active pods.
		jm.deleteJobPods(&job, activePods, errCh)
		select {
		case manageJobErr = <-errCh:
			if manageJobErr != nil {
				break
			}
		default:
		}
		// update status values accordingly
		// Move the active pods into the failed count and reset active to zero.
		failed += active
		active = 0
		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
	} else {
		// If the job needs a sync and has not been deleted, call manageJob to do the work.
		if jobNeedsSync && job.DeletionTimestamp == nil {
			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
		}
		// The completion count equals the number of pods that succeeded.
		completions := succeeded
		complete := false
		// If Completions is not set, the job is complete as soon as any pod has
		// succeeded and no pods are still active.
		if job.Spec.Completions == nil {
			if succeeded > 0 && active == 0 {
				complete = true
			}
		} else {
			// If the actual completion count has reached Completions...
			if completions >= *job.Spec.Completions {
				complete = true
				// ...but pods are still active, emit an EventTypeWarning event.
				if active > 0 {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
				}
				// If more pods succeeded than Completions, emit an EventTypeWarning event.
				if completions > *job.Spec.Completions {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
				}
			}
		}
		// If the job is complete, update job.Status.Conditions and job.Status.CompletionTime.
		if complete {
			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
			now := metav1.Now()
			job.Status.CompletionTime = &now
			jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
		}
	}
	...
}
```
This section branches on jobFailed.
If jobFailed is true, the job has failed: all pods associated with the job are deleted and the active count is reset to zero.
If jobFailed is false, the job has not failed. If it needs a sync and has not been deleted, manageJob is called to do the synchronization work;
next, Completions is handled: if it is not set, the job counts as complete as soon as one pod has finished successfully and none are still active;
if more pods have completed than Completions, or some pods are still active after the completion count has been reached, the corresponding warning events are emitted. Finally the job's status is updated to complete.
Next let's look at what the manageJob sync method does in one go; this is the core method through which the Job controls how many Pods are running:
**Controller#manageJob**
```go
func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	...
	// If the number of active pods is greater than the job's parallelism (job.Spec.Parallelism)...
	if active > parallelism {
		// ...compute how many pods are in excess.
		diff := active - parallelism
		errCh = make(chan error, diff)
		jm.expectations.ExpectDeletions(jobKey, int(diff))
		klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
		// Sort the pods so the "cheapest" ones are deleted first:
		//   pod readiness: not ready < ready
		//   scheduling:    unscheduled < scheduled
		//   pod phase:     pending < running
		sort.Sort(controller.ActivePods(activePods))
		active -= diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			// Delete the excess active pods concurrently.
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					jm.expectations.DeletionObserved(jobKey)
					if !apierrors.IsNotFound(err) {
						klog.V(2).Infof("Failed to delete %v, decremented expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
						activeLock.Lock()
						active++
						activeLock.Unlock()
						errCh <- err
						utilruntime.HandleError(err)
					}
				}
			}(i)
		}
		wait.Wait()

	// If the number of active pods is smaller than the job's parallelism, new pods must be created.
	} else if active < parallelism {
		wantActive := int32(0)
		// If Completions is not set, the number of active pods should equal parallelism,
		// but once any pod has succeeded, no new ones are created.
		if job.Spec.Completions == nil {
			if succeeded > 0 {
				wantActive = active
			} else {
				wantActive = parallelism
			}
		} else {
			// If Completions is set, compare Completions with succeeded;
			// if wantActive exceeds parallelism, cap the number of pods to create at parallelism.
			// Job specifies a specific number of completions. Therefore, number
			// active should not ever exceed number of remaining completions.
			wantActive = *job.Spec.Completions - succeeded
			if wantActive > parallelism {
				wantActive = parallelism
			}
		}
		// Compute the diff: how many pods still need to be created.
		diff := wantActive - active
		if diff < 0 {
			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
			diff = 0
		}
		// There are already enough pods; nothing more needs to be created.
		if diff == 0 {
			return active, nil
		}
		jm.expectations.ExpectCreations(jobKey, int(diff))
		errCh = make(chan error, diff)
		klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
		active += diff
		wait := sync.WaitGroup{}
		// Pods are created in batches of 1, 2, 4, 8, ... i.e. the batch size grows exponentially.
		for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
			errorCount := len(errCh)
			wait.Add(int(batchSize))
			for i := int32(0); i < batchSize; i++ {
				// Create the pods concurrently.
				go func() {
					defer wait.Done()
					// Create a pod from the job's template.
					err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
					if err != nil {
						...
					}
					// Handle creation failures.
					if err != nil {
						defer utilruntime.HandleError(err)
						klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
						jm.expectations.CreationObserved(jobKey)
						activeLock.Lock()
						active--
						activeLock.Unlock()
						errCh <- err
					}
				}()
			}
			wait.Wait()
			...
			diff -= batchSize
		}
	}
	...
	return active, nil
}
```
The logic of this method is quite clear; let's walk through it.
The code starts with an if check on whether the number of active pods exceeds parallelism. If it does, the excess is computed and stored in diff, and the extra pods are deleted. One nice detail here is that the pods are first sorted by state, so pods that are not ready, not yet scheduled, or still pending are deleted before the others;
if the number of active pods is less than parallelism, Completions is examined first: if it is not set and some pod has already succeeded, no new pods are created, otherwise pods are created up to parallelism; if Completions is set, the number of new pods to create is derived from how many pods have already completed;
if diff, the number of pods still to create, works out to zero, there is nothing to do and the method returns directly;
otherwise pods are created concurrently inside a for loop, with the batch size growing exponentially in each round so that too many pods are not created all at once (see the sketch below).
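To make the slow-start ramp concrete, here is a small standalone sketch, assuming the same starting batch size of 1 (controller.SlowStartInitialBatchSize in the controller package): with 20 pods still to create, the batches come out as 1, 2, 4, 8 and finally the remaining 5.

```go
// Sketch of the slow-start batching loop used by manageJob.
package main

import "fmt"

func main() {
	diff := int32(20) // hypothetical number of pods still to create
	for batch := minInt32(diff, 1); diff > 0; batch = minInt32(2*batch, diff) {
		fmt.Printf("creating %d pods in this batch\n", batch) // prints 1, 2, 4, 8, 5
		diff -= batch
	}
}

func minInt32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}
```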
## CronJob: scheduled tasks
### Basic usage
Let's start with an example:
```yaml
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            args:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure
```
This CronJob creates a Pod every minute:
```shell
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
hello-1596406740-tqnlb 0/1 ContainerCreating 0 8s
```
The CronJob records the time of its most recent schedule:
```shell
$ kubectl get cronjob hello
NAME SCHEDULE SUSPEND ACTIVE LAST SCHEDULE AGE
hello */1 * * * * False 1 16s 2m33s
```
**spec.concurrencyPolicy**
If the schedule interval is too short, a new Pod may be created before the previous task has finished. We can control how this is handled with `spec.concurrencyPolicy`:
* Allow (the default): the Jobs are allowed to exist concurrently;
* Forbid: no new Pod is created and that scheduling cycle is skipped;
* Replace: the newly created Job replaces the old, unfinished Job.
If a scheduled Job creation fails, that run is counted as a "miss". When the number of misses within the specified time window reaches 100, the CronJob stops creating new Jobs.
`spec.startingDeadlineSeconds` defines this time window. With startingDeadlineSeconds=200, if the number of misses over the past 200 seconds reaches 100, the Job will no longer be created.
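Here is a simplified sketch of the idea behind this rule. It is not the real getRecentUnmetScheduleTimes implementation: it assumes a plain fixed interval instead of a cron expression, and the 100-miss cut-off mirrors the safety valve described above.

```go
// Sketch: collect the schedule times that have been missed since the last run,
// bounded by the startingDeadlineSeconds window, and give up after too many misses.
package sketch

import (
	"fmt"
	"time"
)

func unmetScheduleTimes(lastSchedule, now time.Time, interval time.Duration, startingDeadlineSeconds *int64) ([]time.Time, error) {
	earliest := lastSchedule
	if startingDeadlineSeconds != nil {
		// Only look back as far as the starting deadline window.
		windowStart := now.Add(-time.Duration(*startingDeadlineSeconds) * time.Second)
		if windowStart.After(earliest) {
			earliest = windowStart
		}
	}
	var missed []time.Time
	for t := earliest.Add(interval); !t.After(now); t = t.Add(interval) {
		missed = append(missed, t)
		// Too many missed runs usually means a long outage or a broken schedule,
		// so stop trying to catch up.
		if len(missed) > 100 {
			return nil, fmt.Errorf("too many missed start times (> 100)")
		}
	}
	return missed, nil
}
```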
### CronJob source code analysis
The CronJob source lives in cronjob_controller.go; the main logic is in the Controller's syncAll method.
Let's see how CronJobs are created and run in the source:
**Controller#syncAll**
```go
func (jm *Controller) syncAll() {
	// List all jobs.
	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	js := make([]batchv1.Job, 0)
	// Page through jobListFunc and collect the valid jobs into js.
	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		jobTmp, ok := object.(*batchv1.Job)
		if !ok {
			return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
		}
		js = append(js, *jobTmp)
		return nil
	})
	...
	// List all cronJobs.
	cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	// Walk all jobs and use the ObjectMeta.OwnerReference field to decide whether each
	// job was created by a cronJob; the key is the cronJob uid, the value is its jobs.
	jobsByCj := groupJobsByParent(js)
	klog.V(4).Infof("Found %d groups", len(jobsByCj))
	// Iterate over the cronJobs.
	err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		cj, ok := object.(*batchv1beta1.CronJob)
		if !ok {
			return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj)
		}
		// Sync this cronJob.
		syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
		// Clean up all finished jobs.
		cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
		return nil
	})

	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
		return
	}
}
```
syncAll lists all Jobs and all CronJobs, groups the Jobs by their parent CronJob, then iterates over the CronJob list calling syncOne for each one, and finally calls cleanupFinishedJobs to remove the Jobs that have finished.
Now let's see how syncOne actually handles the Jobs:
**syncOne**
```go
func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)

	childrenJobs := make(map[types.UID]bool)
	// Walk the job list.
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		// Is this job in the cronJob's Active list?
		found := inActiveList(*cj, j.ObjectMeta.UID)
		// If the job is not in the Active list and has not finished yet, emit a warning event.
		if !found && !IsJobFinished(&j) {
			recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
		// If the job is in the Active list and has already finished, remove it from the Active list.
		} else if found && IsJobFinished(&j) {
			_, status := getFinishedStatus(&j)
			deleteFromActiveList(cj, j.ObjectMeta.UID)
			recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
		}
	}

	// Walk the Active list the other way round: anything not seen above has gone missing, so remove it.
	for _, j := range cj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(cj, j.UID)
		}
	}

	// The Active list was modified above, so the cronJob's status needs to be updated.
	updatedCJ, err := cjc.UpdateStatus(cj)
	if err != nil {
		klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
		return
	}
	*cj = *updatedCJ

	// The cronJob has been deleted; return immediately.
	if cj.DeletionTimestamp != nil {
		return
	}

	// The cronJob is suspended; return immediately.
	if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
		klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}

	// Get the recent schedule times that have not yet been satisfied.
	times, err := getRecentUnmetScheduleTimes(*cj, now)
	if err != nil {
		recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// An empty list means nothing needs to be scheduled yet.
	if len(times) == 0 {
		klog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	if len(times) > 1 {
		klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}
	// Take the last (most recent) time in the list.
	scheduledTime := times[len(times)-1]
	tooLate := false
	// If StartingDeadlineSeconds is set, check whether the start window has already been missed.
	if cj.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
	}
	if tooLate {
		klog.V(4).Infof("Missed starting window for %s", nameForLog)
		recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
		return
	}

	// Handle the concurrencyPolicy.
	// For Forbid, if the Active list is non-empty, return without creating anything.
	if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 {
		klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
		return
	}
	// For Replace, delete everything in the Active list so new jobs can be created afterwards.
	if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
		for _, j := range cj.Status.Active {
			klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(cj, job, jc, recorder) {
				return
			}
		}
	}

	// Build the complete job object from cronJob.spec.JobTemplate.
	jobReq, err := getJobFromTemplate(cj, scheduledTime)
	if err != nil {
		klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	// Create the job.
	jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
	if err != nil {
		if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		}
		return
	}
	klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

	ref, err := getRef(jobResp)
	if err != nil {
		klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
	} else {
		// Record the newly created job in the Active list.
		cj.Status.Active = append(cj.Status.Active, *ref)
	}
	cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
	if _, err := cjc.UpdateStatus(cj); err != nil {
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
	}

	return
}
```
syncOne maintains the cronJob's Active list: while walking the Jobs belonging to the cronJob it decides whether each one should be removed from the Active list, and updates the cronJob's status afterwards.
It then checks whether the current cronJob has been deleted or is suspended, computes the unmet schedule times, and uses the most recent one to check the StartingDeadlineSeconds condition, among other things.
Next the ConcurrencyPolicy is examined: with Forbid, this scheduling cycle is simply skipped; with Replace, everything in the Active list is deleted so new Jobs can be created afterwards.
Finally CreateJob is called to create the Job.
## Summary
In this article we first covered how to use Job and CronJob and which parameters deserve attention, and then used the source code to explain what effect those settings have. For a Job, for example, if completions is set smaller than parallelism, the number of pods that actually complete may end up exceeding completions. The source walk-through should also give you a better understanding of both Job and CronJob.
## Reference
https://kubernetes.io/docs/concepts/workloads/controllers/job/
https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#example-states
https://kubernetes.feisky.xyz/concepts/objects/cronjob
https://kubernetes.feisky.xyz/concepts/objects/job
《深入理解k8s》
《Kubernetes in Action》