Golang sync包的使用
Desc:Go sync 包的使用方法,sync.Mutex,sync.RMutex,sync.Once,sync.Cond,sync.Waitgroup
儘管 Golang 推薦通過 channel 進行通訊和同步,但在實際開發中 sync 包用得也非常的多。另外 sync 下還有一個 atomic 包,提供了一些底層的原子操作(這裡不做介紹)。本篇文章主要介紹該包下的鎖的一些概念及使用方法。
整個包都圍繞這 Locker 進行,這是一個 interface:
type Locker interface {
Lock()
Unlock()
}
只有兩個方法,Lock() 和 Unlock()。
另外該包下的物件,在使用過之後,千萬不要複製。
有許多同學不理解鎖的概念,下面會一一介紹到:
為什麼需要鎖?
在併發的情況下,多個執行緒或協程同時去修改一個變數,可能會出現如下情況:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var a = 0
// 啟動 100 個協程,需要足夠大
// var lock sync.Mutex
for i := 0; i < 100; i++ {
go func(idx int) {
// lock.Lock()
// defer lock.Unlock()
a += 1
fmt.Printf("goroutine %d, a=%d\n", idx, a)
}(i)
}
// 等待 1s 結束主程式
// 確保所有協程執行完
time.Sleep(time.Second)
}
觀察列印結果,是否出現 a 的值是相同的情況(未出現則重試或調大協程數),答案:是的。
顯然這不是我們想要的結果。出現這種情況的原因是,協程依次執行:從暫存器讀取 a 的值 -> 然後做加法運算 -> 最後寫會暫存器。試想,此時一個協程取出 a 的值 3,正在做加法運算(還未寫回暫存器)。同時另一個協程此時去取,取出了同樣的 a 的值 3。最終導致的結果是,兩個協程產出的結果相同,a 相當於只增加了 1。
所以,鎖的概念就是,我正在處理 a(鎖定),你們誰都別和我搶,等我處理完了(解鎖),你們再處理。這樣就實現了,同時處理 a 的協程只有一個,就實現了同步。
把上面程式碼裡的註釋取消掉再試下。
什麼是互斥鎖 Mutex?
什麼是互斥鎖?它是鎖的一種具體實現,有兩個方法:
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
在首次使用後不要複製該互斥鎖。對一個未鎖定的互斥鎖解鎖將會產生執行時錯誤。
一個互斥鎖只能同時被一個 goroutine 鎖定,其它 goroutine 將阻塞直到互斥鎖被解鎖(重新爭搶對互斥鎖的鎖定)。如:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
ch := make(chan struct{}, 2)
var l sync.Mutex
go func() {
l.Lock()
defer l.Unlock()
fmt.Println("goroutine1: 我會鎖定大概 2s")
time.Sleep(time.Second * 2)
fmt.Println("goroutine1: 我解鎖了,你們去搶吧")
ch <- struct{}{}
}()
go func() {
fmt.Println("groutine2: 等待解鎖")
l.Lock()
defer l.Unlock()
fmt.Println("goroutine2: 哈哈,我鎖定了")
ch <- struct{}{}
}()
// 等待 goroutine 執行結束
for i := 0; i < 2; i++ {
<-ch
}
}
注意,平時所說的鎖定,其實就是去鎖定互斥鎖,而不是說去鎖定一段程式碼。也就是說,當代碼執行到有鎖的地方時,它獲取不到互斥鎖的鎖定,會阻塞在那裡,從而達到控制同步的目的。
什麼是讀寫鎖 RWMutex?
那麼什麼是讀寫鎖呢?它是針對讀寫操作的互斥鎖,讀寫鎖與互斥鎖最大的不同就是可以分別對 讀、寫 進行鎖定。一般用在大量讀操作、少量寫操作的情況:
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
由於這裡需要區分讀寫鎖定,我們這樣定義:
- 讀鎖定(RLock),對讀操作進行鎖定
- 讀解鎖(RUnlock),對讀鎖定進行解鎖
- 寫鎖定(Lock),對寫操作進行鎖定
- 寫解鎖(Unlock),對寫鎖定進行解鎖
在首次使用之後,不要複製該讀寫鎖。不要混用鎖定和解鎖,如:Lock 和 RUnlock、RLock 和 Unlock。因為對未讀鎖定的讀寫鎖進行讀解鎖或對未寫鎖定的讀寫鎖進行寫解鎖將會引起執行時錯誤。
如何理解讀寫鎖呢?
- 同時只能有一個 goroutine 能夠獲得寫鎖定。
- 同時可以有任意多個 gorouinte 獲得讀鎖定。
- 同時只能存在寫鎖定或讀鎖定(讀和寫互斥)。
也就是說,當有一個 goroutine 獲得寫鎖定,其它無論是讀鎖定還是寫鎖定都將阻塞直到寫解鎖;當有一個 goroutine 獲得讀鎖定,其它讀鎖定任然可以繼續;當有一個或任意多個讀鎖定,寫鎖定將等待所有讀鎖定解鎖之後才能夠進行寫鎖定。所以說這裡的讀鎖定(RLock)目的其實是告訴寫鎖定:有很多人正在讀取資料,你給我站一邊去,等它們讀(讀解鎖)完你再來寫(寫鎖定)。
使用例子:
package main
import (
"fmt"
"math/rand"
"sync"
)
var count int
var rw sync.RWMutex
func main() {
ch := make(chan struct{}, 10)
for i := 0; i < 5; i++ {
go read(i, ch)
}
for i := 0; i < 5; i++ {
go write(i, ch)
}
for i := 0; i < 10; i++ {
<-ch
}
}
func read(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d 進入讀操作...\n", n)
v := count
fmt.Printf("goroutine %d 讀取結束,值為:%d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func write(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d 進入寫操作...\n", n)
v := rand.Intn(1000)
count = v
fmt.Printf("goroutine %d 寫入結束,新值為:%d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
WaitGroup 例子
WaitGroup 用於等待一組 goroutine 結束,用法很簡單。它有三個方法:
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Add 用來新增 goroutine 的個數。Done 執行一次數量減 1。Wait 用來等待結束:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i, s := range seconds {
// 計數加 1
wg.Add(1)
go func(i, s int) {
// 計數減 1
defer wg.Done()
fmt.Printf("goroutine%d 結束\n", i)
}(i, s)
}
// 等待執行結束
wg.Wait()
fmt.Println("所有 goroutine 執行結束")
}
注意,wg.Add()
方法一定要在 goroutine 開始前執行哦。
Cond 條件變數
Cond 實現一個條件變數,即等待或宣佈事件發生的 goroutines 的會合點。
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
它會儲存一個通知列表。
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
Wait 方法、Signal 方法和 Broadcast 方法。它們分別代表了等待通知、單發通知和廣播通知的操作。
我們來看一下 Wait 方法:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
它的操作為:加入到通知列表 -> 解鎖 L -> 等待通知 -> 鎖定 L。其使用方法是:
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
舉個例子:
// Package main provides ...
package main
import (
"fmt"
"sync"
"time"
)
var count int = 4
func main() {
ch := make(chan struct{}, 5)
// 新建 cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// 爭搶互斥鎖的鎖定
cond.L.Lock()
defer func() {
cond.L.Unlock()
ch <- struct{}{}
}()
// 條件是否達成
for count > i {
cond.Wait()
fmt.Printf("收到一個通知 goroutine%d\n", i)
}
fmt.Printf("goroutine%d 執行結束\n", i)
}(i)
}
// 確保所有 goroutine 啟動完成
time.Sleep(time.Millisecond * 20)
// 鎖定一下,我要改變 count 的值
fmt.Println("broadcast...")
cond.L.Lock()
count -= 1
cond.Broadcast()
cond.L.Unlock()
time.Sleep(time.Second)
fmt.Println("signal...")
cond.L.Lock()
count -= 2
cond.Signal()
cond.L.Unlock()
time.Sleep(time.Second)
fmt.Println("broadcast...")
cond.L.Lock()
count -= 1
cond.Broadcast()
cond.L.Unlock()
for i := 0; i < 5; i++ {
<-ch
}
}
Pool 臨時物件池
sync.Pool
可以作為臨時物件的儲存和複用的集合。其結構為:
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
新鍵 Pool 需要提供一個 New 方法,目的是當獲取不到臨時物件時自動建立一個(不會主動加入到 Pool 中),Get 和 Put 方法都很好理解。
深入瞭解過 Go 的同學應該知道,Go 的重要組成結構為 M、P、G。Pool 實際上會為每一個操作它的 goroutine 相關聯的 P 都生成一個本地池。如果從本地池 Get 物件的時候,本地池沒有,則會從其它的 P 本地池獲取。因此,Pool 的一個特點就是:可以把由其中的物件值產生的儲存壓力進行分攤。
它有著以下特點:
- Pool 中的物件在僅有 Pool 有著唯一索引的情況下可能會被自動刪除(取決於下一次 GC 執行的時間)。
- goroutines 協程安全,可以同時被多個協程使用。
GC 的執行一般會使 Pool 中的物件全部移除。
那麼 Pool 都適用於什麼場景呢?從它的特點來說,適用與無狀態的物件的複用,而不適用與如連線池之類的。在 fmt 包中有一個很好的使用池的例子,它維護一個動態大小的臨時輸出緩衝區。
官方例子:
package main
import (
"bytes"
"io"
"os"
"sync"
"time"
)
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {
// 獲取臨時物件,沒有的話會自動建立
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes())
// 將臨時物件放回到 Pool 中
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
列印結果:
2006-01-02T15:04:05Z path=/search?q=flowers
Once 執行一次
使用 sync.Once
物件可以使得函式多次呼叫只執行一次。其結構為:
type Once struct {
m Mutex
done uint32
}
func (o *Once) Do(f func())
用 done 來記錄執行次數,用 m 來保證保證僅被執行一次。只有一個 Do 方法,呼叫執行。
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
# 列印結果
Only once