1. 程式人生 > >client-go中的golang技巧

client-go中的golang技巧

client-go中有很多比較有意思的實現,如定時器,同步機制等,可以作為移植使用。下面就遇到的一些技術講解,首先看第一個:

  • sets.String(k8s.io/apimachinery/pkg/util/sets/string.go)

實現了對golang map的key的處理,如計算交集,並集等。實際中可能會遇到需要判斷兩個map的key是否重合的場景,此時可以使用下述方式實現,sets.StringKeySet函式將入參的map的key抽取成一個String型別,這樣就可以使用String的方法操作key

ps:更多功能參見原始碼

package main

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/sets"
)

func main(){
    map1 := map[string]int{"aaa":1,"bbb":2,"ccc":3}
    map2 := map[string]int{"ccc":1,"ddd":2,"eee":3}
    newmap1 := sets.StringKeySet(map1)
    newmap2 := sets.StringKeySet(map2)
    fmt.Println(newmap1.List(),newmap2.List())
    fmt.Println(newmap1.HasAny(newmap2.List()...)) //3個點用於把陣列打散為單個元素
}

結果:true
  • 同步機制
    • sync.Mutex(golang 內建方法),用於資料同步

有2個方法:

func (m *Mutex) Lock()
func (m *Mutex) Unlock()

類似C語言執行緒的互斥鎖,用於對資料進行加解鎖操作。當資料被加鎖後,未獲得該鎖的程式將無法讀取被加鎖的資料。從下面例子可以看出在資料被解鎖前其他協程無法對該資料進行讀寫操作。

ps: read data的資料也可能為“data”

package main

import (
    "fmt"
    "sync"
)

type LockTest struct {
    l sync.Mutex
    data string
}

func main(){
    lockTest := LockTest{sync.Mutex{},"data"}
    go func() {
        lockTest.l.Lock()
        fmt.Println("sleep begin")
        time.Sleep(time.Second*2)
        fmt.Println("sleep end")
        lockTest.l.Unlock()
    }()
    
    time.Sleep(time.Second*1)
    
    go func() {
        lockTest.l.Lock()
        fmt.Println("read data:",lockTest.data)
        lockTest.l.Unlock()
    }()

    go func() {
        lockTest.l.Lock()
        fmt.Println("write data begin")
        lockTest.data="new data"
        fmt.Println("write data end")
        lockTest.l.Unlock()
    }()

    time.Sleep(time.Second*5)
}

結果:
sleep begin
sleep end
write data begin
write data end
read data: new data
    • sync.RWMutex(golang 內建方法),用於資料同步

讀寫鎖,含4個方法,前2個為讀鎖,後2個為寫鎖,使用時要一一對應。寫鎖會阻塞讀寫操作,讀鎖不會阻塞寫操作,讀鎖可以有多個,讀鎖之間不會相互阻塞,適用於讀多寫少的場景。因此如果單純使用RWMutex.Lock/RWMutex.UnLock與使用Mutex.Lock/Mutex.UnLock效果相同

func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()

讀寫鎖一般是讀鎖和寫鎖結合使用的。在有寫鎖的時候,讀鎖會被阻塞,等待寫鎖釋放後才能進行讀操作。

ps:寫鎖內部僅能對共享資源進行讀操作,如果執行寫操作會導致資料異常。sync.Mutex和sync.RWMutex一般都是內建在結構體中使用,用於保護本結構體的資料

package main

import (
    "fmt"
    "sync"
)
type LockTest struct {
    l sync.RWMutex
    data string
}

func main(){
    lockTest := LockTest{sync.RWMutex{},"data"}
    go func() {
        lockTest.l.Lock()
        fmt.Println("write data begin")
        lockTest.data="new data"
        time.Sleep(time.Second*3)
        fmt.Println("write data end")
        lockTest.l.Unlock()
    }()

    time.Sleep(time.Second*1)

    go func() {
        lockTest.l.RLock()  //阻塞等待寫鎖釋放
        fmt.Println("read begin")
        fmt.Println("read data:",lockTest.data)
        fmt.Println("read begin")
        lockTest.l.RUnlock()
    }()

    time.Sleep(time.Second*5)
}

結果:
write data begin write data end read begin read data: new data read begin
    • sync.Cond(golang 內建方法),用於條件變數

sync.Cond用於條件等待,在滿足某些條件時程式才能繼續執行。它包含如下3個方法:Wait()會掛起其所在的協程等待Signal()或Broadcast()的喚醒。

func (c *Cond) Wait() 
func (c *Cond) Signal()
func (c *Cond) Broadcast() 

官方推薦的典型用法如下。由於喚醒協程並不意味著條件已就緒,因此在喚醒後需要檢測是否本協程的條件已經滿足。

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

使用Signal()喚醒的方式如下,Signal()用於當次喚醒一個協程。如果註釋掉下例中的Signal(),那麼兩個協程會一直Wait(),並不會繼續執行。

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
go func() { c.L.Lock() for !condition1 { c.Wait() } fmt.Println("condition1=true,run1") c.L.Unlock() }() go func() { c.L.Lock() for !condition2 { c.Wait() } fmt.Println("condition2=true,run2") c.L.Unlock() }()
time.Sleep(time.Second*1) fmt.Println("signal-1") condition1=true c.Signal() time.Sleep(time.Second*1) fmt.Println("signal-2") condition2=true c.Signal() time.Sleep(time.Second*10) } 結果: signal-1 condition1=true,run1 signal-2 condition2=true,run2

使用Signal()喚醒協程時需要注意,在多個協程等待時,該函式並沒有指定需要喚醒哪一個協程。下面程式的輸出可能為“condition1=true,run1”也可能為“condition2=true,run2”。因此Signal一般適用於僅有一個協程等待的情況,否則可能造成混亂。

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
    go func() {
        c.L.Lock()
        for !condition1 {
            c.Wait()
        }
        fmt.Println("condition1=true,run1")
        c.L.Unlock()
    }()

    go func() {
        c.L.Lock()
        for !condition2 {
            c.Wait()
        }
        fmt.Println("condition2=true,run2")
        c.L.Unlock()
    }()
    time.Sleep(time.Second*1)
    condition1=true
    condition2=true
    c.Signal()
    time.Sleep(time.Second*10)
}

Broadcast()比較簡單,即喚醒所有等待的協程

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
    go func() {
        c.L.Lock()
        for !condition1 {
            c.Wait()
        }
        fmt.Println("condition1=true,run1")
        c.L.Unlock()
    }()

    go func() {
        c.L.Lock()
        for !condition2 {
            c.Wait()
        }
        fmt.Println("condition2=true,run2")
        c.L.Unlock()
    }()
    time.Sleep(time.Second*1)
    condition1=true
    condition2=true
    c.Broadcast()
    time.Sleep(time.Second*10)
}

結果:
condition1=true,run1
condition2=true,run2
    • sync.waitgroup,用於等待協程執行完成

sync.waitgroup有如下3個方法,Add(delta int)入參表示需要等待的協程的個數,如2表示需要等待2個協程完成;Done()表示該協程結束;Wait()用於阻塞主協程,等待所有協程結束後釋放。

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait() 

舉例如下,啟動10個協程,Wait()會阻塞,直到所有的協程執行Done()。

ps: Add(delta int)函式的入參很重要,入參大於實際需要等待的協程會導致主協程一致阻塞,小於需要等待的協程會導致某些協程提前退出

import (
    "fmt"
    "sync"
)

func main(){
    wg := sync.WaitGroup{}
    wg.Add(10)

    for i := 0; i < 10; i++ {
        go func(i int) {
            defer wg.Done()
            fmt.Print(i, " ")
        }(i)
    }

    wg.Wait()
}

結果:
9 4 0 1 2 3 6 5 7 8 
    • 協程間使用chan進行同步

下例中使用chan實現主協程控制write,並使用write控制read。協程關閉使用close()函式

ps:使用chan進行協程同步一般將chan作為入參傳入,或在函式內部實現協程間的同步。為方便驗證,下面例子將所有chan作為全域性變數

package main

import (
    "fmt"
    "sync"
)
var speakCh = make(chan string)
var stopReadChan = make(chan struct{})
var stopWriteChan = make(chan struct{})

func readChan(stopCh <-chan struct{}){
    for {
        select {
        case words := <- speakCh:
            fmt.Println("received:",words)
        case <- stopCh:
            fmt.Println("stop read!")
            return
        }
    }
}

func writeChan(stopCh <-chan struct{}){
    for {
        select {
        case <- stopCh:
            fmt.Println("stop write!")
            close(stopReadChan)
            return
        default:
        }
        speakCh <- "hi"
        time.Sleep(time.Second*2)
    }
}

func main(){
    go readChan(stopReadChan)
    go writeChan(stopWriteChan)

    time.Sleep(time.Second*6)
    close(stopWriteChan)
    time.Sleep(time.Second*6)
}

結果:
received: hi
received: hi
received: hi
stop write!
stop read!
    • 協程間使用context進行同步

context用於對協程進行管理,如主動退出協程,超時退出協程等,可以看作是使用chan管理協程的擴充套件。在使用時首先建立一個context,使用cancel()可以取消context,並使用Done()返回的chan管理協程。

官方推薦的用法如下:

func Stream(ctx context.Context, out chan<- Value) error {
    for {
        v, err := DoSomething(ctx)
        if err != nil {
            return err
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case out <- v:
        }
    }
}

下例中使用context.WithCancel建立一個context,使用cancel()給這一組context傳送訊號,在協程中使用Done()處理退出事件。

package main

import (
    "fmt"
    "context"
)

func main(){
    ctx,cancel := context.WithCancel(context.Background())
    go testCtx(ctx,"ctx1")
    go testCtx(ctx,"ctx2")
    go testCtx(ctx,"ctx3")
    time.Sleep(time.Second*3)
    cancel()

    time.Sleep(time.Second*5)
}

func testCtx(ctx context.Context, name string) error{
    for {
        select {
        case <-ctx.Done():
            fmt.Println("ctx.Done:",name)
            return ctx.err()
        default:
            fmt.Println("default:",name)
            time.Sleep(time.Second*2)
        }
    }
}

結果:
default: ctx1
default: ctx3
default: ctx2
default: ctx3
default: ctx1
default: ctx2
ctx.Done: ctx1
ctx.Done: ctx3
ctx.Done: ctx2

建立context的方式如下,其餘三個可以看作是WithCancel的擴充套件

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)              //需要主動取消context
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)   //在deadline時間點後取消context
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) //在超時後取消context
func WithValue(parent Context, key, val interface{}) Context

再看一個WithTimeout的例子,下面設定context的超時時間為3s且沒有主動cancel(),3s超時後可以看到該context對應的協程正常退出

func main(){
    ctx,_ := context.WithTimeout(context.Background(),time.Second*3)
    go testCtx(ctx,"ctx1")
    go testCtx(ctx,"ctx2")
    go testCtx(ctx,"ctx3")
    time.Sleep(time.Second*5)
}

結果:
default: ctx3
default: ctx1
default: ctx2
default: ctx3
default: ctx1
default: ctx2
ctx.Done: ctx3
ctx.Done: ctx2
ctx.Done: ctx1

context可以看作是一個樹,當cancel一個context時,會同時cancle它的子context。下面首先建立一個ctx,然後在此ctx下面建立一個subctx。當執行cancle() ctx時會同時cancel() 該的subctx。

context.Background()就是已經實現的首個context。

func main(){
    ctx,cancel := context.WithCancel(context.Background())
    subctx,_ := context.WithCancel(ctx)
    go testCtx(ctx,"ctx1")
    go testCtx(subctx,"subctx1")
    go testCtx(subctx,"subctx2")
    time.Sleep(time.Second*3)
    canclel()

    time.Sleep(time.Second*10)
}

結果:
default: subctx2
default: ctx1
default: subctx1
default: subctx2
default: ctx1
default: subctx1
timeout
ctx.Done: ctx1
ctx.Done: subctx1
ctx.Done: subctx2

下例中僅cancel() subctx,可以看到並沒有影響subctx的parent。

func main(){
    ctx, _:= context.WithCancel(context.Background())
    subctx,subcancel := context.WithCancel(ctx)
    go testCtx(ctx,"ctx1")
    go testCtx(subctx,"subctx1")
    go testCtx(subctx,"subctx2")
    time.Sleep(time.Second*3)
    subcancel()

    time.Sleep(time.Second*10)
}

結果:
default: subctx1
default: subctx2
default: ctx1
default: ctx1
default: subctx1
default: subctx2
timeout
ctx.Done: subctx2
default: ctx1
ctx.Done: subctx1
default: ctx1
default: ctx1
default: ctx1
default: ctx1
    • wait.Group(k8s.io/apimachinery/pkg/util/wait/wait.go)

client-go中的wait.Group創造性地將sync.WaitGroup與chan和ctx結合,實現了協程間同步和等待全部Group中的協程結束的功能。由於StartWithChannel和StartWithContext的入參函式型別比較固定,因此使用上並不通用,但可以作為參考,在實際中擴充套件使用。下例中給出了簡單用法。

func (g *Group) Wait() 
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{}))
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context))
func main(){
    f1:= func(ctx context.Context) {
        for {
            select {
            case <- ctx.Done():
                return
            default:
                fmt.Println("hi11")
                time.Sleep(time.Second)
            }
        }
    }
    wg := wait.Group{}
    ctx, cancel := context.WithCancel(context.Background())
    wg.StartWithContext(ctx,f1)
    time.Sleep(time.Second*3)
    cancel()
    wg.Wait()
}

結果:
hi
hi
hi

 

  •  定時器
    • ticker定時器

首先看一下一般使用的定時器,client-go中比較複雜的定時器也是在此基礎上封裝的。下面例子中給出的是ticker定時器,它會按照一定的時間頻率往Ticker.C中發time.Time型別的資料,可以在協程中通過判斷Ticker.C來執行定時任務。下例來自官方,實現每秒執行一次列印,

import (
    "fmt"
    "time"
)

func main(){
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()
    for {
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            fmt.Println("Current time: ", t)
        }
    }
}

結果:
Current time:  2019-07-04 14:30:37.9088968 +0800 CST m=+5.328291301
Current time:  2019-07-04 14:30:38.9089349 +0800 CST m=+6.328328801
Current time:  2019-07-04 14:30:39.9101415 +0800 CST m=+7.329534901
Current time:  2019-07-04 14:30:40.9095174 +0800 CST m=+8.328910201
Current time:  2019-07-04 14:30:41.9092961 +0800 CST m=+9.328688301
Current time:  2019-07-04 14:30:42.9087682 +0800 CST m=+10.328159801
Current time:  2019-07-04 14:30:43.9088604 +0800 CST m=+11.328251401
Current time:  2019-07-04 14:30:44.909609 +0800 CST m=+12.328999501
Current time:  2019-07-04 14:30:45.9094782 +0800 CST m=+13.328868101
Current time:  2019-07-04 14:30:46.909006 +0800 CST m=+14.328395401
Done!

需要注意的是使用ticker並不能保證程式被精確性排程,如果程式的執行時間大於ticker的排程週期,那麼程式的觸發週期會發生偏差(可能由於系統cpu佔用過高,網路延遲等原因)。如下面例子中,ticker觸發週期為1s,但程式執行大於2s,此時會出現程式執行頻率不一致的情況。適用於週期性觸發一個任務。

func main(){
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()
    for {
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            time.Sleep(time.Second*2)
            fmt.Println("Current time: ", t)
        }
    }
}

結果:
Current time:  2019-07-04 14:56:52.5446526 +0800 CST m=+5.281916601  
Current time:  2019-07-04 14:56:53.5452488 +0800 CST m=+6.282512201  //和上一條相差1s,但和下一條相差2s
Current time:  2019-07-04 14:56:55.5443528 +0800 CST m=+8.281615101
Current time:  2019-07-04 14:56:57.5449183 +0800 CST m=+10.282179401
Current time:  2019-07-04 14:56:59.5448671 +0800 CST m=+12.282127101
Done!
    • timer定時器

timer的機制和ticker相同,在定時器超時後往一個chan中傳送time.Time資料。不同的是ticker可以週期性排程,timer只會執行一次,如果需要重複排程,需要使用Reset函式重置timer。利用該機制,可以在同一個timer上以不同間隔排程程式。

func main(){
    timer := time.NewTimer(time.Second)
    defer timer.Stop()
    t := <-timer.C
    fmt.Println("Current time: ", t)
    timer.Reset(time.Second*2)
    t = <-timer.C
    fmt.Println("Current time: ", t)
    timer.Reset(time.Second*3)
    t = <-timer.C
    fmt.Println("Current time: ", t)
}

結果:
Current time:  2019-07-04 15:47:01.7518201 +0800 CST m=+5.312710501
Current time:  2019-07-04 15:47:03.7766692 +0800 CST m=+7.337558501
Current time:  2019-07-04 15:47:06.7770913 +0800 CST m=+10.337978901

使用timer需要注意Reset函式只能在timer超時後使用,否則將無效。因為Timer.C的長度只有1,如果前面一個定時器結束前執行了Reset,那麼前面的定時器會被取消。具體可以參見這裡

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    ...
}

下面例子中可以看出,多次執行Reset並不會多次觸發定時任務,在前一個定時器超時前執行Reset,會取消前一個定時器並以Reset中的duration開始計時。

func main(){
    fmt.Println("now time: "time.Now())
    timer := time.NewTimer(time.Second*5)
    
    defer timer.Stop()
    timer.Reset(time.Second*2)
    timer.Reset(time.Second*2)
    timer.Reset(time.Second*2)


    go func() {
        for ; ;  {
            select {
            case t:=<- timer.C:
                fmt.Println("Current time: ", t)
            }
        }
    }()
    
    time.Sleep(time.Second*10)
}

結果:
now time:      2019-07-04 16:16:31.7246084 +0800 CST m=+4.281414201
Current time:  2019-07-04 16:16:33.7505395 +0800 CST m=+6.307344201

官方推薦的用法如下,由於沒有加鎖,此方法不能在多個協程中同時使用。

if !t.Stop() {
    <-t.C
}
t.Reset(d)

更多timer的用法可以參見官方文件

  • wait實現(k8s.io/apimachinery/pkg/util/wait/wait.go)
    • wait中實現了很多與定時相關的函式,首先來看第一組:
func Forever(f func(), period time.Duration) 
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) 
func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 

Until函式每period會排程f函式,如果stopCh中有停止訊號,則退出。當程式執行時間超過period時,也不會退出排程迴圈,該特性和Ticker相同。底層使用Timer實現。

Until和NonSlidingUntil為一對,UntilWithContext和NonSlidingUntilWithContext為一對,區別只是定時器啟動時間點不同,可以簡單用下圖表示。可以看到帶“NonSliding”字首的函式。

這兩種(帶“NonSliding”字首的)函式在處理正常程式時沒有什麼區別,但在一些場景下會有不同的地方。下面例子中使用wait.NonSlidingUntil處理的程式中sleep了2s,這可以表示程式因為某種原因導致超出正常處理時間。此時可以看到結果中的“num 1”和“num 2”是同時呼叫的

func main(){
    first := true
    num := 0
    stopCh:=make(chan struct{} )
    
    go func() {
        time.Sleep(time.Second*10)
        close(stopCh)
        fmt.Println("done")
    }()

    go wait.NonSlidingUntil(func(){
        if true == first{
            time.Sleep(time.Second*2)
            first=false
        }
num = num + 1 fmt.Println("num:",num,"time",time.Now()) },time.Second*1,stopCh) time.Sleep(time.Second*100) } 結果: num: 1 time 2019-07-04 21:05:59.5298524 +0800 CST m=+26.277103101 num: 2 time 2019-07-04 21:05:59.554999 +0800 CST m=+26.302249701 num: 3 time 2019-07-04 21:06:00.5559679 +0800 CST m=+27.303218601 num: 4 time 2019-07-04 21:06:01.5566608 +0800 CST m=+28.303911501

將上述程式的wait.NonSlidingUntil替換為wait.Until,得到如下結果,可以看到首次(異常)和第二次(正常)的間隔正好是wait.Until中設定的排程週期,即1s。

ps:大部分場景下兩者使用上並沒有什麼不同,畢竟正常情況下程式執行時間必然小於程式排程週期。如果需要在程式處理延時的情況下儘快進行下一次排程,則選擇帶”NonSliding“字首的函式

結果:
num: 1 time 2019-07-04 21:09:14.9643889 +0800 CST m=+2.010865201 num: 2 time 2019-07-04 21:09:15.9935285 +0800 CST m=+3.040004801 num: 3 time 2019-07-04 21:09:16.9956846 +0800 CST m=+4.042160901
    • func Forever(f func(), period time.Duration)

該函式比較簡單,就是取消了用於控制Until停止的stopCh。以永遠不停止的方式週期性執行f函式

    •  func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error 

ExponentialBackoff可以實現在函式執行錯誤後實現以指數退避方式的延時重試。ExponentialBackoff內部使用的是time.Sleep

ExponentialBackoff的首個入參Backoff如下:

  • Duration:表示初始的延時時間
  • Factor:指數退避的因子
  • Jitter:可以看作是偏差因子,該值越大,每次重試的延時的可選區間越大
  • Steps:指數退避的步數,可以看作程式的最大重試次數
  • Cap:用於在Factor非0時限制最大延時時間和最大重試次數,為0表示不限制最大延時時間
type Backoff struct {
    // The initial duration.
    Duration time.Duration
    // Duration is multiplied by factor each iteration. Must be greater
    // than or equal to zero.
    Factor float64
    // The amount of jitter applied each iteration. Jitter is applied after
    // cap.
    Jitter float64
    // The number of steps before duration stops changing. If zero, initial
    // duration is always used. Used for exponential backoff in combination
    // with Factor.
    Steps int
    // The returned duration will never be greater than cap *before* jitter
    // is applied. The actual maximum cap is `cap * (1.0 + jitter)`.
    Cap time.Duration
}

第二個引數ConditionFunc表示執行的函式,返回的bool值表示該函式是否執行成功,如果執行成功則會退出指數退避

type ConditionFunc func() (done bool, err error)

下面做幾組測試:

=> 當Factor和Jitter都為0時,可以看到排程週期是相同的,即Duration的值(1s)。

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/wait"
    "time"
)


func main(){
    var DefaultRetry = wait.Backoff{
        Steps:    5,
        Duration: 1 * time.Second,
        Factor:   0,
        Jitter:   0,
    }

    fmt.Println(wait.ExponentialBackoff(DefaultRetry,func() (bool, error){
        fmt.Println(time.Now())
        return false,nil
    }))
}

結果:
2019-07-05 10:17:33.9610108 +0800 CST m=+0.079831101
2019-07-05 10:17:34.961132 +0800 CST m=+1.079952301
2019-07-05 10:17:35.961512 +0800 CST m=+2.080332301
2019-07-05 10:17:36.9625144 +0800 CST m=+3.081334701
2019-07-05 10:17:37.9636334 +0800 CST m=+4.082453701
timed out waiting for the condition

=> 先看Jitter對duration的影響,Jitter(duration, b.Jitter)的計算方式如下,如果入參的Factor為0,而Jitter非0,則將Factor調整為1。rand.Float64()為[0.0,1.0)的偽隨機數。

將Jitter調整為0.5,根據下面計算方式預期duration為[1s,1.5s)。執行程式得出如下結果,觀察可以發現,duration大概是1.4s

if maxFactor <= 0.0 {
    maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   0,
    Jitter:   0.5,
}

結果:
2019-07-05 10:21:49.5993445 +0800 CST m=+2.382669101
2019-07-05 10:21:50.9026701 +0800 CST m=+3.685994701
2019-07-05 10:21:52.3759019 +0800 CST m=+5.159226401
2019-07-05 10:21:53.7086265 +0800 CST m=+6.491951001
2019-07-05 10:21:54.9283913 +0800 CST m=+7.711715901
timed out waiting for the condition

=> Factor非0且Jitter為0時,對duration的調整如下

if b.Factor != 0 {
    b.Duration = time.Duration(float64(b.Duration) * b.Factor)
    if b.Cap > 0 && b.Duration > b.Cap {
        b.Duration = b.Cap
        b.Steps = 0
    }
}

從公式中可以得出,Factor對程式執行的延的影響如下,可以看到Factor為1時並沒有什麼作用

duration(1) = duration
duration(2) = Factor * duration(1)
duration(3) = Factor * duration(2)
...
duration(n) = Factor * duration(n-1)

Factor為1時,可以看到函式執行間隔均為1s

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   1,
    Jitter:   0,
}

結果:
2019-07-05 10:28:50.8481017 +0800 CST m=+2.363983901
2019-07-05 10:28:51.8482274 +0800 CST m=+3.364109601
2019-07-05 10:28:52.8482359 +0800 CST m=+4.364118201
2019-07-05 10:28:53.848687 +0800 CST m=+5.364569301
2019-07-05 10:28:54.849409 +0800 CST m=+6.365291201
timed out waiting for the condition

調整Factor為3,預期延時時間為1s,3s,9s,27s,從測試結果看與預期相符

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0,
}

結果:
2019-07-05 10:35:06.9030165 +0800 CST m=+0.077746101
2019-07-05 10:35:07.9038392 +0800 CST m=+1.078568701
2019-07-05 10:35:10.9038733 +0800 CST m=+4.078602901
2019-07-05 10:35:19.9042141 +0800 CST m=+13.078943601
2019-07-05 10:35:46.904647 +0800 CST m=+40.079376501
timed out waiting for the condition

=> 當Factor和Jitter非0時的延遲計算方式如下:

    save_duration(0) = duration
duration(1) =  Jitter(save_duration(0) , b.Jitter)
    save_duration(1) = Factor * save_duration(0) 

duration(2) = Jitter(save_duration(1), b.Jitter)
    save_duration(2) = Factor * save_duration(1)

duration(3) = Jitter(save_duration(2), b.Jitter)
    save_duration = Factor * save_duration(2)
...
duration(n) = Jitter(save_duration(n-1), b.Jitter)

設定Backoff引數如下,按照上述公式得出的期望延時為[1,1.1),[3,3.3),  [9,9.9), [27,29.7)。實際執行如下,小數點一位後四捨五入得出實際延時為1.1, 3.3, 9.6, 28.2,與預期相符。

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0.1,
}

結果:
2019-07-05 11:42:54.8779046 +0800 CST m=+0.135740401
2019-07-05 11:42:55.9399737 +0800 CST m=+1.197782901
2019-07-05 11:42:59.2240904 +0800 CST m=+4.481817401
2019-07-05 11:43:08.8232438 +0800 CST m=+14.080730501
2019-07-05 11:43:37.0058953 +0800 CST m=+42.262752301
timed out waiting for the condition

=> 最後看下Backoff.Cap的影響。設定Cap為10s,預期會比上面不帶Cap的少執行2次(不帶Cap限制的在Step為0時還會執行一次)。實際執行上也是如此

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0.1,
    Cap:      time.Second*10,
}

結果:
2019-07-05 12:02:43.8678742 +0800 CST m=+0.120673901
2019-07-05 12:02:44.9294079 +0800 CST m=+1.182202101
2019-07-05 12:02:48.2125558 +0800 CST m=+4.465333301

ExponentialBackoff借鑑了TCP協議的指數退避演算法,適用於可能會產生資源競爭的場景。指數退避可以有效地在沒有快取處理場景下減小服務端的壓力。

    •  wait庫的第二組
func Poll(interval, timeout time.Duration, condition ConditionFunc) error 
func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error 
func PollInfinite(interval time.Duration, condition ConditionFunc) error 
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error 
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error 
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error 

Poll表示以interval的週期執行condition函式,直到timeout超時或condition返回true/err非空。

wait.Poll和wait.Until使用上還是有些類似的,區別在於一個使用timeout限制超時時間,一個使用chan提供主動停止排程。

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/wait"
    "time"
)


func main(){

    wait.Poll(time.Second, time.Second*5, func() (done bool, err error) {
        fmt.Println(time.Now())
        return false,nil
    })

結果:
2019-07-05 13:43:31.2622405 +0800 CST m=+1.069324901
2019-07-05 13:43:32.2619663 +0800 CST m=+2.069050701
2019-07-05 13:43:33.2626114 +0800 CST m=+3.069695801
2019-07-05 13:43:34.2626876 +0800 CST m=+4.069772001
2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201
2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201

PollInfinite相比Poll取消了timeout的限制。

PollUntil相比Until來說,PollUntil在condition函式返回true或error的時候會退出排程。

Poll和PollImmediate為一組,PollInfinite和PollImmediateInfinite為一組,PollUntil和PollImmediateUntil為一組,它們的細微差別在於前者在執行condition函式前會等待interval時間,後者則會首先執行condition函式,然後再檢查是否需要等待(condition返回true或err非空時不會再等待)。如果不關注這點差異,用哪個都可以。

    •  heap 堆(k8s.io/client-go/tools/cache)

實現heap需要實現下面Interface介面,heap使用佇列實現了一個完全二叉樹

// heap.Interface
type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

// sort.Interface
type Interface interface {
    // Len is the number of elements in the collection.
    Len() int
    // Less reports whether the element with
    // index i should sort before the element with index j.
    Less(i, j int) bool
    // Swap swaps the elements with indexes i and j.
    Swap(i, j int)
}

heap對外提供的方法為如下:

func Init(h Interface)
func Push(h Interface, x interface{})
func Pop(h Interface) interface{}
func Remove(h Interface, i int) interface{}
func Fix(h Interface, i int) // 當修改完佇列中的index=i的元素後,重新排序

例子如下:

import (
    "container/heap"
    "fmt"
)
    
func GetAllHeapItems(t Heap_t,name string){
    items := []interface{}{}
    for t.Len() != 0{
        items = append(items, heap.Pop(&t))
    }
    fmt.Println(name,":",items)
}

type Heap_t []int
func (h Heap_t)Len() int{return len(h)}
func (h Heap_t)Less(i,j int)bool {return h[i]<h[j]}
func (h Heap_t)Swap(i,j int){h[i], h[j] = h[j], h[i]}
func (h *Heap_t)Push(x interface{}){*h = append(*h,x.(int))}
func (h *Heap_t)Pop() interface{}{
    if h.Len() == 0{
        return nil
    }
    x := (*h)[len(*h)-1]
    *h = (*h)[0:(len(*h) - 1)]
    return x
}

func main(){
    h := &Heap_t{4,2,6,80,100,45} //[1 2 4 8 80 45 6 23 56 100]
    heap.Init(h)
    GetAllHeapItems(*h,"h")

    h1 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h1)
    h1.Push(3)
    GetAllHeapItems(*h1,"h1")

    h2 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h2)
    GetAllHeapItems(*h2,"h2")

    h3 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h3)
    (*h3)[2] = 200
    fmt.Println(1111,h3)
    heap.Fix(h3,2)
    fmt.Println(2222,h3)
    GetAllHeapItems(*h3,"h3")
}

結果:
h : [2 4 6 45 80 100]
h1 : [2 3 4 6 45 80 100]
h2 : [2 4 6 45 80 100]
1111 &[2 4 200 80 100 45]
2222 &[2 4 45 80 100 200]
h3 : [2 4 45 80 100 200]

heap的實現比較巧妙,使用佇列實現了完全二叉樹,比較適用於查詢頻繁的場景,原理解析可以參見這裡

更多使用和例子參見官方文件

  • klog(k8s.io/klog) 實現執行日誌列印
  • 使用select{}實現主協程不退出
func main(){
    ...
    select{}
}
  •  可以使用switch對地址進行判斷
package main

import (
    "fmt"
)

func main(){
    type emptyCtx int
    background := new(emptyCtx)
    todo       := new(emptyCtx)
    typeSwitch := func (i interface{}) {
        switch i {
        case background:
            fmt.Println("background")
        case todo:
            fmt.Println("todo")
        default:
            fmt.Println("default")
        }
    }

    typeSwitch(background)
}

結果:
true

 

 參考:

https://www.flysnow.org/2017/05/12/go-in-action-go-context.