Golang中的SingleFlight與CyclicBarrier
SingleFlight將併發請求合併成一個請求,可用於減少下游壓力;CyclicBarrier可重用柵欄併發原語,控制一組請求同時執行;
SingleFlight
在Go中SingleFlight並不是原生提供的,而是開發組提供的擴充套件併發原語。它可實現多個goroutine呼叫通過一函式時,只讓一個goroutine呼叫該函式,等到該goroutine呼叫函式返回結果時再將結果返回給其他同時呼叫的goroutine,從而做到了減少併發呼叫的次數;
在秒殺、快取等場景下SingleFlight作用很明顯,能夠大規模的減少併發數量避免快取穿透、系統崩潰等。將多個併發請求 合併成一,瞬間將下游系統壓力從N減少到1
func flightDemo() { key := "flight" for i := 0; i < 5; i++ { log.Printf("ID: %d 請求獲取快取", i) go func(id int) { value, _ := getCache(key, id) log.Printf("ID :%d 獲取到快取 , key: %v,value: %v", id, key, value) }(i) } time.Sleep(20 * time.Second) } func getCache(key string, id int) (string, error) { var ret, _, _ = group.Do(key, func() (ret interface{}, err error) { time.Sleep(2 * time.Second)//模擬獲取快取 log.Printf("ID: %v 執行獲取快取", id) return id, nil }) return strconv.Itoa(ret.(int)), nil }
執行結果:
2020/12/14 14:35:13 ID: 0 請求獲取快取 2020/12/14 14:35:13 ID: 1 請求獲取快取 2020/12/14 14:35:13 ID: 2 請求獲取快取 2020/12/14 14:35:13 ID: 3 請求獲取快取 2020/12/14 14:35:13 ID: 4 請求獲取快取 2020/12/14 14:35:15 ID: 0 執行獲取快取 2020/12/14 14:35:15 ID :0 獲取到快取 , key: flight,value: 0 2020/12/14 14:35:15 ID :2 獲取到快取 , key: flight,value: 0 2020/12/14 14:35:15 ID :4 獲取到快取 , key: flight,value: 0 2020/12/14 14:35:15 ID :3 獲取到快取 , key: flight,value: 0 2020/12/14 14:35:15 ID :1 獲取到快取 , key: flight,value: 0
這個Demo中有五個goroutine同時發起獲取key為flight的快取,由於使用了SingleFlight物件,ID為0的請求率先發起了獲取快取,其他4個goroutine並不會去執行獲取快取請求邏輯,而是等到ID為0的請求取得到結果後直接使用該結果;
SingleFlight內部使用了互斥鎖Mutex與Map實現,Mutex用於提供併發時的讀防寫,Map用於儲存同一個key的處理請求;SingleFlight提供瞭如下三個方法:
Do: 執行一個函式,返回函式的執行結果;
DoChan: 與Do方法類似,返回的是一個chan,函式fn執行完成產生結果後,可從chan中接受到函式的執行結果;
Forget: 丟棄某個key,之後這個key請求會繼續執行函式fn,不在等待前一個請求fn函式的執行結果;
SingleFlight的實現部分程式碼如下,其中call為具體的的請求、Group代表Singleflight、map[string]*call用於儲存相對應的key所發起的請求;
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
//是否已存在該key的請求
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait() //等待該key第一個請求完成
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true //返回該key第一個請求的結果
}
c := new(call) //第一個請求
c.wg.Add(1)
g.m[key] = c //將請求加入到map中
g.mu.Unlock()
g.doCall(c, key, fn) //呼叫函式fn
return c.val, c.err, c.dups > 0
}
CyclicBarrier
在Go的標準庫中、開發組擴充套件庫中其實也並沒有CyclicBarrier的實現,有個第三方的CyclicBarrier實現:https://github.com/marusama/cyclicbarrier, 它的邏輯為:一組goroutine彼此等待直到所有的goroutine都達到某個執行點,再往下執行。就如柵欄一樣等指定數量的人到齊了,開始抬起柵欄放行;它的執行邏輯與Java的cyclicbarrier類似;
在Go標準庫中有個物件有類似的功能:WaitGroup,但該物件並沒有CyclicBarrier那麼簡單易用;
func cyclicBarrierDemo(){
for i := 0; i < 3; i++ {
go func(id int) {
log.Printf("start: %v", id)
barrier.Await(context.Background())
log.Printf("finish: %v", id)
}(i)
}
time.Sleep(5 * time.Second)
log.Printf("完成")
}
執行結果:
2020/12/14 15:11:57 start: 2
2020/12/14 15:11:57 start: 0
2020/12/14 15:11:57 start: 1
2020/12/14 15:11:57 finish: 1
2020/12/14 15:11:57 finish: 2
2020/12/14 15:11:57 finish: 0
2020/12/14 15:12:02 完成
通過上面Demo可以看到ID為2、0的goroutine輸出start後並沒有繼續往下執行,而是等到ID為0的goroutine執行到start後三個goroutine一起往下執行;
如沒有使用柵欄,則這個Demo的執行結果如下:
2020/12/14 15:09:02 start: 0
2020/12/14 15:09:02 finish: 0
2020/12/14 15:09:02 start: 1
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:02 start: 2
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:07 完成