1. 程式人生 > 其它 >Kubernetes定時任務的實現 Golang定時器——Timer 和 Ticker 

Kubernetes定時任務的實現 Golang定時器——Timer 和 Ticker 

1、概述

Kubernetes 的各個元件都有一定的定時任務,比如任務的定時輪詢、高可用的實現、日誌處理、快取使用等,Kubernetes 中的定時任務都是通過 wait 包實現的。

2、Golang 的定時任務

在講 Kubernetes 的 wait 包之前,先看下 Golang 應該怎麼實現一個定時任務。

Golang 中的 time 庫包含了很多和時間相關的工具,其中包括了 Ticker 和 Timer 兩個定時器。

  • Ticker 只要完成定義,從計時開始,不需要其他操作,每間隔固定時間便會觸發。

  • 而對於 Timer,在超時之後需要重置才能繼續觸發。

Golang定時任務詳細語法細節請參考:

Golang定時器——Timer 和 Ticker 

3、Kubernetes 的 wait 庫

3.1 常用 API

wait 庫中實現了多種常用的 API,以提供定時執行函式的能力。

定期執行一個函式,永不停止

var NeverStop <-chan struct{} = make(chan struct{})

// Forever calls f every period for ever.
//
// Forever is syntactic sugar on top of Until.
func Forever(f func(), period time.Duration) {
	Until(f, period, NeverStop)
}

該函式支援一個函式變數引數和一個間隔時間,該函式會定期執行,不會停止。

定期執行一個函式,可以接受停止訊號

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, true, stopCh)
}

該函式支援提供一個函式變數引數、間隔時間和發生 stop 訊號的 channel,和 Forever 類似,不過可以通過向 stopCh 釋出訊息來停止。

定期檢查先決條件

// Poll tries a condition func until it returns true, an error, or the timeout
// is reached.
//
// Poll always waits the interval before the run of 'condition'.
// 'condition' will always be invoked at least once.
//
// Some intervals may be missed if the condition takes too long or the time
// window is too short.
//
// If you want to Poll something forever, see PollInfinite.
func Poll(interval, timeout time.Duration, condition ConditionFunc) error

該函式將以 interval 為間隔,定期檢查 condition 是否檢查成功。

3.2 核心程式碼

wait 包的定時任務 API 是基於 JitterUntil 實現的。

JitterUntil 的 5 個引數:

引數名 型別 作用
f func() 需要定時執行的邏輯函式
period time.Duration    定時任務的時間間隔
jitterFactor   float64 如果大於 0.0 間隔時間變為 duration 到 duration + maxFactor * duration 的隨機值
sliding bool 邏輯的執行時間是否不算入間隔時間,如果 sliding 為 true,則在 f() 執行之後計算週期。如果為 false,那麼 period 包含 f() 的執行時間。
stopCh <-chan struct{} 接受停止訊號的 channel
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}

func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
	var t clock.Timer
	for {
		select {
		case <-stopCh:
			return
		default:
		}

		//例項化(NewTimer)或複用(Reset)原生定時器time.Timer進行週期性任務
		if !sliding {
			t = backoff.Backoff()
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()

		//例項化或複用(Reset)原生定時器time.Timer進行週期性任務
		if sliding {
			t = backoff.Backoff()
		}

		//每隔getNextBackoff()時間間隔觸發定時器
		select {
		case <-stopCh:
			return
		case <-t.C():
		}
	}
}

type jitteredBackoffManagerImpl struct {
	clock        clock.Clock
	duration     time.Duration
	jitter       float64
	backoffTimer clock.Timer
}

// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
// is negative, backoff will not be jittered.
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
	return &jitteredBackoffManagerImpl{
		clock:        c,
		duration:     duration,   //最少要延遲多久
		jitter:       jitter,     //給定抖動範圍
		backoffTimer: nil,        //退避計時器,一開始不需要初始化backoffTimer,會由使用者呼叫Backoff方法時由計算後再賦值
	}
}

func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
	backoff := j.getNextBackoff()
	//例項化原生定時器time.Timer進行週期性任務
	if j.backoffTimer == nil {
		j.backoffTimer = j.clock.NewTimer(backoff)
	} else {
		//複用timer
		j.backoffTimer.Reset(backoff)
	}
	return j.backoffTimer
}


//計算延遲時間
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
	jitteredPeriod := j.duration
	if j.jitter > 0.0 {
		jitteredPeriod = Jitter(j.duration, j.jitter)
	}
	return jitteredPeriod
}

//計算抖動延遲時間
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
	wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
	return wait
}

//clock包進行對time.Timer做了一層封裝實現,本文只列一下time.Timer例項化方法
type RealClock struct{}

// NewTimer returns a new Timer.
func (RealClock) NewTimer(d time.Duration) Timer {
	return &realTimer{
		timer: time.NewTimer(d),
	}
}

參考:https://blog.ihypo.net/15707766752878.html