1. 程式人生 > 實用技巧 >Golang中的SingleFlight與CyclicBarrier

Golang中的SingleFlight與CyclicBarrier

  SingleFlight將併發請求合併成一個請求,可用於減少下游壓力;CyclicBarrier可重用柵欄併發原語,控制一組請求同時執行;

SingleFlight

  在Go中SingleFlight並不是原生提供的,而是開發組提供的擴充套件併發原語。它可實現多個goroutine呼叫通過一函式時,只讓一個goroutine呼叫該函式,等到該goroutine呼叫函式返回結果時再將結果返回給其他同時呼叫的goroutine,從而做到了減少併發呼叫的次數;
  在秒殺快取等場景下SingleFlight作用很明顯,能夠大規模的減少併發數量避免快取穿透系統崩潰等。將多個併發請求 合併成一,瞬間將下游系統壓力從N減少到1

func flightDemo() {
    key := "flight"
	for i := 0; i < 5; i++ {
            log.Printf("ID: %d 請求獲取快取", i)
            go func(id int) {
		value, _ := getCache(key, id)
		log.Printf("ID :%d 獲取到快取 , key: %v,value: %v", id, key, value)
	    }(i)
	}
        time.Sleep(20 * time.Second)
}

func getCache(key string, id int) (string, error) {
	var ret, _, _ = group.Do(key, func() (ret interface{}, err error) {
		time.Sleep(2 * time.Second)//模擬獲取快取
		log.Printf("ID: %v 執行獲取快取", id)
		return id, nil
	})
	return strconv.Itoa(ret.(int)), nil
}

執行結果:

2020/12/14 14:35:13 ID: 0 請求獲取快取
2020/12/14 14:35:13 ID: 1 請求獲取快取
2020/12/14 14:35:13 ID: 2 請求獲取快取
2020/12/14 14:35:13 ID: 3 請求獲取快取
2020/12/14 14:35:13 ID: 4 請求獲取快取
2020/12/14 14:35:15 ID: 0 執行獲取快取
2020/12/14 14:35:15 ID :0 獲取到快取 , key: flight,value: 0
2020/12/14 14:35:15 ID :2 獲取到快取 , key: flight,value: 0
2020/12/14 14:35:15 ID :4 獲取到快取 , key: flight,value: 0
2020/12/14 14:35:15 ID :3 獲取到快取 , key: flight,value: 0
2020/12/14 14:35:15 ID :1 獲取到快取 , key: flight,value: 0

  這個Demo中有五個goroutine同時發起獲取key為flight的快取,由於使用了SingleFlight物件,ID為0的請求率先發起了獲取快取,其他4個goroutine並不會去執行獲取快取請求邏輯,而是等到ID為0的請求取得到結果後直接使用該結果

  SingleFlight內部使用了互斥鎖Mutex與Map實現,Mutex用於提供併發時的讀防寫,Map用於儲存同一個key的處理請求;SingleFlight提供瞭如下三個方法:
  Do: 執行一個函式,返回函式的執行結果;
  DoChan: 與Do方法類似,返回的是一個chan,函式fn執行完成產生結果後,可從chan中接受到函式的執行結果;
  Forget: 丟棄某個key,之後這個key請求會繼續執行函式fn,不在等待前一個請求fn函式的執行結果;

  SingleFlight的實現部分程式碼如下,其中call為具體的的請求、Group代表Singleflight、map[string]*call用於儲存相對應的key所發起的請求;

type call struct {
   wg  sync.WaitGroup
   val interface{}
   err error
}

type Group struct {
   mu sync.Mutex       // protects m
   m  map[string]*call // lazily initialized
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	//是否已存在該key的請求
	if c, ok := g.m[key]; ok {
 		c.dups++
		g.mu.Unlock()
		c.wg.Wait()    //等待該key第一個請求完成
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true   //返回該key第一個請求的結果
	}
	c := new(call)   //第一個請求
	c.wg.Add(1)
	g.m[key] = c     //將請求加入到map中
	g.mu.Unlock()
	g.doCall(c, key, fn) //呼叫函式fn
	return c.val, c.err, c.dups > 0
}

CyclicBarrier

  在Go的標準庫中、開發組擴充套件庫中其實也並沒有CyclicBarrier的實現,有個第三方的CyclicBarrier實現:https://github.com/marusama/cyclicbarrier, 它的邏輯為:一組goroutine彼此等待直到所有的goroutine都達到某個執行點,再往下執行。就如柵欄一樣等指定數量的人到齊了,開始抬起柵欄放行;它的執行邏輯與Java的cyclicbarrier類似;
  在Go標準庫中有個物件有類似的功能:WaitGroup,但該物件並沒有CyclicBarrier那麼簡單易用;

func cyclicBarrierDemo(){
	for i := 0; i < 3; i++ {
		go func(id int) {
            log.Printf("start: %v", id)
		barrier.Await(context.Background())
			log.Printf("finish: %v", id)		
                }(i)
        }

	time.Sleep(5 * time.Second)
	log.Printf("完成")
}

執行結果:

2020/12/14 15:11:57 start: 2
2020/12/14 15:11:57 start: 0
2020/12/14 15:11:57 start: 1
2020/12/14 15:11:57 finish: 1
2020/12/14 15:11:57 finish: 2
2020/12/14 15:11:57 finish: 0
2020/12/14 15:12:02 完成

  通過上面Demo可以看到ID為2、0的goroutine輸出start後並沒有繼續往下執行,而是等到ID為0的goroutine執行到start後三個goroutine一起往下執行;

  如沒有使用柵欄,則這個Demo的執行結果如下:

2020/12/14 15:09:02 start: 0
2020/12/14 15:09:02 finish: 0
2020/12/14 15:09:02 start: 1
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:02 start: 2
2020/12/14 15:09:02 finish: 2
2020/12/14 15:09:07 完成