golang 詳解協程——errgroup
阿新 • • 發佈:2022-04-08
為什麼要有sync.errgroup
go支援併發,一般採用的是 channel 、 sync.WaitGroup 、context,來實現各個協程之間的流程控制和訊息傳遞。
但是對於開啟的成千上萬的協程,如果在每個協程內都自行去列印 錯誤日誌的話,會造成日誌分散,不好分析。
所以我們要實現一種能統一處理各個協程錯誤的工具
什麼是 sync.errgroup
Go團隊在實驗倉庫中添加了一個名為sync.errgroup的新軟體包。 sync.ErrGroup再sync.WaitGroup功能的基礎上,增加了錯誤傳遞,以及在發生不可恢復的錯誤時取消整個goroutine集合,或者等待超時
主要是利用了 waitgroup,context以及sync.Once,對這三個不熟悉的應先去看下相應的知識點
獲取方法
go get golang.org/x/sync
errgroup 的功能
1、處理子協程 error
func main() { var g errgroup.Group // 宣告一個group例項 var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { // 分別獲取網站內容 url := url //url是區域性變數,for迴圈中對多個協程傳遞值時,需要重新進行賦值 g.Go(func() error { // group 的go方法,啟一個協程去執行程式碼 // Fetch the URL. resp, err := http.Get(url) if err == nil { resp.Body.Close() } return err }) } if err := g.Wait(); err == nil { //group 的wait方法,等待上面的 g.go的協程執行完成,並且可以接受錯誤 fmt.Println("Successfully fetched all URLs.") } }
上面這個例子是簡單的利用 errgroup 進行的 waitGroup和error的處理,下面我們對關鍵的程式碼做一個分析,並結合原始碼來看
var g errgroup.Group
宣告一個 group的例項,我們看下 group 包含哪些東西
type Group struct { cancel func() wg sync.WaitGroup errOnce sync.Once err error }
group是一個結構體,包含四個部分
- cancel 一個取消的函式,主要來包裝context.WithCancel的CancelFunc
- wg 藉助於WaitGroup實現的
- errOnce 使用sync.Once實現只輸出第一個err
- err 記錄下錯誤的資訊
g.Go(func() error {}
啟動goroutine 執行程式碼
記錄第一個出錯的goroutine的err資訊。我們看下原始碼
func (g *Group) Go(f func() error) { g.wg.Add(1) // 和WaitGroup 一樣,每執行一個新的g,通過add方法 加1 go func() { defer g.wg.Done() // 執行結束後 呼叫 Done方法,減1 if err := f(); err != nil { // 執行傳入的匿名函式 g.errOnce.Do(func() { // 如果匿名函式返回錯誤,會記錄錯誤資訊。注意這裡用的 once.Do,只執行一次,僅會記錄第一個出現的err g.err = err if g.cancel != nil { // 如果初始化的有 cancel 函式,會呼叫 cancel退出 g.cancel() } }) } }() }
再來看下 g.Wait()
func (g *Group) Wait() error { g.wg.Wait() // 和 WaitGroup 一樣,在主執行緒呼叫 wait 方法,阻塞等待所有g執行完成 if g.cancel != nil { // 如果初始化了 cancel 函式,就執行 g.cancel() } return g.err // 返回第一個出現的err資訊 }
2、結合 context 來使用
package main import ( "context" "fmt" "golang.org/x/sync/errgroup" "time" ) func main() { ctx, cancel := context.WithCancel(context.Background()) group, errCtx := errgroup.WithContext(ctx) for index := 0; index < 3; index++ { indexTemp := index // 新建子協程 group.Go(func() error { fmt.Printf("indexTemp=%d \n", indexTemp) if indexTemp == 0 { // 第一個協程 fmt.Println("indexTemp == 0 start ") fmt.Println("indexTemp == 0 end") } else if indexTemp == 1 { // 第二個協程 fmt.Println("indexTemp == 1 start") //這裡一般都是某個協程發生異常之後,呼叫cancel() //這樣別的協程就可以通過errCtx獲取到err資訊,以便決定是否需要取消後續操作 cancel() // 第二個協程異常退出 fmt.Println("indexTemp == 1 err ") } else if indexTemp == 2 { fmt.Println("indexTemp == 2 begin") // 休眠1秒,用於捕獲子協程2的出錯 time.Sleep(1 * time.Second) //檢查 其他協程已經發生錯誤,如果已經發生異常,則不再執行下面的程式碼 err := CheckGoroutineErr(errCtx) // 第三個協程感知第二個協程是否正常 if err != nil { return err } fmt.Println("indexTemp == 2 end ") } return nil }) } // 捕獲err err := group.Wait() if err == nil { fmt.Println("都完成了") } else { fmt.Printf("get error:%v", err) } } //校驗是否有協程已發生錯誤 func CheckGoroutineErr(errContext context.Context) error { select { case <-errContext.Done(): return errContext.Err() default: return nil } }