Go語言併發程式設計(四)
同步
Go 程式可以使用通道進行多個 goroutine 間的資料交換,但這僅僅是資料同步中的一種方法。通道內部的實現依然使用了各種鎖,因此優雅程式碼的代價是效能。在某些輕量級的場合,原子訪問(atomic包)、互斥鎖(sync.Mutex)以及等待組(sync.WaitGroup)能最大程度滿足需求。
當多執行緒併發執行的程式競爭訪問和修改同一塊資源時,會發生競態問題。
下面的程式碼中有一個 ID 生成器,每次呼叫生成器將會生成一個不會重複的順序序號,使用 10 個併發生成序號,觀察 10 個併發後的結果。
競態檢測:
package main import ( "fmt" "sync/atomic" ) var ( // 序列號 seq int64 ) // 序列號生成器 func GenID() int64 { // 嘗試原子的增加序列號 atomic.AddInt64(&seq, 1) return seq } func main() { // 10個併發序列號生成 for i := 0; i < 10; i++ { go GenID() } fmt.Println(GenID()) }
程式碼說明如下:
- 第10行,序列號生成器中的儲存上次序列號的變數。
- 第17行,使用原子操作函式atomic.AddInt64()對seq()函式加1操作。不過這裡故意沒有使用atomic.AddInt64()的返回值作為GenID()函式的返回值,因此會造成一個競態問題。
- 第25行,迴圈10次生成10個goroutine呼叫GenID()函式,同時忽略GenID()的返回值。
- 第28行,單獨呼叫一次GenID()函式。
在執行程式時,為執行引數加入-race引數,開啟執行時(runtime)對競態問題的分析,命令如下:
# go run -race racedetect.go ================== WARNING: DATA RACE Write at 0x0000005d3f10 by goroutine 7: sync/atomic.AddInt64() E:/go/src/runtime/race_amd64.s:276 +0xb main.GenID() D:/go_work/src/chapter09/racedetect/racedetect.go:17 +0x4a Previous read at 0x0000005d3f10 by goroutine 6: main.GenID() D:/go_work/src/chapter09/racedetect/racedetect.go:18 +0x5a Goroutine 7 (running) created at: main.main() D:/go_work/src/chapter09/racedetect/racedetect.go:25 +0x56 Goroutine 6 (finished) created at: main.main() D:/go_work/src/chapter09/racedetect/racedetect.go:25 +0x56 ================== 10 Found 1 data race(s) exit status 66
程式碼執行發生宕機,根據報錯資訊,第18行有競態問題,根據atomic.AddInt64()的引數宣告,這個函式會將修改後的值以返回值方式傳出:
func GenID() int64 { // 嘗試原子的增加序列號 return atomic.AddInt64(&seq, 1) }
再次執行:
# go run -race racedetect.go 10
沒有發生競態問題,程式執行正常。
本例中只是對變數進行增減操作,雖然可以使用互斥鎖(sync.Mutex)解決競態問題,但是對效能消耗較大。在這種情況下,推薦使用原子操作(atomic)進行變數操作。
互斥鎖(sync.Mutex)和讀寫互斥鎖(sync.RWMutex)
互斥鎖是一種常用的控制共享資源訪問的方法,它能夠保證同時只有一個goroutine可以訪問共享資源。在Go程式中的使用非常簡單,參見下面的程式碼:
package main import ( "fmt" "sync" ) var ( // 邏輯中使用的某個變數 count int // 與變數對應的使用互斥鎖 countGuard sync.Mutex ) func GetCount() int { // 鎖定 countGuard.Lock() // 在函式退出時解除鎖定 defer countGuard.Unlock() return count } func SetCount(c int) { countGuard.Lock() count = c countGuard.Unlock() } func main() { // 可以進行併發安全的設定 SetCount(1) // 可以進行併發安全的獲取 fmt.Println(GetCount()) }
程式碼說明如下:
- 第10行是某個邏輯步驟中使用到的變數,無論是包級的變數還是結構體成員欄位,都可以。
- 第13行,一般情況下,建議將互斥鎖的粒度設定得越小越好,降低因為共享訪問時等待的時間。
- 第16行是一個獲取count值的函式封裝,通過這個函式可以併發安全的訪問變數count。
- 第19行,嘗試對countGuard互斥量進行加鎖。一旦countGuard發生加鎖,如果另外一個goroutine嘗試繼續加鎖時將會發生阻塞,直到這個countGuard被解鎖。
- 第22行使用defer將countGuard的解鎖進行延遲呼叫,解鎖操作將會發生在GetCount()函式返回時。
- 第27行在設定count值時,同樣使用countGuard進行加鎖、解鎖操作,保證修改count值的過程是一個原子過程,不會發生併發訪問衝突。
在讀多寫少的環境中,可以優先使用讀寫互斥鎖(sync.RWMutex),它比互斥鎖更加高效。sync包中的RWMutex提供了讀寫互斥鎖的封裝。
我們將互斥鎖例子中的一部分程式碼修改為讀寫互斥鎖,參見下面程式碼:
var ( // 邏輯中使用的某個變數 count int // 與變數對應的使用互斥鎖 countGuard sync.RWMutex ) func GetCount() int { // 鎖定 countGuard.RLock() // 在函式退出時解除鎖定 defer countGuard.RUnlock() return count }
程式碼說明如下:
- 第6行,在宣告countGuard時,從sync.Mutex互斥鎖改為sync.RWMutex讀寫互斥鎖。
- 第12行,獲取count的過程是一個讀取count資料的過程,適用於讀寫互斥鎖。在這一行,把countGuard.Lock()換做countGuard.RLock(),將讀寫互斥鎖標記為讀狀態。如果此時另外一個goroutine併發訪問了countGuard,同時也呼叫了countGuard.RLock()時,並不會發生阻塞。
- 第15行,與讀模式加鎖對應的,使用讀模式解鎖。
等待組(sync.WaitGroup)
除了可以使用通道(channel)和互斥鎖進行兩個併發程式間的同步外,還可以使用等待組進行多個任務的同步,等待組可以保證在併發環境中完成指定數量的任務
等待組有下面幾個方法可用,如表1-2所示。
方法名 | 功能 |
(wg * WaitGroup) Add(delta int) | 等待組的計數器+1 |
(wg *WaitGroup) Done() | 等待組的計數器-1 |
(wg *WaitGroup) Wait() | 當等待組計數器不等於0時阻塞直到變0 |
等待組內部擁有一個計數器,計數器的值可以通過方法呼叫實現計數器的增加和減少。當我們添加了N個併發任務進行工作時,就將等待組的計數器值增加N。每個任務完成時,這個值減1。同時,在另外一個goroutine中等待這個等待組的計數器值為0時,表示所有任務已經完成。
package main import ( "fmt" "net/http" "sync" ) func main() { // 宣告一個等待組 var wg sync.WaitGroup // 準備一系列的網站地址 var urls = []string{ "http://www.github.com/", "https://www.qiniu.com/", "https://www.golangtc.com/", } // 遍歷這些地址 for _, url := range urls { // 每一個任務開始時, 將等待組增加1 wg.Add(1) // 開啟一個併發 go func(url string) { // 使用defer, 表示函式完成時將等待組值減1 defer wg.Done() // 使用http訪問提供的地址 _, err := http.Get(url) // 訪問完成後, 列印地址和可能發生的錯誤 fmt.Println(url, err) // 通過引數傳遞url地址 }(url) } // 等待所有的任務完成 wg.Wait() fmt.Println("over") }
程式碼說明如下:
- 第12行,宣告一個等待組,對一組等待任務只需要一個等待組,而不需要每一個任務都使用一個等待組。
- 第15行,準備一系列可訪問的網站地址的字串切片。
- 第22行,遍歷這些字串切片。
- 第25行,將等待組的計數器加1,也就是每一個任務加1。
- 第28行,將一個匿名函式開啟併發。
- 第31行,在匿名函式結束時會執行這一句以表示任務完成。wg.Done()方法等效於執行wg.Add(-1)。
- 第34行,使用http包提供的Get()函式對url進行訪問,Get()函式會一直阻塞直到網站響應或者超時。
- 第37行,在網站響應和超時後,列印這個網站的地址和可能發生的錯誤。
- 第40行,這裡將url通過goroutine的引數進行傳遞,是為了避免url變數通過閉包放入匿名函式後又被修改的問題。
- 第44行,等待所有的網站都響應或者超時後,任務完成,Wait就會停止阻塞。