學習筆記 sync/Cond原始碼
ond需要指定一個Locker,通常是一個*Mutex或*RWMutex。
func (c *Cond) Broadcast() 和 func (c *Cond) Signal() 喚醒因wait condition而掛起goroutine,區別是Signal只喚醒一個,而Broadcast喚醒所有。允許呼叫者獲取基礎鎖Locker之後再呼叫喚醒,但非必需。
func (c *Cond) Wait()方法在呼叫時會釋放底層鎖Locker,並且將當前goroutine掛起,直到另一個goroutine執行Signal或者Broadcase,該goroutine才有機會重新喚醒,並嘗試獲取Locker,完成後續邏輯。
使用Wait 方法之前,我們必須先獲取外部鎖,原因是:先當前協程佔有著鎖,並掛起當前協程等待,其他協程的 通知喚醒,好走後續的業務邏輯,(佔有著鎖,是不想別人拿到鎖,而自己走不到Wait這一步,而Wait是掛起了當前協程,等待別人通知,這樣做,就知道只要通知一來,肯定是當前協程可以繼續往下走了),這裡自己通過對比 Wait的使用及Wait的原始碼自己就明白了,使用示例:
package main import ( "fmt" "math/rand" "sync" "time" ) var locker = new(sync.Mutex) var cond = sync.NewCond(locker)var capacity = 10 var consumerNum = 3 var producerNum = 5 func producer(out chan<- int) { for i := 0; i < producerNum; i++ { go func(nu int) { for { cond.L.Lock() for len(out) == capacity { fmt.Println("Capacity Full, stop Produce") cond.Wait() } num := rand.Intn(100) out <- num fmt.Printf("Produce %d produce: num %d\n", nu, num) cond.L.Unlock() cond.Signal() time.Sleep(time.Second) } }(i) } } func consumer(in <-chan int) { for i := 0; i < consumerNum; i++ { go func(nu int) { for { cond.L.Lock() for len(in) == 0 { fmt.Println("Capacity Empty, stop Consume") cond.Wait() } num := <-in fmt.Printf("Goroutine %d: consume num %d\n", nu, num) cond.L.Unlock() time.Sleep(time.Millisecond * 500) cond.Signal() } }(i) } } func main() { rand.Seed(time.Now().UnixNano()) quit := make(chan bool) product := make(chan int, capacity) producer(product) consumer(product) <-quit }
sync/Cond.go原始碼
package sync import ( "sync/atomic" "unsafe" ) // Cond implements a condition variable, a rendezvous point // for goroutines waiting for or announcing the occurrence // of an event. // // Each Cond has an associated Locker L (often a *Mutex or *RWMutex), // which must be held when changing the condition and // when calling the Wait method. // // A Cond must not be copied after first use. // Cond實現了一個條件變數,一個等待或宣佈事件發生的goroutines的集合點。 // 每個Cond都有一個相關的Locker L(通常是* Mutex或* RWMutex) type Cond struct { // 不允許複製,一個結構體,有一個Lock()方法,嵌入別的結構體中,表示不允許複製 // noCopy物件,擁有一個Lock方法,使得Cond物件在進行go vet掃描的時候,能夠被檢測到是否被複制 noCopy noCopy // L is held while observing or changing the condition // 鎖的具體實現,通常為 mutex 或者rwmutex L Locker // 通知列表,呼叫Wait()方法的goroutine會被放入list中,每次喚醒,從這裡取出 // notifyList物件,維護等待喚醒的goroutine佇列,使用連結串列實現 // 在 sync 包中被實現, src/sync/runtime.go notify notifyList // 複製檢查,檢查cond例項是否被複制 // copyChecker物件,實際上是uintptr物件,儲存自身物件地址 checker copyChecker } // NewCond returns a new Cond with Locker l. // NewCond方法傳入一個實現了Locker介面的物件,返回一個新的Cond物件指標, // 保證在多goroutine使用cond的時候,持有的是同一個例項 func NewCond(l Locker) *Cond { return &Cond{L: l} } // Wait atomically unlocks c.L and suspends execution // of the calling goroutine. After later resuming execution, // Wait locks c.L before returning. Unlike in other systems, // Wait cannot return unless awoken by Broadcast or Signal. // // Because c.L is not locked when Wait first resumes, the caller // typically cannot assume that the condition is true when // Wait returns. Instead, the caller should Wait in a loop: // 等待原子解鎖c.L並暫停執行呼叫goroutine。 // 稍後恢復執行後,Wait會在返回之前鎖定c.L. // 與其他系統不同,除非被廣播或訊號喚醒,否則等待無法返回。 // 因為等待第一次恢復時c.L沒有被鎖定, // 所以當Wait返回時,呼叫者通常不能認為條件為真。 // 相反,呼叫者應該迴圈等待: // // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // //呼叫此方法會將此routine加入通知列表,並等待獲取通知,呼叫此方法必須先Lock,不然方法裡會呼叫Unlock(),報錯 func (c *Cond) Wait() { // 檢查是否被複制; 如果是就panic // check檢查,保證cond在第一次使用後沒有被複制 c.checker.check() // 將當前goroutine加入等待佇列, 該方法在 runtime 包的 notifyListAdd 函式中實現 src/runtime/sema.go t := runtime_notifyListAdd(&c.notify) // 釋放鎖, 因此在呼叫Wait方法前,必須保證獲取到了cond的鎖,否則會報錯 c.L.Unlock() // 等待佇列中的所有的goroutine執行等待喚醒操作 // 將當前goroutine掛起,等待喚醒訊號 // 該方法在 runtime 包的 notifyListWait 函式中實現 src/runtime/sema.go runtime_notifyListWait(&c.notify, t) c.L.Lock() } // Signal wakes one goroutine waiting on c, if there is any. // // It is allowed but not required for the caller to hold c.L // during the call. // 喚醒單個 等待的 goroutine func (c *Cond) Signal() { c.checker.check() // 通知等待列表中的一個, 順序喚醒一個等待的gorountine // 在runtime 包的 notifyListNotifyOne 函式中被實現 src/runtime/sema.go runtime_notifyListNotifyOne(&c.notify) } // Broadcast wakes all goroutines waiting on c. // // It is allowed but not required for the caller to hold c.L // during the call. // 喚醒等待佇列中的所有goroutine。 func (c *Cond) Broadcast() { c.checker.check() // 喚醒等待佇列中所有的goroutine // 有runtime 包的 notifyListNotifyAll 函式實現 src\runtime\sema.go runtime_notifyListNotifyAll(&c.notify) } // copyChecker holds back pointer to itself to detect object copying. // copyChecker保持指向自身的指標以檢測物件複製。 type copyChecker uintptr // 檢查c是否被複制,如果是則panic //check方法在第一次呼叫的時候,會將checker物件地址賦值給checker,也就是將自身記憶體地址賦值給自身 func (c *copyChecker) check() { /** 因為 copyChecker的底層型別為 uintptr 那麼 這裡的 *c其實就是 copyChecker型別本身,然後強轉成uintptr 和拿著 c 也就是copyChecker的指標去求 uintptr,理論上要想等 即:記憶體地址為一樣,則表示沒有被複制 */ // 下述做法是: // 其實 copyChecker中儲存的物件地址就是 copyChecker 物件自身的地址 // 先把 copyChecker 處儲存的物件地址和自己通過 unsafe.Pointer求出來的物件地址作比較, // 如果發現不相等,那麼就嘗試的替換,由於使用的 old是0, // 則表示c還沒有開闢記憶體空間,也就是說,只有是首次開闢地址才會替換成功 // 如果替換不成功,則表示 copyChecker出所儲存的地址和 unsafe計算出來的不一致 // 則表示物件是被複制了 if uintptr(*c) != uintptr(unsafe.Pointer(c)) && !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) && uintptr(*c) != uintptr(unsafe.Pointer(c)) { panic("sync.Cond is copied") } } // noCopy may be embedded into structs which must not be copied // after the first use. // // See https://golang.org/issues/8005#issuecomment-190753527 // for details. // noCopy可以嵌入到結構中,在第一次使用後不得複製。 type noCopy struct{} // Lock is a no-op used by -copylocks checker from `go vet`. func (*noCopy) Lock() {} func (*noCopy) Unlock() {} type notifyList struct { wait uint32 notify uint32 lock uintptr // key field of the mutex head unsafe.Pointer tail unsafe.Pointer }
我們可以看出,其中
-
Cond不能被複制:Cond在內部持有一個等待佇列,這個佇列維護所有等待在這個Cond的goroutine。因此若這個Cond允許值傳遞,則這個佇列在值傳遞的過程中會進行復制,導致在喚醒goroutine的時候出現錯誤。
-
順序喚醒: notifyList物件持有兩個無限自增的欄位wait和notify,wait欄位在有新的goroutine等待的時候加1,notify欄位在有新的喚醒訊號的時候加1。在有新的goroutine加入佇列的時候,會將當前wait賦值給goroutine的ticket,喚醒的時候會喚醒ticket等於notify的gourine。另外,當wait==notify時表示沒有goroutine需要被喚醒,wait>notify時表示有goroutine需要被喚醒,waity恆大於等於notify
Wait: