Go36-27,28-條件變量
條件變量(conditional variable),和互斥鎖一樣,也是一個同步工具。我們常常會把條件變量與互斥鎖一起討論。實際上,條件變量是基於互斥鎖的,它必須有互斥鎖的支撐才能發揮作用。
作用
條件變量並不是被用來保護臨界區和共享資源的,它是用於協調想要訪問共享資源的那些線程的。當共享資源的狀態發生變化時,它可以被用來通知被互斥鎖阻塞的線程。
使用條件變量的最大優勢就是在效率方面的提升。當共享資源的狀態不滿足條件的時候,想操作它的線程再也不用循環往復的做檢查了,只要等待通知就好了。
使用條件變量
條件變量需要與互斥鎖配合使用。條件變量的初始化需要互斥鎖,並且它的方法有的也是基於互斥鎖的。
- 等待通知(wait)
- 單發通知(signal)
- 廣播通知(broadcast)
在利用條件變量等待通知的時候,需要在它基於的那個互斥鎖的保護下進行。
在進行單發通知或光爆通知的時候,需要在對應的互斥鎖解鎖之後再做操作。
創建條件變量
結合代碼理解上面的含義,先創建幾個變量:
var lock sync.RWMutex
sendCond := sync.NewCond(&lock)
recvCond := sync.NewCond(lock.RLocker())
條件變量的類型
lock是一個讀寫鎖,基於這把鎖,創建了2個代表條件變量的變量,這兩個變量的類型是*sync.Cond,是由sync.NewCond函數來初始化的。
初始化
與互斥鎖鎖不同,這裏不是開箱即用的,只能使用sync.NewCond函數來創建它的指針值,這個函數需要一個sync.Locker類型的參數。
前面說過,條件變量是基於互斥鎖的,它必須有互斥鎖的支持才能夠起作用。因此,這裏的參數是必須的,它也會參與到條件變量的方法實現中去。
sync.Locker接口
sync.Locker其實是一個接口,包含兩個方法Lock()和Unlock():
type Locker interface {
Lock()
Unlock()
}
sync.Mutex類型sync,RWMutex類型都擁有這兩個方法,不過都是指針方法。因此這兩個類型的指針類型才是sync.Locker接口的實現類型。
初始化的過程
在為sendCond初始化的時候,把lock變量的指針作為參數。這裏lock變量的Lock方法和Unlock方法分別用於對其中寫鎖的鎖定和解鎖。這裏與實現接口的兩個方法的名稱是對應的。
在為recvCond初始化的時候,需要的是lock變量的讀鎖,並且還得是sync.Locker接口類型,就是要實現了Lock和Unlock方法的讀鎖。可是lock變量中用於讀鎖的方法卻是RLock方法和RUnlock方法,這裏名稱不對應了。不過有一個RLocker方法可以實現這一需求,下面是源碼裏實現的部分,很簡單:
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
這裏我有一些小疑惑,3個方法裏面都是類型斷言吧。RLocker方法把原來的讀寫鎖類型轉成一個新的類型然後返回。後面的兩個方法,為了用新類型調用讀寫鎖類型裏的方法,先進行類型斷言,轉成讀寫鎖原本的類型,然後調用它的方法。
使用條件變量
下面是截取的使用時的部分代碼:
lock.Lock()
for !isEmpty {
sendCond.Wait()
}
isEmpty = false
// 這裏可以做寫入的操作
lock.Unlock()
recvCond.Signal()
上面是一個寫入的流程。之前的代碼定義了一個狀態變量isEmpty,只有狀態為空的時候,才允許寫入,寫入後把狀態設置為非空。
這裏要先調用Lock方法,等待通知(wait)是要在互斥鎖的保護下進行的。
然後再操作完之後,先調用Unlock方法,再發送通知,發送通知的操作要在互斥鎖解鎖之後。
這裏等待的出sendCond的信號,而最後發送的是recvCond的信號。在另一個讀取的流程裏則正好相反。利用條件變量可以實現單向的通知,而這裏要實現雙向的通知,就需要兩個條件變量。這是條件變量的基本使用原則。
示例代碼
上面把關鍵的代碼分析了一下,下面是完整的示例代碼:
package main
import (
"fmt"
"sync"
"time"
"flag"
)
var useCond bool
func init() {
flag.BoolVar(&useCond, "cond", false, "是否使用條件變量")
}
type msgBox struct {
message string
isEmpty bool
sendCond *sync.Cond
recvCond *sync.Cond
}
func main() {
flag.Parse()
fmt.Println("是否開啟了條件變量保護:", useCond)
var lock sync.RWMutex
msgBox := msgBox{
isEmpty: true, // 默認值是false,狀態初始值應該為true
sendCond: sync.NewCond(&lock), // 不是開箱即用的,需要在使用前初始化
recvCond: sync.NewCond(lock.RLocker()),
}
done := make(chan struct{})
max := 5
// 寫操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < max; i++ {
time.Sleep(time.Millisecond * 200)
// 先進行保護
lock.Lock()
// 再等待通知
for useCond && !msgBox.isEmpty {
msgBox.sendCond.Wait()
}
msgBox.isEmpty = false
msg := fmt.Sprintf("第 %d 條消息", i)
msgBox.message = msg
fmt.Printf("發送消息[%d]: %s\n", i, msg)
// 先解鎖
lock.Unlock()
// 再發送通知
msgBox.recvCond.Signal()
}
}(max)
// 讀操作的goroutine
go func(max int) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max; j++ {
time.Sleep(time.Millisecond * 500)
lock.RLock()
for useCond && msgBox.isEmpty {
msgBox.recvCond.Wait()
}
msgBox.isEmpty = true
msg := msgBox.message
fmt.Printf("接收消息[%d]: %s\n", j, msg)
lock.RUnlock()
msgBox.sendCond.Signal()
}
}(max)
<-done
<-done
fmt.Println("Over")
}
代碼中條件變量的作用
在這個例子裏,寫的時候要獲取到寫鎖,讀的時候要獲取到讀鎖,這個邏輯和之前互斥鎖是一樣的。但是只是獲取到鎖還不能做操作,這裏還要再做一個限制,所以就用到了條件變量。
在這個例子裏,寫操作和讀操作是需要成對出現的。寫完一次之後,依然能獲取到寫鎖,但是不能立刻寫。而是要等待讀操作把之前寫入的數據讀過之後,才能再次寫入,把之前的內容覆蓋掉。讀操作也是一樣。這裏就需要兩個goroutine之間傳遞信號了。
通過命令行參數分別在開啟/關閉條件變量的環境下運行,可以看到其中的作用:
go run main.go
go run main.go -cond
Wait方法
條件變量的Wait方法主要做了4件事:
- 把調用它的goroutine加入到當前條件變量的通知隊列中
- 解鎖當前的條件變量基於的那個互斥鎖
- 讓當前的goroutine處於等待狀態,等到通知來了再決定是否喚醒它。此時阻塞在調用Wait方法的那行代碼上
- 如果通知來了並且決定喚醒當前goroutine,就在喚醒它之後重新鎖定當前條件變量基於的互斥鎖
先解鎖,在阻塞
在Wait方法裏,必須要先解鎖,在阻塞當前goroutine。否則就違背了互斥鎖要成對出現的原則。並且當前goroutine在解鎖千就阻塞的話,當前goroutine就不可能在執行解鎖了。即使不考慮原則,讓別的goroutine來解鎖,又會有重復解鎖可能。
使用for語句
並且Wait方法建議是放在一個for循環裏的。這裏似乎也是可以用if語句的。但是if語句只能檢查狀態一次,而for的話可以進行多次檢查。如果goroutine收到了通知而喚醒,但是此時檢查時發現狀態還是不對,那麽就應該再次調用Wait方法。保險起見,在包裹條件變量的Wait方法總是應該使用for語句。
Signal方法和Broadcast方法
這2個方法都是用來發送通知的。Signal方法的通知只會喚醒一個goroutine,而Broadcast方法的通知會喚醒所有等待的goroutine。Wait方法會把當前的goroutine添加到通知隊列的隊尾,而Signal方法會從通知隊列的隊首開始查找可以被喚醒的goroutine。因此Signal方法喚醒的一般是最早等待的那個goroutine。
適用場景
這2個方法的行為決定他們的適用場景。確定只有一個goroutine在等待通知,或者值需要喚醒一個goroutine的時候,就使用Signal方法。否則,使用Broadcast方法總是沒錯的,Broadcast方法的適用場景更多。
通知的即時性
條件變量的通知具有即時性。如果發送通知的時候沒有goroutine在等待,那麽該次通知就會被直接丟棄。之後再開始等待的goroutine需要等待之後的通知。
示例代碼2
還是前面那個示例,稍微改了改,把讀寫鎖換成了互斥鎖,通知方法把Signal換成了Broadcast:
package main
import (
"fmt"
"sync"
"time"
)
var lock sync.Mutex
// 匿名結構體,定義並初始化賦值
// 嵌入式鎖(Embedded lock)的場景適合使用匿名結構體
var msgBox = struct {
message string
isEmpty bool
sendCond *sync.Cond
recvCond *sync.Cond
}{
isEmpty: true,
sendCond: sync.NewCond(&lock),
recvCond: sync.NewCond(&lock),
}
// 用於設置消息的函數
func send(id, index int) {
lock.Lock()
for !msgBox.isEmpty {
msgBox.sendCond.Wait()
}
msg := fmt.Sprintf("msg: [%d-%d]", id, index)
msgBox.message = msg
fmt.Printf("發送消息[%d-%d]: %s\t", id, index, msg)
msgBox.isEmpty = false
lock.Unlock()
msgBox.recvCond.Broadcast()
}
// 用於讀取消息的函數
func recv(id, index int) {
lock.Lock()
for msgBox.isEmpty {
msgBox.recvCond.Wait()
}
msg := msgBox.message
msgBox.message = ""
fmt.Printf("接收消息[%d-%d]: %s\n", id, index, msg)
msgBox.isEmpty = true
lock.Unlock()
msgBox.sendCond.Broadcast()
}
func main() {
done := make(chan struct{})
count := 5
// 啟動一個goroutine用於發送
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 100)
send(id, i)
}
}(0, count * 2)
// 啟動兩個goroutine用於接收
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 300)
recv(id, i)
}
}(1, count)
go func(id, count int) {
defer func() {
done <- struct{}{}
}()
for i := 0; i < count; i++ {
time.Sleep(time.Millisecond * 400)
recv(id, i)
}
}(2, count)
<- done
<- done
<- done
fmt.Println("Over")
}
Go36-27,28-條件變量