1. 程式人生 > 實用技巧 >學習筆記 sync/Cond原始碼

學習筆記 sync/Cond原始碼

ond需要指定一個Locker,通常是一個*Mutex或*RWMutex。

func (c *Cond) Broadcast() 和 func (c *Cond) Signal() 喚醒因wait condition而掛起goroutine,區別是Signal只喚醒一個,而Broadcast喚醒所有。允許呼叫者獲取基礎鎖Locker之後再呼叫喚醒,但非必需。

func (c *Cond) Wait()方法在呼叫時會釋放底層鎖Locker,並且將當前goroutine掛起,直到另一個goroutine執行Signal或者Broadcase,該goroutine才有機會重新喚醒,並嘗試獲取Locker,完成後續邏輯。

使用Wait 方法之前,我們必須先獲取外部鎖,原因是:先當前協程佔有著鎖,並掛起當前協程等待,其他協程的 通知喚醒,好走後續的業務邏輯,(佔有著鎖,是不想別人拿到鎖,而自己走不到Wait這一步,而Wait是掛起了當前協程,等待別人通知,這樣做,就知道只要通知一來,肯定是當前協程可以繼續往下走了),這裡自己通過對比 Wait的使用及Wait的原始碼自己就明白了,使用示例:

package main
 
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
 
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
 
var capacity = 10 var consumerNum = 3 var producerNum = 5 func producer(out chan<- int) { for i := 0; i < producerNum; i++ { go func(nu int) { for { cond.L.Lock() for len(out) == capacity { fmt.Println("Capacity Full, stop Produce
") cond.Wait() } num := rand.Intn(100) out <- num fmt.Printf("Produce %d produce: num %d\n", nu, num) cond.L.Unlock() cond.Signal() time.Sleep(time.Second) } }(i) } } func consumer(in <-chan int) { for i := 0; i < consumerNum; i++ { go func(nu int) { for { cond.L.Lock() for len(in) == 0 { fmt.Println("Capacity Empty, stop Consume") cond.Wait() } num := <-in fmt.Printf("Goroutine %d: consume num %d\n", nu, num) cond.L.Unlock() time.Sleep(time.Millisecond * 500) cond.Signal() } }(i) } } func main() { rand.Seed(time.Now().UnixNano()) quit := make(chan bool) product := make(chan int, capacity) producer(product) consumer(product) <-quit }

sync/Cond.go原始碼

package sync
 
import (
    "sync/atomic"
    "unsafe"
)
 
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
// Cond實現了一個條件變數,一個等待或宣佈事件發生的goroutines的集合點。
// 每個Cond都有一個相關的Locker L(通常是* Mutex或* RWMutex)
type Cond struct {
    // 不允許複製,一個結構體,有一個Lock()方法,嵌入別的結構體中,表示不允許複製
    // noCopy物件,擁有一個Lock方法,使得Cond物件在進行go vet掃描的時候,能夠被檢測到是否被複制
    noCopy noCopy
 
    // L is held while observing or changing the condition
    // 鎖的具體實現,通常為 mutex 或者rwmutex
    L Locker
 
    // 通知列表,呼叫Wait()方法的goroutine會被放入list中,每次喚醒,從這裡取出
    // notifyList物件,維護等待喚醒的goroutine佇列,使用連結串列實現
    // 在 sync 包中被實現, src/sync/runtime.go
    notify  notifyList
 
    // 複製檢查,檢查cond例項是否被複制
    // copyChecker物件,實際上是uintptr物件,儲存自身物件地址
    checker copyChecker
}
 
// NewCond returns a new Cond with Locker l.
// NewCond方法傳入一個實現了Locker介面的物件,返回一個新的Cond物件指標,
// 保證在多goroutine使用cond的時候,持有的是同一個例項
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}
 
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
// 等待原子解鎖c.L並暫停執行呼叫goroutine。
// 稍後恢復執行後,Wait會在返回之前鎖定c.L.
// 與其他系統不同,除非被廣播或訊號喚醒,否則等待無法返回。
// 因為等待第一次恢復時c.L沒有被鎖定,
// 所以當Wait返回時,呼叫者通常不能認為條件為真。
// 相反,呼叫者應該迴圈等待:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
//呼叫此方法會將此routine加入通知列表,並等待獲取通知,呼叫此方法必須先Lock,不然方法裡會呼叫Unlock(),報錯
func (c *Cond) Wait() {
    // 檢查是否被複制; 如果是就panic
    // check檢查,保證cond在第一次使用後沒有被複制
    c.checker.check()
    // 將當前goroutine加入等待佇列, 該方法在 runtime 包的 notifyListAdd 函式中實現 src/runtime/sema.go
    t := runtime_notifyListAdd(&c.notify)
    // 釋放鎖, 因此在呼叫Wait方法前,必須保證獲取到了cond的鎖,否則會報錯
    c.L.Unlock()
 
    // 等待佇列中的所有的goroutine執行等待喚醒操作
    // 將當前goroutine掛起,等待喚醒訊號
    // 該方法在 runtime 包的 notifyListWait 函式中實現 src/runtime/sema.go
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}
 
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// 喚醒單個 等待的 goroutine
func (c *Cond) Signal() {
    c.checker.check()
    // 通知等待列表中的一個, 順序喚醒一個等待的gorountine
    // 在runtime 包的 notifyListNotifyOne 函式中被實現 src/runtime/sema.go
    runtime_notifyListNotifyOne(&c.notify)
}
 
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// 喚醒等待佇列中的所有goroutine。
func (c *Cond) Broadcast() {
    c.checker.check()
    // 喚醒等待佇列中所有的goroutine
    // 有runtime 包的 notifyListNotifyAll 函式實現 src\runtime\sema.go
    runtime_notifyListNotifyAll(&c.notify)
}
 
// copyChecker holds back pointer to itself to detect object copying.
// copyChecker保持指向自身的指標以檢測物件複製。
type copyChecker uintptr
// 檢查c是否被複制,如果是則panic
//check方法在第一次呼叫的時候,會將checker物件地址賦值給checker,也就是將自身記憶體地址賦值給自身
func (c *copyChecker) check() {
    /**
    因為 copyChecker的底層型別為 uintptr
    那麼 這裡的 *c其實就是 copyChecker型別本身,然後強轉成uintptr
    和拿著 c 也就是copyChecker的指標去求 uintptr,理論上要想等
    即:記憶體地址為一樣,則表示沒有被複制
     */
     // 下述做法是:
     // 其實 copyChecker中儲存的物件地址就是 copyChecker 物件自身的地址
     // 先把 copyChecker 處儲存的物件地址和自己通過 unsafe.Pointer求出來的物件地址作比較,
     // 如果發現不相等,那麼就嘗試的替換,由於使用的 old是0,
     // 則表示c還沒有開闢記憶體空間,也就是說,只有是首次開闢地址才會替換成功
     // 如果替換不成功,則表示 copyChecker出所儲存的地址和 unsafe計算出來的不一致
     // 則表示物件是被複制了
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}
 
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
// noCopy可以嵌入到結構中,在第一次使用後不得複製。
type noCopy struct{}
 
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}
 
 
type notifyList struct {
    wait   uint32
    notify uint32
    lock   uintptr // key field of the mutex
    head   unsafe.Pointer
    tail   unsafe.Pointer
}

我們可以看出,其中

  • Cond不能被複制:Cond在內部持有一個等待佇列,這個佇列維護所有等待在這個Cond的goroutine。因此若這個Cond允許值傳遞,則這個佇列在值傳遞的過程中會進行復制,導致在喚醒goroutine的時候出現錯誤。

  • 順序喚醒: notifyList物件持有兩個無限自增的欄位wait和notify,wait欄位在有新的goroutine等待的時候加1,notify欄位在有新的喚醒訊號的時候加1。在有新的goroutine加入佇列的時候,會將當前wait賦值給goroutine的ticket,喚醒的時候會喚醒ticket等於notify的gourine。另外,當wait==notify時表示沒有goroutine需要被喚醒,wait>notify時表示有goroutine需要被喚醒,waity恆大於等於notify

Wait: