1. 程式人生 > >go中sync.Cond原始碼解讀

go中sync.Cond原始碼解讀

- [sync.Cond](#synccond) - [前言](#%E5%89%8D%E8%A8%80) - [什麼是sync.Cond](#%E4%BB%80%E4%B9%88%E6%98%AFsynccond) - [看下原始碼](#%E7%9C%8B%E4%B8%8B%E6%BA%90%E7%A0%81) - [Wait](#wait) - [Signal](#signal) - [Broadcast](#broadcast) - [總結](#%E6%80%BB%E7%BB%93) ## sync.Cond ### 前言 本次的程式碼是基於`go version go1.13.15 darwin/amd64` ### 什麼是sync.Cond Go語言標準庫中的條件變數`sync.Cond`,它可以讓一組的`Goroutine`都在滿足特定條件時被喚醒。 每個`Cond`都會關聯一個Lock`(*sync.Mutex or *sync.RWMutex)` ```go var ( locker = new(sync.Mutex) cond = sync.NewCond(locker) ) func listen(x int) { // 獲取鎖 cond.L.Lock() // 等待通知 暫時阻塞 cond.Wait() fmt.Println(x) // 釋放鎖 cond.L.Unlock() } func main() { // 啟動60個被cond阻塞的執行緒 for i := 1; i <= 60; i++ { go listen(i) } fmt.Println("start all") // 3秒之後 下發一個通知給已經獲取鎖的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++one Signal") cond.Signal() // 3秒之後 下發一個通知給已經獲取鎖的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++one Signal") cond.Signal() // 3秒之後 下發廣播給所有等待的goroutine time.Sleep(time.Second * 3) fmt.Println("++++++++++++++++++++begin broadcast") cond.Broadcast() // 阻塞直到所有的全部輸出 time.Sleep(time.Second * 60) } ``` 上面是個簡單的例子,我們啟動了60個執行緒,然後都被`cond`阻塞,主函式通過`Signal()`通知一個`goroutine`接觸阻塞,通過`Broadcast()`通知所有被阻塞的全部解除阻塞。
### 看下原始碼 ```go // Wait 原子式的 unlock c.L, 並暫停執行呼叫的 goroutine。 // 在稍後執行後,Wait 會在返回前 lock c.L. 與其他系統不同, // 除非被 Broadcast 或 Signal 喚醒,否則等待無法返回。 // // 因為等待第一次 resume 時 c.L 沒有被鎖定,所以當 Wait 返回時, // 呼叫者通常不能認為條件為真。相反,呼叫者應該在迴圈中使用 Wait(): // // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // type Cond struct { // 用於保證結構體不會在編譯期間拷貝 noCopy noCopy // 鎖 L Locker // goroutine連結串列,維護等待喚醒的goroutine佇列 notify notifyList // 保證執行期間不會發生copy checker copyChecker } ``` 重點分析下:`notifyList`和`copyChecker` - notify ```go type notifyList struct { // 總共需要等待的數量 wait uint32 // 已經通知的數量 notify uint32 // 鎖 lock uintptr // 指向連結串列頭部 head *sudog // 指向連結串列尾部 tail *sudog } ``` 這個是核心,所有`wait`的`goroutine`都會被加入到這個連結串列中,然後在通知的時候再從這個連結串列中獲取。 - copyChecker 保證執行期間不會發生copy ```go type copyChecker uintptr // copyChecker holds back pointer to itself to detect object copying func (c *copyChecker) check() { if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } } ``` #### Wait ```go func (c *Cond) Wait() { // 監測是否複製 c.checker.check() // 更新 notifyList中需要等待的wait的數量 // 返回當前需要插入連結串列節點ticket t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 為當前的加入的waiter構建一個連結串列的節點,插入連結串列的尾部 runtime_notifyListWait(&c.notify, t) c.L.Lock() } // go/src/runtime/sema.go // 更新 notifyList中需要等待的wait的數量 // 同時返回當前的加入的 waiter 的 ticket 編號,從0開始 //go:linkname notifyListAdd sync.runtime_notifyListAdd func notifyListAdd(l *notifyList) uint32 { // 使用atomic原子的對wait欄位進行加一操作 return atomic.Xadd(&l.wait, 1) - 1 } // go/src/runtime/sema.go // 為當前的加入的waiter構建一個連結串列的節點,插入連結串列的尾部 //go:linkname notifyListWait sync.runtime_notifyListWait func notifyListWait(l *notifyList, t uint32) { lock(&l.lock) // 當t小於notifyList中的notify,說明當前節點已經被通知了 if less(t, l.notify) { unlock(&l.lock) return } // 構建當前節點 s := acquireSudog() s.g = getg() s.ticket = t s.releasetime = 0 t0 := int64(0) if blockprofilerate >
0 { t0 = cputicks() s.releasetime = -1 } // 頭結點沒構建,插入頭結點 if l.tail == nil { l.head = s } else { // 插入到尾節點 l.tail.next = s } l.tail = s // 將當前goroutine置於等待狀態並解鎖 // 通過呼叫goready(gp),可以使goroutine再次可執行。 // 也就是將 M/P/G 解綁,並將 G 調整為等待狀態,放入 sudog 等待佇列中 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) if t0 != 0 { blockevent(s.releasetime-t0, 2) } releaseSudog(s) } ``` 梳理流程 1、首先檢測物件的複製行為,如果有複製發生直接丟擲panic; 2、然後呼叫`runtime_notifyListAdd`對`notifynotifyListList`中的`wait`(需要等待的數量)進行加一操作,同時返回一個`ticket`,用來作為當前`wait`的編號,這個編號,會和`notifyList`中的`notify`對應起來; 3、然後呼叫`runtime_notifyListWait`把當前的`wait`封裝成連結串列的一個節點,插入到`notifyList`維護的連結串列的尾部。
#### Signal ```go // 喚醒一個被wait的goroutine func (c *Cond) Signal() { // 監測是否複製 c.checker.check() runtime_notifyListNotifyOne(&c.notify) } // go/src/runtime/sema.go // 通知連結串列中的第一個 //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne func notifyListNotifyOne(l *notifyList) { // wait和notify,說明已經全部通知到了 if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } lock(&l.lock) // 這裡做了二次的確認 // wait和notify,說明已經全部通知到了 t := l.notify if t == atomic.Load(&l.wait) { unlock(&l.lock) return } // 原子的對notify執行+1操作 atomic.Store(&l.notify, t+1) // 嘗試找到需要被通知的 g // 如果目前還沒來得及入隊,是無法找到的 // 但是,當它看到通知編號已經發生改變是不會被 park 的 // // 這個查詢過程看起來是線性複雜度,但實際上很快就停了 // 因為 g 的佇列與獲取編號不同,因而佇列中會出現少量重排,但我們希望找到靠前的 g // 而 g 只有在不再 race 後才會排在靠前的位置,因此這個迭代也不會太久, // 同時,即便找不到 g,這個情況也成立: // 它還沒有休眠,並且已經失去了我們在佇列上找到的(少數)其他 g 的 race。 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { // 順序拿到一個節點的ticket,會和上面會和notifyList中的notify做比較,相同才進行後續的操作 // 這個我們分析了,notifyList中的notify和連結串列節點中的ticket是一一對應的 if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil // 通過goready掉起在上面通過goparkunlock掛起的goroutine readyWithTime(s, 4) return } } unlock(&l.lock) } ``` 梳理下流程: 1、首先檢測物件的複製行為,如果有複製發生直接丟擲`panic`; 2、判斷`wait`和`notify`,如果兩者相同說明已經已經全部通知到了; 3、呼叫`notifyListNotifyOne`,通過for迴圈,依次遍歷這個連結串列,直到找到和`notifyList`中的`notify`,相匹配的`ticket`的節點; 4、掉起`goroutine`,完成通知。 #### Broadcast ```go // 喚醒所有被wait的goroutine func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } // go/src/runtime/sema.go // notifyListNotifyAll notifies all entries in the list. //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll func notifyListNotifyAll(l *notifyList) { // wait和notify,說明已經全部通知到了 if atomic.Load(&l.wait) == atomic.Load(&l.notify) { return } // 加鎖 lock(&l.lock) s := l.head l.head = nil l.tail = nil // 這個很粗暴,直接將notify的值置換成wait atomic.Store(&l.notify, atomic.Load(&l.wait)) unlock(&l.lock) // 迴圈連結串列,一個個喚醒goroutine for s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next } } ``` 梳理下流程: 1、首先檢測物件的複製行為,如果有複製發生直接丟擲panic; 2、判斷`wait`和`notify`,如果兩者相同說明已經已經全部通知到了; 3、`notifyListNotifyAll`,就相對簡單了,直接將`notify`的值置為`wait`,標註這個已經全部通知了; 4、迴圈連結串列,一個個喚醒`goroutine`。 ### 總結 `sync.Cond`不是一個常用的同步機制,但是在條件長時間無法滿足時,與使用`for {}`進行忙碌等待相比,`sync.Cond`能夠讓出處理器的使用權,提供`CPU`的利用率。使用時我們也需要注意以下問題: 1、`sync.Cond.Wait`在呼叫之前一定要使用獲取互斥鎖,否則會觸發程式崩潰; 2、`sync.Cond.Signal` 喚醒的 `Goroutine`都是佇列最前面、等待最久的`Goroutine`; 3、`sync.Cond.Broadcast`會按照一定順序廣播通知等待的全部 `Gorou