go中sync.Cond原始碼解讀
阿新 • • 發佈:2021-03-11
- [sync.Cond](#synccond)
- [前言](#%E5%89%8D%E8%A8%80)
- [什麼是sync.Cond](#%E4%BB%80%E4%B9%88%E6%98%AFsynccond)
- [看下原始碼](#%E7%9C%8B%E4%B8%8B%E6%BA%90%E7%A0%81)
- [Wait](#wait)
- [Signal](#signal)
- [Broadcast](#broadcast)
- [總結](#%E6%80%BB%E7%BB%93)
## sync.Cond
### 前言
本次的程式碼是基於`go version go1.13.15 darwin/amd64`
### 什麼是sync.Cond
Go語言標準庫中的條件變數`sync.Cond`,它可以讓一組的`Goroutine`都在滿足特定條件時被喚醒。
每個`Cond`都會關聯一個Lock`(*sync.Mutex or *sync.RWMutex)`
```go
var (
locker = new(sync.Mutex)
cond = sync.NewCond(locker)
)
func listen(x int) {
// 獲取鎖
cond.L.Lock()
// 等待通知 暫時阻塞
cond.Wait()
fmt.Println(x)
// 釋放鎖
cond.L.Unlock()
}
func main() {
// 啟動60個被cond阻塞的執行緒
for i := 1; i <= 60; i++ {
go listen(i)
}
fmt.Println("start all")
// 3秒之後 下發一個通知給已經獲取鎖的goroutine time.Sleep(time.Second * 3)
fmt.Println("++++++++++++++++++++one Signal")
cond.Signal()
// 3秒之後 下發一個通知給已經獲取鎖的goroutine
time.Sleep(time.Second * 3)
fmt.Println("++++++++++++++++++++one Signal")
cond.Signal()
// 3秒之後 下發廣播給所有等待的goroutine
time.Sleep(time.Second * 3)
fmt.Println("++++++++++++++++++++begin broadcast")
cond.Broadcast()
// 阻塞直到所有的全部輸出
time.Sleep(time.Second * 60)
}
```
上面是個簡單的例子,我們啟動了60個執行緒,然後都被`cond`阻塞,主函式通過`Signal()`通知一個`goroutine`接觸阻塞,通過`Broadcast()`通知所有被阻塞的全部解除阻塞。
### 看下原始碼
```go
// Wait 原子式的 unlock c.L, 並暫停執行呼叫的 goroutine。
// 在稍後執行後,Wait 會在返回前 lock c.L. 與其他系統不同,
// 除非被 Broadcast 或 Signal 喚醒,否則等待無法返回。
//
// 因為等待第一次 resume 時 c.L 沒有被鎖定,所以當 Wait 返回時,
// 呼叫者通常不能認為條件為真。相反,呼叫者應該在迴圈中使用 Wait():
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
type Cond struct {
// 用於保證結構體不會在編譯期間拷貝
noCopy noCopy
// 鎖
L Locker
// goroutine連結串列,維護等待喚醒的goroutine佇列
notify notifyList
// 保證執行期間不會發生copy
checker copyChecker
}
```
重點分析下:`notifyList`和`copyChecker`
- notify
```go
type notifyList struct {
// 總共需要等待的數量
wait uint32
// 已經通知的數量
notify uint32
// 鎖
lock uintptr
// 指向連結串列頭部
head *sudog
// 指向連結串列尾部
tail *sudog
}
```
這個是核心,所有`wait`的`goroutine`都會被加入到這個連結串列中,然後在通知的時候再從這個連結串列中獲取。
- copyChecker
保證執行期間不會發生copy
```go
type copyChecker uintptr
// copyChecker holds back pointer to itself to detect object copying
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
```
#### Wait
```go
func (c *Cond) Wait() {
// 監測是否複製
c.checker.check()
// 更新 notifyList中需要等待的wait的數量
// 返回當前需要插入連結串列節點ticket
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 為當前的加入的waiter構建一個連結串列的節點,插入連結串列的尾部
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
// go/src/runtime/sema.go
// 更新 notifyList中需要等待的wait的數量
// 同時返回當前的加入的 waiter 的 ticket 編號,從0開始
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// 使用atomic原子的對wait欄位進行加一操作
return atomic.Xadd(&l.wait, 1) - 1
}
// go/src/runtime/sema.go
// 為當前的加入的waiter構建一個連結串列的節點,插入連結串列的尾部
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
// 當t小於notifyList中的notify,說明當前節點已經被通知了
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 構建當前節點
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
// 頭結點沒構建,插入頭結點
if l.tail == nil {
l.head = s
} else {
// 插入到尾節點
l.tail.next = s
}
l.tail = s
// 將當前goroutine置於等待狀態並解鎖
// 通過呼叫goready(gp),可以使goroutine再次可執行。
// 也就是將 M/P/G 解綁,並將 G 調整為等待狀態,放入 sudog 等待佇列中
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
```
梳理流程
1、首先檢測物件的複製行為,如果有複製發生直接丟擲panic;
2、然後呼叫`runtime_notifyListAdd`對`notifynotifyListList`中的`wait`(需要等待的數量)進行加一操作,同時返回一個`ticket`,用來作為當前`wait`的編號,這個編號,會和`notifyList`中的`notify`對應起來;
3、然後呼叫`runtime_notifyListWait`把當前的`wait`封裝成連結串列的一個節點,插入到`notifyList`維護的連結串列的尾部。
#### Signal
```go
// 喚醒一個被wait的goroutine
func (c *Cond) Signal() {
// 監測是否複製
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// go/src/runtime/sema.go
// 通知連結串列中的第一個
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// wait和notify,說明已經全部通知到了
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lock(&l.lock)
// 這裡做了二次的確認
// wait和notify,說明已經全部通知到了
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 原子的對notify執行+1操作
atomic.Store(&l.notify, t+1)
// 嘗試找到需要被通知的 g
// 如果目前還沒來得及入隊,是無法找到的
// 但是,當它看到通知編號已經發生改變是不會被 park 的
//
// 這個查詢過程看起來是線性複雜度,但實際上很快就停了
// 因為 g 的佇列與獲取編號不同,因而佇列中會出現少量重排,但我們希望找到靠前的 g
// 而 g 只有在不再 race 後才會排在靠前的位置,因此這個迭代也不會太久,
// 同時,即便找不到 g,這個情況也成立:
// 它還沒有休眠,並且已經失去了我們在佇列上找到的(少數)其他 g 的 race。
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
// 順序拿到一個節點的ticket,會和上面會和notifyList中的notify做比較,相同才進行後續的操作
// 這個我們分析了,notifyList中的notify和連結串列節點中的ticket是一一對應的
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
// 通過goready掉起在上面通過goparkunlock掛起的goroutine
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
```
梳理下流程:
1、首先檢測物件的複製行為,如果有複製發生直接丟擲`panic`;
2、判斷`wait`和`notify`,如果兩者相同說明已經已經全部通知到了;
3、呼叫`notifyListNotifyOne`,通過for迴圈,依次遍歷這個連結串列,直到找到和`notifyList`中的`notify`,相匹配的`ticket`的節點;
4、掉起`goroutine`,完成通知。
#### Broadcast
```go
// 喚醒所有被wait的goroutine
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
// go/src/runtime/sema.go
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// wait和notify,說明已經全部通知到了
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 加鎖
lock(&l.lock)
s := l.head
l.head = nil
l.tail = nil
// 這個很粗暴,直接將notify的值置換成wait
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// 迴圈連結串列,一個個喚醒goroutine
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
```
梳理下流程:
1、首先檢測物件的複製行為,如果有複製發生直接丟擲panic;
2、判斷`wait`和`notify`,如果兩者相同說明已經已經全部通知到了;
3、`notifyListNotifyAll`,就相對簡單了,直接將`notify`的值置為`wait`,標註這個已經全部通知了;
4、迴圈連結串列,一個個喚醒`goroutine`。
### 總結
`sync.Cond`不是一個常用的同步機制,但是在條件長時間無法滿足時,與使用`for {}`進行忙碌等待相比,`sync.Cond`能夠讓出處理器的使用權,提供`CPU`的利用率。使用時我們也需要注意以下問題:
1、`sync.Cond.Wait`在呼叫之前一定要使用獲取互斥鎖,否則會觸發程式崩潰;
2、`sync.Cond.Signal` 喚醒的 `Goroutine`都是佇列最前面、等待最久的`Goroutine`;
3、`sync.Cond.Broadcast`會按照一定順序廣播通知等待的全部 `Gorou