Kubernetes原始碼client-go的workqueue
一、佇列介紹
client-go為什麼要實現佇列呢?在client-go中,佇列有點類似於golang中的chan,主要用於開發並行程式間的資料同步。比如各種Controller通過client-go的informer監聽物件變化,當有資源變化時通過回撥函式將資源寫入佇列中,再由其他的協程完成處理。這裡肯定有人會問為什麼不用chan呢?原因很簡單,就是chan的功能過於單一,無法滿足各類場景的需求,比如限制資料佇列的寫入速度。
由於kubernetes很多模組都有佇列的需求,而且很多需求都一樣,所以統一實現在了client-go中,不僅可以用於kubernetes內部,同時可以供呼叫client-go的模組使用。
client-go中抽象了幾種佇列,包括通用佇列、延時佇列、限速佇列等,後面的章節會逐一介紹各種佇列。
二、通用佇列
好啦,話不多說,直接上程式碼,看看client-go中的通用佇列是怎麼定義的:
// 程式碼源自client-go/util/workqueue/queue.go // 這是一個interface型別,說明有其他的各種各樣的實現 type Interface interface { Add(item interface{}) // 向佇列中新增一個元素,interface{}型別,說明可以新增任何型別的元素 Len() int // 佇列長度,就是元素的個數 Get() (item interface{}, shutdown bool) // 從佇列中獲取一個元素,雙返回值,這個和chan的<-很像,第二個返回值告知佇列是否已經關閉了 Done(item interface{}) // 告知佇列該元素已經處理完了 ShutDown() // 關閉佇列,呼叫此方法將關閉此佇列 ShuttingDown() bool // 查詢佇列是否正在關閉,如果佇列關閉返回true,如果佇列沒有關閉返回false }
感覺佇列和chan很像,尤其是Get()函式和<-都是兩個返回值,第一個返回元素,第二個告知佇列是否已經關閉。但是佇列和chan不同的地方在於Get()返回了元素但是不會從佇列中移除,只有執行Done()函式後佇列才會執行移除操作,而chan的<-呼叫直接從chan的物件中刪除了。
看過了通用佇列的抽象定義,本章節將會對client-go中實現的通用佇列做說明,說明之前先看看幾個重要的型別:
// 程式碼源於client-go/util/workqueue/queue.go type Type struct { queue []t // 元素陣列 dirty set // dirty的元素集合 processing set // 正在處理的元素集合 cond *sync.Cond shuttingDown bool // 關閉標記 metrics queueMetrics // 這個metrics和prometheus的metrics概念相同,此處不做過多說明,知道功能就行
...... } type empty struct{} // 空型別,因為sizeof(struct{})=0 type t interface{} // 元素型別是泛型 type set map[t]empty // 用map實現的set,所有的value是空資料就行了
從client-go.util.workqueue.Type(總感覺帶上包名可讀性好點,否則Type感覺不咋友好~)的定義對於有經驗的工程師來說基本想象出實現方法,算是比較簡單的功能。Type.dirty的存在又多了一點神祕性,我們一步一步揭開真相。
我們先看看Add()函式的實現:
// 程式碼源自client-go/util/workqueue/queue.go func (q *Type) Add(item interface{}) { // golang的cond自帶了互斥鎖 q.cond.L.Lock() defer q.cond.L.Unlock() // 佇列如果關閉,直接返回 if q.shuttingDown { return } // 已經標記為髒的資料,也直接返回,因為儲存在了髒資料的集合中 if q.dirty.has(item) { return } // 告知metrics添加了元素 q.metrics.add(item) // 新增到髒資料集合中 q.dirty.insert(item) // 元素剛被拿走處理,那就直接返回 if q.processing.has(item) { return } // 追加到元素陣列的尾部 q.queue = append(q.queue, item) // 通知有新元素到了,此時有協程阻塞就會被喚醒 q.cond.Signal() } func (s set) has(item t) bool { _, exists := s[item] return exists }
讓我們分析一下,佇列新增元素有幾種狀態:
- 佇列關閉了,所以不接受任何資料,上面程式碼也是這麼實現的;
- 佇列中沒有該元素,那就直接儲存在佇列中;佇列中已經有了該元素,這個該如何判斷?set(map[t]empty)型別肯定最快,陣列需要遍歷效率太低,這也是dirty存在的價值之一,上面的程式碼也通過dirty判斷元素是否存在的;
- 佇列曾經儲存過該元素,但是已經被拿走還沒有呼叫Done()時,也就是正在處理中的元素,此時再添加當前的元素應該是最新鮮的,處理中的應該是過時的,也就是髒的,我認為dirty的來源就是這個吧~(結合資源controller解釋下這句話:資源controller方法邏輯中先拿走元素,即呼叫佇列的Get方法,然後根據拿出的元素執行控制迴圈邏輯處理元素,即呼叫資源controller的reconcile方法,在執行控制迴圈邏輯期間又添加了當前元素,所以認為處理中的是過時的,也就是髒的)
綜合以上幾種狀態就比較好理解dirty的存在了,正常情況下元素會只會在processing和dirty存在一份,同時存在就說明該元素在被處理的同時又被添加了一次,那麼先前的那次可以理解為髒的,後續新增的要再被處理。
我們再來看看Get()函式是如何實現的:
// 程式碼源自client-go/util/workqueue/queue.go func (q *Type) Get() (item interface{}, shutdown bool) { // 加鎖解鎖不解釋 q.cond.L.Lock() defer q.cond.L.Unlock() // 沒有資料,阻塞協程 for len(q.queue) == 0 && !q.shuttingDown { q.cond.Wait() } // 協程被啟用但還沒有資料,說明佇列被關閉了,這個和chan一樣 if len(q.queue) == 0 { return nil, true } // 彈出第一個元素 item, q.queue = q.queue[0], q.queue[1:] // 通知metrics元素被取走了 q.metrics.get(item) // 從dirty集合中移除,加入到processing集合,經過前面的分析這裡就很好理解了 q.processing.insert(item) q.dirty.delete(item) return item, false }
由於有Add()函式後面的小總結,再看Get()函式就輕鬆很多了,而且程式碼註釋就已經足夠了,不在廢話了,最後我們看看Done()函式的實現:
// 程式碼源自client-go/util/workqueue/queue.go func (q *Type) Done(item interface{}) { // 加鎖解鎖不解釋 q.cond.L.Lock() defer q.cond.L.Unlock() // 通知metrics元素處理完了 q.metrics.done(item) // 從processing集合中刪除 q.processing.delete(item) // 重點來啦,此處判斷髒元素集合,看看處理期間是不是又被新增,如果是那就在放到佇列中,完全符合我們的分析 if q.dirty.has(item) { q.queue = append(q.queue, item) q.cond.Signal() } }
至於其他的函式Len(),Shutdown()和ShuttingDown()過於簡單,底下貼上一下程式碼就不做說明了。
func (q *Type) Len() int { q.cond.L.Lock() defer q.cond.L.Unlock() return len(q.queue) } func (q *Type) ShutDown() { q.cond.L.Lock() defer q.cond.L.Unlock() q.shuttingDown = true // 喚醒 q.cond.Broadcast() } func (q *Type) ShuttingDown() bool { q.cond.L.Lock() defer q.cond.L.Unlock() return q.shuttingDown }
三、延時佇列
client-go中對於延時佇列的抽象如下:
// 程式碼源自client-go/util/workqueue/delaying_queue.go type DelayingInterface interface { Interface // 繼承了通用佇列所有介面 AddAfter(item interface{}, duration time.Duration) // 增加了延遲新增的介面 }
從延時佇列的抽象來看,和通用佇列基本一樣,只是多了延遲新增的介面,也就增加了一些機制實現元素的延遲新增,這一點可以從延時佇列的實現型別上可以看出:
// 程式碼源自client-go/util/workqueue/delaying_queue.go type delayingType struct { Interface // 通用佇列的實現 clock clock.Clock // 時鐘,用於獲取時間 stopCh chan struct{} // 延時就意味著非同步,就要有另一個協程處理,所以需要退出訊號 stopOnce sync.Once heartbeat clock.Ticker // 定時器,在沒有任何資料操作時可以定時的喚醒處理協程,定義為心跳沒毛病 waitingForAddCh chan *waitFor // 所有延遲新增的元素封裝成waitFor放到chan中 metrics retryMetrics // 和通用佇列中的metrics功能類似 } // type waitFor struct { data t // 元素資料,這個t就是在通用佇列中定義的型別interface{} readyAt time.Time // 在什麼時間新增到佇列中 index int // 這是個索引,後面會詳細說明 }
在分析延時佇列如何利用上面定義的資料結構實現功能之前我們需要了解另一個東西:
// 程式碼源自client-go/util/workqueue/delaying_queue.go // waitFor的定義上面有,是需要延時新增的元素都要封裝成這個型別 // waitForPriorityQueue就把需要延遲的元素形成了一個佇列,佇列按照元素的延時新增的時間(readyAt)從小到大排序 // 實現的策略就是實現了go/src/container/heap/heap.go中的Interface型別,讀者可以自行了解heap // 這裡只需要知道waitForPriorityQueue這個陣列是有序的,排序方式是按照時間從小到大 type waitForPriorityQueue []*waitFor // heap需要實現的介面,告知佇列長度 func (pq waitForPriorityQueue) Len() int { return len(pq) } // heap需要實現的介面,告知第i個元素是否比第j個元素小 func (pq waitForPriorityQueue) Less(i, j int) bool { return pq[i].readyAt.Before(pq[j].readyAt) // 此處對比的就是時間,所以排序按照時間排序 } // heap需要實現的介面,實現第i和第j個元素換 func (pq waitForPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i // 因為heap沒有索引,所以需要自己記錄索引,這也是為什麼waitFor定義索引引數的原因 pq[j].index = j } // heap需要實現的介面,用於向佇列中新增資料 func (pq *waitForPriorityQueue) Push(x interface{}) { n := len(*pq) item := x.(*waitFor) item.index = n // 記錄索引值 *pq = append(*pq, item) // 放到了陣列尾部 } // heap需要實現的介面,用於從佇列中彈出最後一個數據 func (pq *waitForPriorityQueue) Pop() interface{} { n := len(*pq) item := (*pq)[n-1] item.index = -1 *pq = (*pq)[0:(n - 1)] // 縮小陣列,去掉了最後一個元素 return item } // 返回第一個元素 func (pq waitForPriorityQueue) Peek() interface{} { return pq[0] }
因為延時佇列利用waitForPriorityQueue管理所有延時新增的元素,所有的元素在waitForPriorityQueue中按照時間從小到大排序,對於延時佇列的處理就會方便很多了。
接下來我們就可以分析延時佇列的實現了,因為延時佇列整合通用佇列,所以這裡只對新增的函式做說明:
// 程式碼源自client-go/util/workqueue/delaying_queue.go func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { // 如果佇列關閉就直接退出 if q.ShuttingDown() { return } // 記錄metrics q.metrics.retry() // 不需要延遲,那就直接像通用佇列一樣新增 if duration <= 0 { q.Add(item) return } // 把元素封裝成waitFor傳入chan,切記select沒有default,所以可能會被阻塞 // 這裡面用到了stopChan,因為有阻塞的可能,所以用stopChan可以保證退出 select { case <-q.stopCh: case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: } }
AddAfter()就是簡單把元素送到chan中,所以核心實現在從chan中獲取資料那部分,獲取資料部分方法在建立延遲佇列例項後就會執行
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ Interface: NewNamed(name), clock: clock, heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(name), } go ret.waitingLoop() return ret }
從chan中獲取資料那部分方法邏輯如下所示:
// 程式碼源自client-go/util/workqueue/delaying_queue.go // 這部分就是演示佇列的核心程式碼 func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash() // 這個變數後面會用到,當沒有元素需要延時新增的時候利用這個變數實現長時間等待 never := make(<-chan time.Time) // 構造我們上面提到的有序隊列了,並且初始化 waitingForQueue := &waitForPriorityQueue{} heap.Init(waitingForQueue) // 這個map是用來避免物件重複新增的,如果重複新增就只更新時間 waitingEntryByData := map[t]*waitFor{} // 開始無限迴圈 for { // 佇列關閉了,就可以返回了 if q.Interface.ShuttingDown() { return } // 獲取當前時間 now := q.clock.Now() // 有序佇列中是否有元素,有人肯定會問還沒向有序佇列裡新增呢判斷啥啊?後面會有新增哈 for waitingForQueue.Len() > 0 { // Peek函式我們前面註釋了,獲取第一個元素,注意:不會從佇列中取出哦 entry := waitingForQueue.Peek().(*waitFor) // 元素指定新增的時間過了麼?如果沒有過那就跳出迴圈 if entry.readyAt.After(now) { break } // 既然時間已經過了,那就把它從有序佇列拿出來放入通用佇列中,這裡面需要注意幾點: // 1.heap.Pop()彈出的是第一個元素,waitingForQueue.Pop()彈出的是最後一個元素 // 2.從有序佇列把元素彈出,同時要把元素從上面提到的map刪除,因為不用再判斷重複添加了 // 3.此處是唯一一個地方把元素從有序佇列移到通用佇列,後面主要是等待時間到過程 entry = heap.Pop(waitingForQueue).(*waitFor) q.Add(entry.data) delete(waitingEntryByData, entry.data) } // 如果有序佇列中沒有元素,那就不用等一段時間了,也就是永久等下去 // 如果有序佇列中有元素,那就用第一個元素指定的時間減去當前時間作為等待時間,邏輯挺簡單 // 有序佇列是用時間排序的,後面的元素需要等待的時間更長,所以先處理排序靠前面的元素 nextReadyAt := never if waitingForQueue.Len() > 0 { entry := waitingForQueue.Peek().(*waitFor) nextReadyAt = q.clock.After(entry.readyAt.Sub(now)) } // 進入各種等待 select { // 有退出訊號麼? case <-q.stopCh: return // 定時器,每過一段時間沒有任何資料,那就再執行一次大迴圈,從理論上講這個沒用,但是這個具備容錯能力,避免BUG死等 case <-q.heartbeat.C(): // 這個就是有序佇列裡面需要等待時間訊號了,時間到就會有訊號 case <-nextReadyAt: // 這裡是從chan中獲取元素的,AddAfter()放入chan中的元素 case waitEntry := <-q.waitingForAddCh: // 如果時間已經過了就直接放入通用佇列,沒過就插入到有序佇列 if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } // 下面的程式碼看似有點多,目的就是把chan中的元素一口氣全部取乾淨,注意用了default意味著chan中沒有資料就會立刻停止 drained := false for !drained { select { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } default: drained = true } } } } } // 下面的程式碼是把元素插入有序佇列的實現 func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { // 看看元素是不是被新增過?如果新增過看誰的時間靠前就用誰的時間 existing, exists := knownEntries[entry.data] if exists { if existing.readyAt.After(entry.readyAt) { existing.readyAt = entry.readyAt heap.Fix(q, existing.index) } return } // 把元素放入有序佇列中,並記錄在map裡面,這個map就是上面那個用於判斷物件是否重複新增的map // 注意,這裡面呼叫的是heap.Push,不是waitForPriorityQueue.Push heap.Push(q, entry) knownEntries[entry.data] = entry }
到這裡延時佇列核心程式碼基本分析完了,其重要的一點就是golang的heap,他輔助實現了元素按時間先後進行排序,這樣延時佇列就可以一個一個的等待超時添加了。
四、限速佇列
限速佇列應用非常廣泛,比如我們做某些操作失敗時希望重試幾次,但是立刻重試很有可能還會失敗,我們希望延遲一段時間在重試,而且失敗次數越多延遲時間越長,這個時候就有限速的概念在裡面了。在分析限速佇列前,我們需要知道限速器。
限速器
限速器是client-go的一種抽象,具體實現可以有很多種,比如比較極端的就是不限制任何速度,我們來看看限速器的抽象是如何定義的:
// 程式碼源自client-go/util/workqueue/default_rate_limiter.go type RateLimiter interface { When(item interface{}) time.Duration // 返回元素需要等待多長時間 Forget(item interface{}) // 拋棄該元素,意味著該元素已經被處理了 NumRequeues(item interface{}) int // 元素放入佇列多少次了 }
如果對於限速器還沒有比較直觀的認識,我們可以通過幾個實際的限速器加深認識。
ItemExponentialFailureRateLimiter
ItemExponentialFailureRateLimiter是比較常用的限速器,他會根據元素錯誤次數逐漸累加等待時間,具體實現如下:
// 程式碼源自client-go/util/workqueue/default_rate_limiters.go // 限速器的定義 type ItemExponentialFailureRateLimiter struct { failuresLock sync.Mutex // 互斥鎖 failures map[interface{}]int // 記錄每個元素錯誤次數,每呼叫一次When累加一次 baseDelay time.Duration // 元素延遲基數,演算法後面會有說明 maxDelay time.Duration // 元素最大的延遲時間 } // 實現限速器的When介面 func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() // 累加錯誤計數,比較好理解 exp := r.failures[item] r.failures[item] = r.failures[item] + 1 // 通過錯誤次數計算延遲時間,公式是2^i * baseDelay,按指數遞增,符合Exponential名字 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) if backoff > math.MaxInt64 { return r.maxDelay } // 計算後的延遲值和最大延遲值二者取最小值 calculated := time.Duration(backoff) if calculated > r.maxDelay { return r.maxDelay } return calculated } // 實現限速器的NumRequeues介面,很簡單,沒什麼好說的 func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int { r.failuresLock.Lock() defer r.failuresLock.Unlock() return r.failures[item] } // 實現限速器的Forget介面,也很簡單,沒什麼好說的 func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) { r.failuresLock.Lock() defer r.failuresLock.Unlock() delete(r.failures, item) }
使用ItemExponentialFailureRateLimiter的可以認為是錯誤後不斷嘗試的過程,而且隨著嘗試次數的增加按照指數增加延遲時間。
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter和ItemExponentialFailureRateLimiter很像,都是用於錯誤嘗試的,但是ItemFastSlowRateLimiter的限速策略是嘗試次數超過閾值用長延遲,否則用短延遲。我們來看看具體實現:
// 程式碼源自client-go/util/workqueue/default_rate_limiters.go // 限速器定義 type ItemFastSlowRateLimiter struct { failuresLock sync.Mutex // 互斥鎖 failures map[interface{}]int // 錯誤次數計數 maxFastAttempts int // 錯誤嘗試閾值 fastDelay time.Duration // 短延遲時間 slowDelay time.Duration // 長延遲時間 } // 限速器實現When介面 func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() // 累加錯誤計數 r.failures[item] = r.failures[item] + 1 // 錯誤次數超過閾值用長延遲,否則用短延遲 if r.failures[item] <= r.maxFastAttempts { return r.fastDelay } return r.slowDelay } // 限速器實現NumRequeues介面,比較簡單不多解釋 func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int { r.failuresLock.Lock() defer r.failuresLock.Unlock() return r.failures[item] } // 限速器實現Forget介面,比較簡單不多解釋 func (r *ItemFastSlowRateLimiter) Forget(item interface{}) { r.failuresLock.Lock() defer r.failuresLock.Unlock() delete(r.failures, item) }
MaxOfRateLimiter
MaxOfRateLimiter是一個非常有意思的限速器,他內部有多個限速器,每次返回最悲觀的。何所謂最悲觀的,比如內部有三個限速器,When()介面返回的就是三個限速器裡面延遲最大的。讓我們看看具體實現:
// 程式碼源自client-go/util/workqueue/default_rate_limiters.go type MaxOfRateLimiter struct { limiters []RateLimiter // 限速器陣列,建立該限速器需要提供一個限速器陣列 } // 限速器實現When介面 func (r *MaxOfRateLimiter) When(item interface{}) time.Duration { ret := time.Duration(0) // 這裡在獲取所有限速裡面時間最大的 for _, limiter := range r.limiters { curr := limiter.When(item) if curr > ret { ret = curr } } return ret } // 限速器實現NumRequeues介面 func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int { ret := 0 // Requeues也是取最大值 for _, limiter := range r.limiters { curr := limiter.NumRequeues(item) if curr > ret { ret = curr } } return ret } // 限速器實現Forget介面 func (r *MaxOfRateLimiter) Forget(item interface{}) { // 逐一遍歷Forget就行了,比較簡單 for _, limiter := range r.limiters { limiter.Forget(item) } }
BucketRateLimiter
BucketRateLimiter是利用golang.org.x.time.rate.Limiter實現固定速率(qps)的限速器,至於golang.org.x.time.rate.Limiter的實現原理讀者可以自行分析,此處只對BucketRateLimiter做說明。
// 程式碼源自client-go/util/workqueue/default_rate_limiters.go type BucketRateLimiter struct { *rate.Limiter // 這個就是golang.org.x.time.rate.Limiter } func (r *BucketRateLimiter) When(item interface{}) time.Duration { return r.Limiter.Reserve().Delay() // 獲取延遲,這個延遲會是個相對固定的週期 } func (r *BucketRateLimiter) NumRequeues(item interface{}) int { return 0 // 因為固定頻率的,也就不存在重試什麼的了 } func (r *BucketRateLimiter) Forget(item interface{}) { }
BucketRateLimiter主要依靠golang.org.x.time.rate.Limiter,所以本身的程式碼基本沒有啥內容,對於我們來說只要知道這個限速器可以提供穩定的QPS就可以啦~
限速佇列實現
上面說了好多限速器,也該看看限速佇列是怎麼實現的了,client-go中對於限速佇列的抽象如下:
// 程式碼源自client-go/util/workqueue/rate_limiting_queue.go type RateLimitingInterface interface { DelayingInterface // 繼承了延時佇列 AddRateLimited(item interface{}) // 按照限速方式新增元素的介面 Forget(item interface{}) // 丟棄指定元素 NumRequeues(item interface{}) int // 查詢元素放入佇列的次數 } // 這個是限速佇列的實現 type rateLimitingType struct { DelayingInterface // 同樣要繼承延遲佇列 rateLimiter RateLimiter // 哈哈,這就對了嘛,加一個限速器就可以了 }
有了限速器的概念再來看限速佇列就簡單多了,通過限速器獲取物件的延遲時間,然後通過延時方式放入佇列,這樣佇列的內容就會按照我們要求的速率進入了。下面就是相應的程式碼,非常簡單:
// 程式碼源自client-go/util/workqueue/rate_limitting_queue.go func (q *rateLimitingType) AddRateLimited(item interface{}) { // 通過限速器獲取延遲時間,然後加入到延時佇列 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item)) } func (q *rateLimitingType) NumRequeues(item interface{}) int { return q.rateLimiter.NumRequeues(item) // 太簡單了,不解釋了 } func (q *rateLimitingType) Forget(item interface{}) { q.rateLimiter.Forget(item) // 太簡單了,不解釋了 }
上面之所以感覺非常簡單,是因為前面大量基礎知識的鋪墊,否則直接奔主題肯定懵逼~
英文單詞翻譯:
ShuttingDown:處理過程中當掉; 停止;
參考:https://blog.csdn.net/weixin_42663840/article/details/81482553