Golang標準庫深入 - 鎖、訊號量(sync)
概述
sync包提供了基本的同步基元,如互斥鎖。除了Once和WaitGroup型別,大部分都是適用於低水平程式執行緒,高水平的同步使用channel通訊更好一些。
本包的型別的值不應被拷貝。
雖然文件解釋可能不夠深入,或者淺顯易懂,但是我覺得還是貼出來,對比了解可能會更好。
Go語言中實現併發或者是建立一個goroutine很簡單,只需要在函式前面加上"go",就可以了,那麼併發中,如何實現多個goroutine之間的同步和通訊?答: channel 我是第一個想到的, sync, 原子操作atomic等都可以。
詳解:
首先我們先來介紹一下sync包下的各種型別。那麼我們先來羅列一下sync包下所有的型別吧。
1. Cond 條件等待
type Cond struct { // L is held while observing or changing the condition L Locker // contains filtered or unexported fields }
解釋:
Cond實現了一個條件變數,一個執行緒集合地,供執行緒等待或者宣佈某事件的發生。
每個Cond例項都有一個相關的鎖(一般是*Mutex或*RWMutex型別的值),它必須在改變條件時或者呼叫Wait方法時保持鎖定。Cond可以建立為其他結構體的欄位,Cond在開始使用後不能被拷貝。
條件等待通過Wait讓例程等待,通過Signal讓一個等待的例程繼續,通過Broadcase讓所有等待的繼續。
在Wait之前需要手動為c.L上鎖, Wait結束了手動解鎖。為避免虛假喚醒, 需要將Wait放到一個條件判斷的迴圈中,官方要求寫法:
c.L.Lock() for !condition() { c.Wait() } // 執行條件滿足之後的動作... c.L.Unlock()
成員文件:
type Cond struct { L Locker // 在“檢查條件”或“更改條件”時 L 應該鎖定。 } // 建立一個條件等待func NewCond(l Locker) *Cond // Broadcast 喚醒所有等待的 Wait,建議在“更改條件”時鎖定 c.L,更改完畢再解鎖。 func (c *Cond) Broadcast() // Signal 喚醒一個等待的 Wait,建議在“更改條件”時鎖定 c.L,更改完畢再解鎖。 func (c *Cond) Signal() // Wait 會解鎖 c.L 並進入等待狀態,在被喚醒時,會重新鎖定 c.L func (c *Cond) Wait()
程式碼示例:
package main import ( "fmt" "sync" "time" ) func main() { condition := false // 條件不滿足 var mu sync.Mutex cond := sync.NewCond(&mu) // 建立一個Cond //讓協程去創造條件 go func() { mu.Lock() condition = true // 改寫條件 time.Sleep(3 * time.Second) cond.Signal() // 傳送通知:條件ok mu.Unlock() }() mu.Lock() // 檢查條件是否滿足,避免虛假通知,同時避免 Signal 提前於 Wait 執行。 for !condition { // 如果Signal提前執行了,那麼此處就是false了 // 等待條件滿足的通知,如果虛假通知,則繼續迴圈等待 cond.Wait() // 等待時 mu 處於解鎖狀態,喚醒時重新鎖定。 (阻塞當前執行緒) } fmt.Println("條件滿足,開始後續動作...") mu.Unlock() }
2. Locker
type Locker interface { Lock() Unlock() }
Locker介面代表一個可以加鎖和解鎖的物件。 是一個介面。
3. Mutex 互斥鎖
type Mutex struct { // contains filtered or unexported fields }
解釋:
Mutex 是互斥鎖。Mutex 的零值是一個解鎖的互斥鎖。第一次使用後不得複製 Mutex。
互斥鎖是用來保證在任一時刻, 只能有一個例程訪問某個物件。 Mutex的初始值為解鎖的狀態。 通常作為其他結構體的你名欄位使用, 並且可以安全的在多個例程中並行使用。
成員文件:
// Lock 用於鎖住 m,如果 m 已經被加鎖,則 Lock 將被阻塞,直到 m 被解鎖。 func (m *Mutex) Lock() // Unlock 用於解鎖 m,如果 m 未加鎖,則該操作會引發 panic。 func (m *Mutex) Unlock()
程式碼示例:
package main import ( "fmt" "sync" ) type SafeInt struct { sync.Mutex Num int } func main() { waitNum := 10 // 設定等待的個數(繼續往下看) count := SafeInt{} done := make(chan bool) for i := 0; i < waitNum; i++ { go func(i int) { count.Lock() // 加鎖,防止其它例程修改 count count.Num = count.Num + i fmt.Print(count.Num, " ") count.Unlock() done <- true }(i) } for i := 0; i < waitNum; i++ { <-done } } [ `go run sync_mutex.go` | done: 216.47974ms ] 1 4 8 8 10 15 21 30 37 45
注意:多次輸出結果不一致, 試想為什麼會出現10個結果中有0值得, 為什麼10個結果中都大於0呢?或者都大於1呢? 那麼會不會出現10個結果中最小值是9 呢?
4. Once 單次執行
type Once struct { // contains filtered or unexported fields }
解釋:
Once是隻執行一次動作的物件。
Once 的作用是多次呼叫但只執行一次,Once 只有一個方法,Once.Do(),向 Do 傳入一個函式,這個函式在第一次執行 Once.Do() 的時候會被呼叫,以後再執行 Once.Do() 將沒有任何動作,即使傳入了其它的函式,也不會被執行,如果要執行其它函式,需要重新建立一個 Once 物件。
成員文件:
// 多次呼叫僅執行一次指定的函式 f func (o *Once) Do(f func())
程式碼示例:
package main // 官方案例 import ( "fmt" "sync" ) func main() { var once sync.Once var num int onceBody := func() { fmt.Println("Only once") } done := make(chan bool) for i := 0; i < 10; i++ { go func() { once.Do(onceBody) // 多次呼叫 done <- true }() } for i := 0; i < 10; i++ { <-done } }
5.RWMutex讀寫互斥鎖
type RWMutex struct { // 包含隱藏或非匯出欄位 }
解釋:
RWMutex是讀寫互斥鎖。該鎖可以被同時多個讀取者持有或唯一個寫入者持有。RWMutex可以建立為其他結構體的欄位;零值為解鎖狀態。RWMutex型別的鎖也和執行緒無關,可以由不同的執行緒加讀取鎖/寫入和解讀取鎖/寫入鎖。
Mutex 可以安全的在多個例程中並行使用。
成員文件:
// Lock 將 rw 設定為寫鎖定狀態,禁止其他例程讀取或寫入。 func (rw *RWMutex) Lock() // Unlock 解除 rw 的寫鎖定狀態,如果 rw 未被寫鎖定,則該操作會引發 panic。 func (rw *RWMutex) Unlock() // RLock 將 rw 設定為讀鎖定狀態,禁止其他例程寫入,但可以讀取。 func (rw *RWMutex) RLock() // Runlock 解除 rw 的讀鎖定狀態,如果 rw 未被讀鎖頂,則該操作會引發 panic。 func (rw *RWMutex) RUnlock() // RLocker 返回一個互斥鎖,將 rw.RLock 和 rw.RUnlock 封裝成了一個 Locker 介面。 func (rw *RWMutex) RLocker() Locker
注意,Lock()鎖定時,其他例程是無法讀寫的。
1.可以讀時,多個goroutine可以同時讀。
2.寫的時候,其他goroutine不可讀也不可寫。
程式碼例項:
package main import ( "fmt" "sync" "time" ) var m *sync.RWMutex var wg sync.WaitGroup func main() { m = new(sync.RWMutex) wg.Add(2) go write(1) time.Sleep(1 * time.Second) go read(2) wg.Wait() } func write(i int) { fmt.Println(i, "寫開始.") m.Lock() fmt.Println(i, "正在寫入中......") time.Sleep(3 * time.Second) m.Unlock() fmt.Println(i, "寫入結束.") wg.Done() } func read(i int) { fmt.Println(i, "讀開始.") m.RLock() fmt.Println(i, "正在讀取中......") time.Sleep(1 * time.Second) m.RUnlock() fmt.Println(i, "讀取結束.") wg.Done() } > Output: command-line-arguments 1 寫開始. 1 正在寫入中...... 2 讀開始. 1 寫入結束. 2 正在讀取中...... 2 讀取結束. > Elapsed: 4.747s > Result: Success
當寫入開始時,加寫鎖開始寫入,一秒後,讀取goroutine開始讀取,發現有寫入鎖,只能等待。 2秒後寫入完成, 解除寫鎖,讀取開始加鎖,直到讀取完成。
圖解:
6.WaitGroup組等待
type WaitGroup struct { // contains filtered or unexported fields }
解釋:
WaitGroup用於等待一組執行緒的結束。父執行緒呼叫Add方法來設定應等待的執行緒的數量。每個被等待的執行緒在結束時應呼叫Done方法。同時,主執行緒裡可以呼叫Wait方法阻塞至所有執行緒結束(計數器歸零)。
成員文件:
// 計數器增加 delta,delta 可以是負數。 func (wg *WaitGroup) Add(delta int) // 計數器減少 1 func (wg *WaitGroup) Done() // 等待直到計數器歸零。如果計數器小於 0,則該操作會引發 panic。 func (wg *WaitGroup) Wait()
程式碼例項:
func main() { wg := sync.WaitGroup{} wg.Add(10) for i := 0; i < 10; i++ { go func(i int) { defer wg.Done() fmt.Print(i, " ") }(i) } wg.Wait() }
輸出是無序的。
注意此處有一個小坑,看程式碼:
func main() { wg := sync.WaitGroup{} for i := 0; i < 10; i++ { go func(i int) { wg.Add(1) defer wg.Done() fmt.Print(i, " ") }(i) } wg.Wait() }
看輸出,發現會小於10個,甚至一個也沒有。問題就在於goroutine執行時間和main程的退出時間問題,導致Add()是否執行。
再有就是複製和引用了,如果將wg複製給goroutine作為引數,一定要使用引用,否則就是兩個物件了。
那麼介紹完上面所有的型別後,我把Pool留到了最後,這也是要重點將的地方。
7.Pool 臨時物件池
type Pool struct { noCopy noCopy local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{} }
Pool 用於儲存臨時物件,它將使用完畢的物件存入物件池中,在需要的時候取出來重複使用,目的是為了避免重複建立相同的物件造成 GC 負擔過重。其中存放的臨時物件隨時可能被 GC 回收掉(如果該物件不再被其它變數引用)。
從 Pool 中取出物件時,如果 Pool 中沒有物件,將返回 nil,但是如果給 Pool.New 欄位指定了一個函式的話,Pool 將使用該函式建立一個新物件返回。
Pool 可以安全的在多個例程中並行使用,但 Pool 並不適用於所有空閒物件,Pool 應該用來管理併發的例程共享的臨時物件,而不應該管理短壽命物件中的臨時物件,因為這種情況下記憶體不能很好的分配,這些短壽命物件應該自己實現空閒列表。
Pool 在開始使用之後,不能再被複制。
Pool的實現:
1.定時清理
文件上說,儲存在Pool中的物件會在沒有任何通知的情況下被自動移除掉。實際上,這個清理過程是在每次垃圾回收之前做的。垃圾回收是固定兩分鐘觸發一次。而且每次清理會將Pool中的所有物件都清理掉!
2.如何管理資料
先看看這幾個資料結構
type Pool struct { noCopy noCopy local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{} } // Local per-P Pool appendix. type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared []interface{} // Can be used by any P. Mutex // Protects shared. } type poolLocal struct { poolLocalInternal // Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte }
Pool是提供給外部使用的物件。其中的local成員的真實型別是一個poolLocal陣列,localSize是陣列長度。poolLocal是真正儲存資料的地方。priveate儲存了一個臨時物件,shared是儲存臨時物件的陣列。
為什麼Pool中需要這麼多poolLocal物件呢?實際上,Pool是給每個執行緒分配了一個poolLocal物件。也就是說local陣列的長度,就是工作執行緒的數量(size := runtime.GOMAXPROCS(0))。當多執行緒在併發讀寫的時候,通常情況下都是在自己執行緒的poolLocal中存取資料。當自己執行緒的poolLocal中沒有資料時,才會嘗試加鎖去其他執行緒的poolLocal中“偷”資料。
func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } l := p.pin() //獲取當前執行緒的poolLocal物件,也就是p.local[pid]。 x := l.private l.private = nil runtime_procUnpin() if x == nil { l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() if x == nil { x = p.getSlow() } } if race.Enabled { race.Enable() if x != nil { race.Acquire(poolRaceAddr(x)) } } if x == nil && p.New != nil { x = p.New() } return x }
為什麼這裡要鎖住。答案在getSlow中。因為當shared中沒有資料的時候,會嘗試去其他的poolLocal的shared中偷資料。Pool.Get的時候,首先會在local陣列中獲取當前執行緒對應的poolLocal物件。如果private中有資料,則取出來直接返回。如果沒有則先鎖住shared,有資料則直接返回。
Go語言的goroutine雖然可以建立很多,但是真正能物理上併發執行的goroutine數量是有限的,是由runtime.GOMAXPROCS(0)設定的。所以這個Pool高效的設計的地方就在於將資料分散在了各個真正併發的執行緒中,每個執行緒優先從自己的poolLocal中獲取資料,很大程度上降低了鎖競爭。
來源:https://my.oschina.net/90design/blog/1814499