1. 程式人生 > >Go語言併發程式設計(四)

Go語言併發程式設計(四)

同步

Go 程式可以使用通道進行多個 goroutine 間的資料交換,但這僅僅是資料同步中的一種方法。通道內部的實現依然使用了各種鎖,因此優雅程式碼的代價是效能。在某些輕量級的場合,原子訪問(atomic包)、互斥鎖(sync.Mutex)以及等待組(sync.WaitGroup)能最大程度滿足需求。

當多執行緒併發執行的程式競爭訪問和修改同一塊資源時,會發生競態問題。

下面的程式碼中有一個 ID 生成器,每次呼叫生成器將會生成一個不會重複的順序序號,使用 10 個併發生成序號,觀察 10 個併發後的結果。

競態檢測:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	// 序列號
	seq int64
)

// 序列號生成器
func GenID() int64 {

	// 嘗試原子的增加序列號
	atomic.AddInt64(&seq, 1)
	return seq
}

func main() {

	// 10個併發序列號生成
	for i := 0; i < 10; i++ {
		go GenID()
	}

	fmt.Println(GenID())
}

  

程式碼說明如下:

  • 第10行,序列號生成器中的儲存上次序列號的變數。
  • 第17行,使用原子操作函式atomic.AddInt64()對seq()函式加1操作。不過這裡故意沒有使用atomic.AddInt64()的返回值作為GenID()函式的返回值,因此會造成一個競態問題。
  • 第25行,迴圈10次生成10個goroutine呼叫GenID()函式,同時忽略GenID()的返回值。
  • 第28行,單獨呼叫一次GenID()函式。

在執行程式時,為執行引數加入-race引數,開啟執行時(runtime)對競態問題的分析,命令如下:

# go run -race racedetect.go
==================
WARNING: DATA RACE
Write at 0x0000005d3f10 by goroutine 7:
  sync/atomic.AddInt64()
      E:/go/src/runtime/race_amd64.s:276 +0xb
  main.GenID()
      D:/go_work/src/chapter09/racedetect/racedetect.go:17 +0x4a

Previous read at 0x0000005d3f10 by goroutine 6:
  main.GenID()
      D:/go_work/src/chapter09/racedetect/racedetect.go:18 +0x5a

Goroutine 7 (running) created at:
  main.main()
      D:/go_work/src/chapter09/racedetect/racedetect.go:25 +0x56

Goroutine 6 (finished) created at:
  main.main()
      D:/go_work/src/chapter09/racedetect/racedetect.go:25 +0x56
==================
10
Found 1 data race(s)
exit status 66

  

程式碼執行發生宕機,根據報錯資訊,第18行有競態問題,根據atomic.AddInt64()的引數宣告,這個函式會將修改後的值以返回值方式傳出:

func GenID() int64 {
    // 嘗試原子的增加序列號
    return atomic.AddInt64(&seq, 1)
}

  

再次執行:

# go run -race racedetect.go
10

  

沒有發生競態問題,程式執行正常。

本例中只是對變數進行增減操作,雖然可以使用互斥鎖(sync.Mutex)解決競態問題,但是對效能消耗較大。在這種情況下,推薦使用原子操作(atomic)進行變數操作。

互斥鎖(sync.Mutex)和讀寫互斥鎖(sync.RWMutex)

互斥鎖是一種常用的控制共享資源訪問的方法,它能夠保證同時只有一個goroutine可以訪問共享資源。在Go程式中的使用非常簡單,參見下面的程式碼:

package main

import (
    "fmt"
    "sync"
)

var (
    // 邏輯中使用的某個變數
    count int

    // 與變數對應的使用互斥鎖
    countGuard sync.Mutex
)

func GetCount() int {

    // 鎖定
    countGuard.Lock()

    // 在函式退出時解除鎖定
    defer countGuard.Unlock()

    return count
}

func SetCount(c int) {
    countGuard.Lock()
    count = c
    countGuard.Unlock()
}

func main() {

    // 可以進行併發安全的設定
    SetCount(1)

    // 可以進行併發安全的獲取
    fmt.Println(GetCount())

}

  

程式碼說明如下:

  • 第10行是某個邏輯步驟中使用到的變數,無論是包級的變數還是結構體成員欄位,都可以。
  • 第13行,一般情況下,建議將互斥鎖的粒度設定得越小越好,降低因為共享訪問時等待的時間。
  • 第16行是一個獲取count值的函式封裝,通過這個函式可以併發安全的訪問變數count。
  • 第19行,嘗試對countGuard互斥量進行加鎖。一旦countGuard發生加鎖,如果另外一個goroutine嘗試繼續加鎖時將會發生阻塞,直到這個countGuard被解鎖。
  • 第22行使用defer將countGuard的解鎖進行延遲呼叫,解鎖操作將會發生在GetCount()函式返回時。
  • 第27行在設定count值時,同樣使用countGuard進行加鎖、解鎖操作,保證修改count值的過程是一個原子過程,不會發生併發訪問衝突。

在讀多寫少的環境中,可以優先使用讀寫互斥鎖(sync.RWMutex),它比互斥鎖更加高效。sync包中的RWMutex提供了讀寫互斥鎖的封裝。

我們將互斥鎖例子中的一部分程式碼修改為讀寫互斥鎖,參見下面程式碼:

var (
    // 邏輯中使用的某個變數
    count int

    // 與變數對應的使用互斥鎖
    countGuard sync.RWMutex
)

func GetCount() int {

    // 鎖定
    countGuard.RLock()

    // 在函式退出時解除鎖定
    defer countGuard.RUnlock()

    return count
}

  

程式碼說明如下:

  • 第6行,在宣告countGuard時,從sync.Mutex互斥鎖改為sync.RWMutex讀寫互斥鎖。
  • 第12行,獲取count的過程是一個讀取count資料的過程,適用於讀寫互斥鎖。在這一行,把countGuard.Lock()換做countGuard.RLock(),將讀寫互斥鎖標記為讀狀態。如果此時另外一個goroutine併發訪問了countGuard,同時也呼叫了countGuard.RLock()時,並不會發生阻塞。
  • 第15行,與讀模式加鎖對應的,使用讀模式解鎖。

等待組(sync.WaitGroup)

除了可以使用通道(channel)和互斥鎖進行兩個併發程式間的同步外,還可以使用等待組進行多個任務的同步,等待組可以保證在併發環境中完成指定數量的任務

等待組有下面幾個方法可用,如表1-2所示。

表1-2   等待組的方法
方法名 功能
(wg * WaitGroup) Add(delta int) 等待組的計數器+1
(wg *WaitGroup) Done() 等待組的計數器-1
(wg *WaitGroup) Wait() 當等待組計數器不等於0時阻塞直到變0

等待組內部擁有一個計數器,計數器的值可以通過方法呼叫實現計數器的增加和減少。當我們添加了N個併發任務進行工作時,就將等待組的計數器值增加N。每個任務完成時,這個值減1。同時,在另外一個goroutine中等待這個等待組的計數器值為0時,表示所有任務已經完成。

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {

    // 宣告一個等待組
    var wg sync.WaitGroup

    // 準備一系列的網站地址
    var urls = []string{
        "http://www.github.com/",
        "https://www.qiniu.com/",
        "https://www.golangtc.com/",
    }

    // 遍歷這些地址
    for _, url := range urls {

        // 每一個任務開始時, 將等待組增加1
        wg.Add(1)

        // 開啟一個併發
        go func(url string) {

            // 使用defer, 表示函式完成時將等待組值減1
            defer wg.Done()

            // 使用http訪問提供的地址
            _, err := http.Get(url)

            // 訪問完成後, 列印地址和可能發生的錯誤
            fmt.Println(url, err)

            // 通過引數傳遞url地址
        }(url)
    }

    // 等待所有的任務完成
    wg.Wait()

    fmt.Println("over")
}

  

程式碼說明如下:

  • 第12行,宣告一個等待組,對一組等待任務只需要一個等待組,而不需要每一個任務都使用一個等待組。
  • 第15行,準備一系列可訪問的網站地址的字串切片。
  • 第22行,遍歷這些字串切片。
  • 第25行,將等待組的計數器加1,也就是每一個任務加1。
  • 第28行,將一個匿名函式開啟併發。
  • 第31行,在匿名函式結束時會執行這一句以表示任務完成。wg.Done()方法等效於執行wg.Add(-1)。
  • 第34行,使用http包提供的Get()函式對url進行訪問,Get()函式會一直阻塞直到網站響應或者超時。
  • 第37行,在網站響應和超時後,列印這個網站的地址和可能發生的錯誤。
  • 第40行,這裡將url通過goroutine的引數進行傳遞,是為了避免url變數通過閉包放入匿名函式後又被修改的問題。
  • 第44行,等待所有的網站都響應或者超時後,任務完成,Wait就會停止阻塞。