Kubernetes定時任務的實現 Golang定時器——Timer 和 Ticker
阿新 • • 發佈:2022-03-17
1、概述
Kubernetes 的各個元件都有一定的定時任務,比如任務的定時輪詢、高可用的實現、日誌處理、快取使用等,Kubernetes 中的定時任務都是通過 wait 包實現的。
2、Golang 的定時任務
在講 Kubernetes 的 wait 包之前,先看下 Golang 應該怎麼實現一個定時任務。
Golang 中的 time 庫包含了很多和時間相關的工具,其中包括了 Ticker 和 Timer 兩個定時器。
-
Ticker 只要完成定義,從計時開始,不需要其他操作,每間隔固定時間便會觸發。
-
而對於 Timer,在超時之後需要重置才能繼續觸發。
Golang定時任務詳細語法細節請參考: Golang定時器——Timer 和 Ticker 。
3、Kubernetes 的 wait 庫
3.1 常用 API
wait 庫中實現了多種常用的 API,以提供定時執行函式的能力。
定期執行一個函式,永不停止
var NeverStop <-chan struct{} = make(chan struct{}) // Forever calls f every period for ever. // // Forever is syntactic sugar on top of Until. func Forever(f func(), period time.Duration) { Until(f, period, NeverStop) }
該函式支援一個函式變數引數和一個間隔時間,該函式會定期執行,不會停止。
定期執行一個函式,可以接受停止訊號
// Until loops until stop channel is closed, running f every period. // // Until is syntactic sugar on top of JitterUntil with zero jitter factor and // with sliding = true (which means the timer for period starts after the f // completes). func Until(f func(), period time.Duration, stopCh <-chan struct{}) { JitterUntil(f, period, 0.0, true, stopCh) }
該函式支援提供一個函式變數引數、間隔時間和發生 stop 訊號的 channel,和 Forever 類似,不過可以通過向 stopCh 釋出訊息來停止。
定期檢查先決條件
// Poll tries a condition func until it returns true, an error, or the timeout // is reached. // // Poll always waits the interval before the run of 'condition'. // 'condition' will always be invoked at least once. // // Some intervals may be missed if the condition takes too long or the time // window is too short. // // If you want to Poll something forever, see PollInfinite. func Poll(interval, timeout time.Duration, condition ConditionFunc) error
該函式將以 interval 為間隔,定期檢查 condition 是否檢查成功。
3.2 核心程式碼
wait 包的定時任務 API 是基於 JitterUntil
實現的。
JitterUntil
的 5 個引數:
引數名 | 型別 | 作用 |
---|---|---|
f | func() |
需要定時執行的邏輯函式 |
period | time.Duration |
定時任務的時間間隔 |
jitterFactor | float64 |
如果大於 0.0 間隔時間變為 duration 到 duration + maxFactor * duration 的隨機值 |
sliding | bool |
邏輯的執行時間是否不算入間隔時間,如果 sliding 為 true,則在 f() 執行之後計算週期。如果為 false,那麼 period 包含 f() 的執行時間。 |
stopCh | <-chan struct{} |
接受停止訊號的 channel |
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh) } func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) { var t clock.Timer for { select { case <-stopCh: return default: } //例項化(NewTimer)或複用(Reset)原生定時器time.Timer進行週期性任務 if !sliding { t = backoff.Backoff() } func() { defer runtime.HandleCrash() f() }() //例項化或複用(Reset)原生定時器time.Timer進行週期性任務 if sliding { t = backoff.Backoff() } //每隔getNextBackoff()時間間隔觸發定時器 select { case <-stopCh: return case <-t.C(): } } } type jitteredBackoffManagerImpl struct { clock clock.Clock duration time.Duration jitter float64 backoffTimer clock.Timer } // NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter // is negative, backoff will not be jittered. func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager { return &jitteredBackoffManagerImpl{ clock: c, duration: duration, //最少要延遲多久 jitter: jitter, //給定抖動範圍 backoffTimer: nil, //退避計時器,一開始不需要初始化backoffTimer,會由使用者呼叫Backoff方法時由計算後再賦值 } } func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer { backoff := j.getNextBackoff() //例項化原生定時器time.Timer進行週期性任務 if j.backoffTimer == nil { j.backoffTimer = j.clock.NewTimer(backoff) } else { //複用timer j.backoffTimer.Reset(backoff) } return j.backoffTimer } //計算延遲時間 func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration { jitteredPeriod := j.duration if j.jitter > 0.0 { jitteredPeriod = Jitter(j.duration, j.jitter) } return jitteredPeriod } //計算抖動延遲時間 func Jitter(duration time.Duration, maxFactor float64) time.Duration { if maxFactor <= 0.0 { maxFactor = 1.0 } wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) return wait } //clock包進行對time.Timer做了一層封裝實現,本文只列一下time.Timer例項化方法 type RealClock struct{} // NewTimer returns a new Timer. func (RealClock) NewTimer(d time.Duration) Timer { return &realTimer{ timer: time.NewTimer(d), } }