1. 程式人生 > 其它 >javaweb學習17:JSP基礎語法和指令

javaweb學習17:JSP基礎語法和指令

併發程式設計,為什麼選Go? https://mp.weixin.qq.com/s/IVAM7mejrUjp3Ol78S6KIw

併發程式設計,為什麼選Go?

呂呂 雲加社群 2022-03-30 12:09

 

導語 | 程式碼的穩健、可讀和高效是我們每一個coder的共同追求。本文將結合Go語言特性,為書寫高效的程式碼,力爭從併發方面給出相關建議。讓我們一起學習Go高效能程式設計的技法吧~

 

在上篇再不Go就來不及了!Go高效能程式設計技法解讀中我們結合Go語言特性,為書寫高效的程式碼,從常用資料結構、記憶體管理兩個方面給出相關建議,本篇將深入併發這部分進行闡述。

 

一、併發程式設計

 

(一)關於鎖

 

  • 無鎖化

 

加鎖是為了避免在併發環境下,同時訪問共享資源產生的安全問題。那麼,在併發環境下,是否必須加鎖?答案是否定的。並非所有的併發都需要加鎖。適當地降低鎖的粒度,甚至採用無鎖化的設計,更能提升併發能力。

無鎖化主要有兩種實現,無鎖資料結構和序列無鎖。

 

  • 無鎖資料結構

 

利用硬體支援的原子操作可以實現無鎖的資料結構,原子操作可以在lock-free的情況下保證併發安全,並且它的效能也能做到隨CPU個數的增多而線性擴充套件。很多語言都提供CAS原子操作(如Go中的atomic包和C++11中的atomic庫),可以用於實現無鎖資料結構,如無鎖鏈表。

 

我們以一個簡單的執行緒安全單向連結串列的插入操作來看下無鎖程式設計和普通加鎖的區別。

 

package list
import ( "fmt" "sync" "sync/atomic"
"golang.org/x/sync/errgroup")
// Node 連結串列節點type Node struct { Value interface{} Next *Node}
//// 有鎖單向連結串列的簡單實現//
// WithLockList 有鎖單向連結串列type WithLockList struct { Head *Node mu sync.Mutex}
// Push 將元素插入到連結串列的首部func (l *WithLockList) Push(v interface{}) { l.mu.Lock()
defer l.mu.Unlock() n := &Node{ Value: v, Next: l.Head, } l.Head = n}
// String 有鎖鏈表的字串形式輸出func (l WithLockList) String() string { s := "" cur := l.Head for { if cur == nil { break } if s != "" { s += "," } s += fmt.Sprintf("%v", cur.Value) cur = cur.Next } return s}
//// 無鎖單向連結串列的簡單實現//
// LockFreeList 無鎖單向連結串列type LockFreeList struct { Head atomic.Value}
// Push 有鎖func (l *LockFreeList) Push(v interface{}) { for { head := l.Head.Load() headNode, _ := head.(*Node) n := &Node{ Value: v, Next: headNode, } if l.Head.CompareAndSwap(head, n) { break } }}
// String 有鎖鏈表的字串形式輸出func (l LockFreeList) String() string { s := "" cur := l.Head.Load().(*Node) for { if cur == nil { break } if s != "" { s += "," } s += fmt.Sprintf("%v", cur.Value) cur = cur.Next } return s}

 

上面的實現有幾點需要注意一下:

 

  • 無鎖單向連結串列實現時在插入時需要進行CAS操作,即呼叫CompareAndSwap()方法進行插入,如果插入失敗則進行for迴圈多次嘗試,直至成功。

 

  • 為了方便列印連結串列內容,實現一個String()方法遍歷連結串列,且使用值作為接收者,避免列印物件指標時無法生效。

 

If an operand implements method String() string, that method will be invoked to convert the object to a string, which will then be formatted as required by the verb (if any).

 

我們分別對兩種連結串列做一個併發寫入的操作驗證一下其功能。

 

package main
import ( "fmt" "main/list")
// ConcurWriteWithLockList 併發寫入有鎖鏈表func ConcurWriteWithLockList(l *WithLockList) { var g errgroup.Group // 10 個協程併發寫入連結串列 for i := 0; i < 10; i++ { i := i g.Go(func() error { l.Push(i) return nil }) } _ = g.Wait()}
// ConcurWriteLockFreeList 併發寫入無鎖鏈表func ConcurWriteLockFreeList(l *LockFreeList) { var g errgroup.Group // 10 個協程併發寫入連結串列 for i := 0; i < 10; i++ { i := i g.Go(func() error { l.Push(i) return nil }) } _ = g.Wait()}
func main() { // 併發寫入與遍歷列印有鎖鏈表 l1 := &list.WithLockList{} list.ConcurWriteWithLockList(l1) fmt.Println(l1)
// 併發寫入與遍歷列印無鎖鏈表 l2 := &list.LockFreeList{} list.ConcurWriteLockFreeList(l2) fmt.Println(l2)}

 

注意,多次執行上面的main()函式的結果可能會不相同,因為併發是無序的。

 

8,7,6,9,5,4,3,1,2,09,8,7,6,5,4,3,2,0,1

 

下面再看一下連結串列Push操作的基準測試,對比一下有鎖與無鎖的效能差異。

 

func BenchmarkWriteWithLockList(b *testing.B) {  l := &WithLockList{}  for n := 0; n < b.N; n++ {    l.Push(n)  }}BenchmarkWriteWithLockList-8    14234166                83.58 ns/op
func BenchmarkWriteLockFreeList(b *testing.B) { l := &LockFreeList{} for n := 0; n < b.N; n++ { l.Push(n) }}BenchmarkWriteLockFreeList-8 15219405 73.15 ns/op

 

可以看出無鎖版本比有鎖版本效能高一些。

 

  • 序列無鎖

 

序列無鎖是一種思想,就是避免對共享資源的併發訪問,改為每個併發操作訪問自己獨佔的資源,達到序列訪問資源的效果,來避免使用鎖。不同的場景有不同的實現方式。比如網路I/O場景下將單Reactor多執行緒模型改為主從Reactor多執行緒模型,避免對同一個訊息佇列鎖讀取。

 

這裡我介紹的是後臺微服務開發經常遇到的一種情況。我們經常需要併發拉取多方面的資訊,匯聚到一個變數上。那麼此時就存在對同一個變數互斥寫入的情況。比如批量併發拉取使用者資訊寫入到一個map。此時我們可以將每個協程拉取的結果寫入到一個臨時物件,這樣便將併發地協程與同一個變數解綁,然後再將其匯聚到一起,這樣便可以不用使用鎖。即獨立處理,然後合併。

 

 

為了模擬上面的情況,簡單地寫個示例程式,對比下效能。

 

import (  "sync"
"golang.org/x/sync/errgroup")
// ConcurWriteMapWithLock 有鎖併發寫入 mapfunc ConcurWriteMapWithLock() map[int]int { m := make(map[int]int) var mu sync.Mutex var g errgroup.Group // 10 個協程併發寫入 map for i := 0; i < 10; i++ { i := i g.Go(func() error { mu.Lock() defer mu.Unlock() m[i] = i * i return nil }) } _ = g.Wait() return m}
// ConcurWriteMapLockFree 無鎖併發寫入 mapfunc ConcurWriteMapLockFree() map[int]int { m := make(map[int]int) // 每個協程獨佔一 value values := make([]int, 10) // 10 個協程併發寫入 map var g errgroup.Group for i := 0; i < 10; i++ { i := i g.Go(func() error { values[i] = i * i return nil }) } _ = g.Wait() // 匯聚結果到 map for i, v := range values { m[i] = v } return m}

 

看下二者的效能差異:


 

func BenchmarkConcurWriteMapWithLock(b *testing.B) {  for n := 0; n < b.N; n++ {    _ = ConcurWriteMapWithLock()  }}BenchmarkConcurWriteMapWithLock-8         218673              5089 ns/op
func BenchmarkConcurWriteMapLockFree(b *testing.B) { for n := 0; n < b.N; n++ { _ = ConcurWriteMapLockFree() }}BenchmarkConcurWriteMapLockFree-8 316635 4048 ns/op

 

  • 減少鎖競爭

 

如果加鎖無法避免,則可以採用分片的形式,減少對資源加鎖的次數,這樣也可以提高整體的效能。

 

比如Golang優秀的本地快取元件bigcache、go-cache、freecache都實現了分片功能,每個分片一把鎖,採用分片儲存的方式減少加鎖的次數從而提高整體效能。

 

以一個簡單的示例,通過對map[uint64]struct{}分片前後併發寫入的對比,來看下減少鎖競爭帶來的效能提升。

 

var (  num = 1000000  m0  = make(map[int]struct{}, num)  mu0 = sync.RWMutex{}  m1  = make(map[int]struct{}, num)  mu1 = sync.RWMutex{})
// ConWriteMapNoShard 不分片寫入一個 map。func ConWriteMapNoShard() { g := errgroup.Group{} for i := 0; i < num; i++ { g.Go(func() error { mu0.Lock() defer mu0.Unlock() m0[i] = struct{}{} return nil }) } _ = g.Wait()}
// ConWriteMapTwoShard 分片寫入兩個 map。func ConWriteMapTwoShard() { g := errgroup.Group{} for i := 0; i < num; i++ { g.Go(func() error { if i&1 == 0 { mu0.Lock() defer mu0.Unlock() m0[i] = struct{}{} return nil } mu1.Lock() defer mu1.Unlock() m1[i] = struct{}{} return nil }) } _ = g.Wait()}

 

看下二者的效能差異:

 

func BenchmarkConWriteMapNoShard(b *testing.B) {  for i := 0; i < b.N; i++ {    ConWriteMapNoShard()  }}BenchmarkConWriteMapNoShard-12                 3         472063245 ns/op
func BenchmarkConWriteMapTwoShard(b *testing.B) { for i := 0; i < b.N; i++ { ConWriteMapTwoShard() }}BenchmarkConWriteMapTwoShard-12 4 310588155 ns/op

 

可以看到,通過對分共享資源的分片處理,減少了鎖競爭,能明顯地提高程式的併發效能。可以預見的是,隨著分片粒度地變小,效能差距會越來越大。當然,分片粒度不是越小越好。因為每一個分片都要配一把鎖,那麼會帶來很多額外的不必要的開銷。可以選擇一個不太大的值,在效能和花銷上尋找一個平衡。

 

  • 優先使用共享鎖而非互斥鎖

 

如果併發無法做到無鎖化,優先使用共享鎖而非互斥鎖。

 

所謂互斥鎖,指鎖只能被一個Goroutine獲得。共享鎖指可以同時被多個Goroutine獲得的鎖。

 

Go標準庫sync提供了兩種鎖,互斥鎖(sync.Mutex)和讀寫鎖(sync.RWMutex),讀寫鎖便是共享鎖的一種具體實現。

 

  • sync.Mutex

 

互斥鎖的作用是保證共享資源同一時刻只能被一個Goroutine佔用,一個Goroutine佔用了,其他的Goroutine則阻塞等待。

 

 

sync.Mutex提供了兩個匯出方法用來使用鎖。

 

Lock()     // 加鎖Unlock()    // 釋放鎖

 

我們可以通過在訪問共享資源前前用Lock方法對資源進行上鎖,在訪問共享資源後呼叫Unlock方法來釋放鎖,也可以用defer語句來保證互斥鎖一定會被解鎖。在一個Go協程呼叫Lock方法獲得鎖後,其他請求鎖的協程都會阻塞在Lock方法,直到鎖被釋放。

 

  • sync.RWMutex

 

讀寫鎖是一種共享鎖,也稱之為多讀單寫鎖 (multiple readers, single writer lock)。在使用鎖時,對獲取鎖的目的操作做了區分,一種是讀操作,一種是寫操作。因為同一時刻允許多個Gorouine獲取讀鎖,所以是一種共享鎖。但寫鎖是互斥的。

 

一般來說,有如下幾種情況:

 

  • 讀鎖之間不互斥,沒有寫鎖的情況下,讀鎖是無阻塞的,多個協程可以同時獲得讀鎖。

 

  • 寫鎖之間是互斥的,存在寫鎖,其他寫鎖阻塞。

 

  • 寫鎖與讀鎖是互斥的,如果存在讀鎖,寫鎖阻塞,如果存在寫鎖,讀鎖阻塞。

 

 

sync.RWMutex提供了五個匯出方法用來使用鎖。

 

Lock()        // 加寫鎖Unlock()      // 釋放寫鎖RLock()        // 加讀鎖RUnlock()      // 釋放讀鎖RLocker() Locker  // 返回讀鎖,使用 Lock() 和 Unlock() 進行 RLock() 和 RUnlock()

 

讀寫鎖的存在是為了解決讀多寫少時的效能問題,讀場景較多時,讀寫鎖可有效地減少鎖阻塞的時間。

 

  • 效能對比

 

大部分業務場景是讀多寫少,所以使用讀寫鎖可有效提高對共享資料的訪問效率。最壞的情況,只有寫請求,那麼讀寫鎖頂多退化成互斥鎖。所以優先使用讀寫鎖而非互斥鎖,可以提高程式的併發效能。

 

接下來,我們測試三種情景下,互斥鎖和讀寫鎖的效能差異。

 

  • 讀多寫少(讀佔80%)

 

  • 讀寫一致(各佔50%)

 

  • 讀少寫多(讀佔20%)

 

首先根據互斥鎖和讀寫鎖分別實現對共享map的併發讀寫。

 

// OpMapWithMutex 使用互斥鎖讀寫 map。// rpct 為讀操作佔比。func OpMapWithMutex(rpct int) {  m := make(map[int]struct{})  mu := sync.Mutex{}  var wg sync.WaitGroup  for i := 0; i < 100; i++ {    i := i    wg.Add(1)    go func() {      defer wg.Done()      mu.Lock()      defer mu.Unlock()      // 寫操作。      if i >= rpct {        m[i] = struct{}{}        time.Sleep(time.Microsecond)        return      }      // 讀操作。      _ = m[i]      time.Sleep(time.Microsecond)    }()  }  wg.Wait()}
// OpMapWithRWMutex 使用讀寫鎖讀寫 map。// rpct 為讀操作佔比。func OpMapWithRWMutex(rpct int) { m := make(map[int]struct{}) mu := sync.RWMutex{} var wg sync.WaitGroup for i := 0; i < 100; i++ { i := i wg.Add(1) go func() { defer wg.Done() // 寫操作。 if i >= rpct { mu.Lock() defer mu.Unlock() m[i] = struct{}{} time.Sleep(time.Microsecond) return } // 讀操作。 mu.RLock() defer mu.RUnlock() _ = m[i] time.Sleep(time.Microsecond) }() } wg.Wait()}

 

入參rpct用來調節讀操作的佔比,來模擬讀寫佔比不同的場景。rpct設為80表示讀多寫少(讀佔80%),rpct設為50表示讀寫一致(各佔50%),rpct設為20表示讀少寫多(讀佔20%)。

 

func BenchmarkMutexReadMore(b *testing.B) {  for i := 0; i < b.N; i++ {    OpMapWithMutex(80)  }}
func BenchmarkRWMutexReadMore(b *testing.B) { for i := 0; i < b.N; i++ { OpMapWithRWMutex(80) }}
func BenchmarkMutexRWEqual(b *testing.B) { for i := 0; i < b.N; i++ { OpMapWithMutex(50) }}
func BenchmarkRWMutexRWEqual(b *testing.B) { for i := 0; i < b.N; i++ { OpMapWithRWMutex(50) }}
func BenchmarkMutexWriteMore(b *testing.B) { for i := 0; i < b.N; i++ { OpMapWithMutex(20) }}
func BenchmarkRWMutexWriteMore(b *testing.B) { for i := 0; i < b.N; i++ { OpMapWithRWMutex(20) }}

 

執行當前包下的所有基準測試,結果如下:

 

dablelv@DABLELV-MB0 mutex % go test -bench=.goos: darwingoarch: amd64pkg: main/mutexcpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHzBenchmarkMutexReadMore-12                   2462            485917 ns/opBenchmarkRWMutexReadMore-12                 8074            145690 ns/opBenchmarkMutexRWEqual-12                    2406            498673 ns/opBenchmarkRWMutexRWEqual-12                  4124            303693 ns/opBenchmarkMutexWriteMore-12                  1906            532350 ns/opBenchmarkRWMutexWriteMore-12                2462            432386 ns/opPASSok      main/mutex      9.532s

 

可見讀多寫少的場景,使用讀寫鎖併發效能會更優。可以預見的是如果寫佔比更低,那麼讀寫鎖帶的併發效果會更優。

 

這裡需要注意的是,因為每次讀寫map的操作耗時很短,所以每次睡眠一微秒(百萬分之一秒)來增加耗時,不然對共享資源的訪問耗時,小於鎖處理的本身耗時,那麼使用讀寫鎖帶來的效能優化效果將變得不那麼明顯,甚至會降低效能。

 

 

(二)限制協程數量

 

  • 協程數過多的問題

 

  • 程式崩潰

 

Go程(goroutine)是由Go執行時管理的輕量級執行緒。通過它我們可以輕鬆實現併發程式設計。但是當我們無限開闢協程時,將會遇到致命的問題。

 

func main() {  var wg sync.WaitGroup  for i := 0; i < math.MaxInt32; i++ {    wg.Add(1)    go func(i int) {      defer wg.Done()      fmt.Println(i)      time.Sleep(time.Second)    }(i)  }  wg.Wait()}

 

這個例子實現了math.MaxInt32個協程的併發,2^31-1約為20億個,每個協程內部幾乎沒有做什麼事情。正常的情況下呢,這個程式會亂序輸出0~2^31-1個數字。

 

程式會像預期的那樣順利的執行嗎?

 

go run main.go...1086681142025panic: too many concurrent operations on a single file or socket (max 1048575)
goroutine 1158408 [running]:internal/poll.(*fdMutex).rwlock(0xc0000ae060, 0x0) /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x11binternal/poll.(*FD).writeLock(...) /usr/local/go/src/internal/poll/fd_mutex.go:239internal/poll.(*FD).Write(0xc0000ae060, {0xc12cadf690, 0x8, 0x8}) /usr/local/go/src/internal/poll/fd_unix.go:262 +0x72os.(*File).write(...) /usr/local/go/src/os/file_posix.go:49os.(*File).Write(0xc0000ac008, {0xc12cadf690, 0x1, 0xc12ea62f50}) /usr/local/go/src/os/file.go:176 +0x65fmt.Fprintln({0x10c00e0, 0xc0000ac008}, {0xc12ea62f90, 0x1, 0x1}) /usr/local/go/src/fmt/print.go:265 +0x75fmt.Println(...) /usr/local/go/src/fmt/print.go:274main.main.func1(0x0) /Users/dablelv/work/code/test/main.go:16 +0x8f...

 

執行的結果是程式直接崩潰了,關鍵的報錯資訊是:

 

panic: too many concurrent operations on a single file or socket (max 1048575)

 

對單個file/socket的併發操作個數超過了系統上限,這個報錯是fmt.Printf函式引起的,fmt.Printf將格式化後的字串列印到螢幕,即標準輸出。在Linux系統中,標準輸出也可以視為檔案,核心(Kernel)利用檔案描述符(File Descriptor)來訪問檔案,標準輸出的檔案描述符為1,錯誤輸出檔案描述符為2,標準輸入的檔案描述符為0。

 

簡而言之,系統的資源被耗盡了。

 

那如果我們將fmt.Printf這行程式碼去掉呢?那程式很可能會因為記憶體不足而崩潰。這一點更好理解,每個協程至少需要消耗2KB的空間,那麼假設計算機的記憶體是4GB,那麼至多允許4GB/2KB=1M個協程同時存在。那如果協程中還存在著其他需要分配記憶體的操作,那麼允許併發執行的協程將會數量級地減少。

 

  • 協程的代價

 

前面的例子過於極端,一般情況下程式也不會無限開闢協程,旨在說明協程數量是有限制的,不能無限開闢。

 

如果我們開闢很多協程,但不會導致程式崩潰,可以嗎?如果真要這麼做的話,我們應該清楚地知道,協程雖然輕量,但仍有開銷。

 

Go的開銷主要是三個方面:建立(佔用記憶體)、排程(增加排程器負擔)和刪除(增加GC壓力)。

 

  • 記憶體開銷

 

空間上,一個Go程佔用約2K的記憶體,在原始碼src/runtime/runtime2.go裡面,我們可以找到Go程的結構定義type g struct。

 

  • 排程開銷

 

時間上,協程排程也會有CPU開銷。我們可以利用runntime.Gosched()讓當前協程主動讓出CPU去執行另外一個協程,下面看一下協程之間切換的耗時。

 

const NUM = 10000
func cal() { for i := 0; i < NUM; i++ { runtime.Gosched() }}
func main() { // 只設置一個 Processor runtime.GOMAXPROCS(1) start := time.Now().UnixNano() go cal() for i := 0; i < NUM; i++ { runtime.Gosched() } end := time.Now().UnixNano() fmt.Printf("total %vns per %vns", end-start, (end-start)/NUM)}

 

執行輸出:

 

total 997200ns per 99ns

 

可見一次協程的切換,耗時大概在100ns,相對於執行緒的微秒級耗時切換,效能表現非常優秀,但是仍有開銷。

 

  • GC開銷

建立Go程到執行結束,佔用的記憶體資源是需要由GC來回收,如果無休止地建立大量Go程後,勢必會造成對GC的壓力。

 

package main
import ( "fmt" "runtime" "runtime/debug" "sync" "time")
func createLargeNumGoroutine(num int, wg *sync.WaitGroup) { wg.Add(num) for i := 0; i < num; i++ { go func() { defer wg.Done() }() }}
func main() { // 只設置一個 Processor 保證 Go 程序列執行 runtime.GOMAXPROCS(1) // 關閉GC改為手動執行 debug.SetGCPercent(-1)
var wg sync.WaitGroup createLargeNumGoroutine(1000, &wg) wg.Wait() t := time.Now() runtime.GC() // 手動GC cost := time.Since(t) fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 1000)
createLargeNumGoroutine(10000, &wg) wg.Wait() t = time.Now() runtime.GC() // 手動GC cost = time.Since(t) fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 10000)
createLargeNumGoroutine(100000, &wg) wg.Wait() t = time.Now() runtime.GC() // 手動GC cost = time.Since(t) fmt.Printf("GC cost %v when goroutine num is %v\n", cost, 100000)}

 

執行輸出:

 

GC cost 0s when goroutine num is 1000GC cost 2.0027ms when goroutine num is 10000GC cost 30.9523ms when goroutine num is 100000

 

當建立的Go程數量越多,GC耗時越大。

 

上面的分析目的是為了儘可能地量化Goroutine的開銷。雖然官方宣稱用 Golang寫併發程式的時候隨便起個成千上萬的Goroutine毫無壓力,但當我們起十萬、百萬甚至千萬個Goroutine呢?Goroutine輕量的開銷將被放大。

 

  • 限制協程數量

 

系統地資源是有限,協程是有代價的,為了保護程式,提高效能,我們應主動限制併發的協程數量。

 

可以利用通道channel的緩衝區大小來實現。

 

func main() {  var wg sync.WaitGroup  ch := make(chan struct{}, 3)  for i := 0; i < 10; i++ {    ch <- struct{}{}    wg.Add(1)    go func(i int) {      defer wg.Done()      log.Println(i)      time.Sleep(time.Second)      <-ch    }(i)  }  wg.Wait()}

 

上例中建立了緩衝區大小為3的channel,在沒有被接收的情況下,至多傳送3個訊息則被阻塞。開啟協程前,呼叫ch<- struct{}{},若快取區滿,則阻塞。協程任務結束,呼叫<-ch釋放緩衝區。

 

sync.WaitGroup並不是必須的,例如Http服務,每個請求天然是併發的,此時使用channel控制併發處理的任務數量,就不需要 sync.WaitGroup。

 

執行結果如下:

 

2022/03/06 20:37:02 02022/03/06 20:37:02 22022/03/06 20:37:02 12022/03/06 20:37:03 32022/03/06 20:37:03 42022/03/06 20:37:03 52022/03/06 20:37:04 62022/03/06 20:37:04 72022/03/06 20:37:04 82022/03/06 20:37:05 9

 

從日誌中可以很容易看到,每秒鐘只併發執行了3個任務,達到了協程併發控制的目的。

 

  • 協程池化

 

上面的例子只是簡單地限制了協程開闢的數量。在此基礎之上,基於物件複用的思想,我們可以重複利用已開闢的協程,避免協程的重複建立銷燬,達到池化的效果。

 

協程池化,我們可以自己寫一個協程池,但不推薦這麼做。因為已經有成熟的開源庫可供使用,無需再重複造輪子。目前有很多第三方庫實現了協程池,可以很方便地用來控制協程的併發數量,比較受歡迎的有:

 

  • Jeffail/tunny

 

  • panjf2000/ants

 

下面以panjf2000/ants為例,簡單介紹其使用。

 

ants是一個簡單易用的高效能Goroutine池,實現了對大規模Goroutine的排程管理和複用,允許使用者在開發併發程式的時候限制Goroutine數量,複用協程,達到更高效執行任務的效果。


 

package main
import ( "fmt" "time"
"github.com/panjf2000/ants")
func main() { // Use the common pool for i := 0; i < 10; i++ { i := i ants.Submit(func() { fmt.Println(i) }) } time.Sleep(time.Second)}

 

使用ants,我們簡單地使用其預設的協程池,直接將任務提交併發執行。預設協程池的預設容量math.MaxInt32。

 

如果自定義協程池容量大小,可以呼叫NewPool方法來例項化具有給定容量的池,如下所示:

 

// Set 10000 the size of goroutine poolp, _ := ants.NewPool(10000)

 

  • 小結

 

Golang為併發而生。Goroutine是由Go執行時管理的輕量級執行緒,通過它我們可以輕鬆實現併發程式設計。Go雖然輕量,但天下沒有免費的午餐,無休止地開闢大量Go程勢必會帶來效能影響,甚至程式崩潰。所以,我們應儘可能的控制協程數量,如果有需要,請複用它。

 

 

(三)使用sync.Once避免重複執行

 

  • 簡介

 

sync.Once是Go標準庫提供的使函式只執行一次的實現,常應用於單例模式,例如初始化配置、保持資料庫連線等。作用與init函式類似,但有區別。

 

  • init函式是當所在的package首次被載入時執行,若遲遲未被使用,則既浪費了記憶體,又延長了程式載入時間。

 

  • sync.Once可以在程式碼的任意位置初始化和呼叫,因此可以延遲到使用時再執行,併發場景下是執行緒安全的。

 

在多數情況下,sync.Once被用於控制變數的初始化,這個變數的讀寫滿足如下三個條件:

 

  • 當且僅當第一次訪問某個變數時,進行初始化(寫);

 

  • 變數初始化過程中,所有讀都被阻塞,直到初始化完成;

 

  • 變數僅初始化一次,初始化完成後駐留在記憶體裡。

 

 

  • 原理

 

sync.Once用來保證函式只執行一次。要達到這個效果,需要做到兩點:

 

  • 計數器,統計函式執行次數;

 

  • 執行緒安全,保障在多Go程的情況下,函式仍然只執行一次,比如鎖。

 

 

  • 原始碼

 

下面看一下sync.Once結構,其有兩個變數。使用done統計函式執行次數,使用鎖m實現執行緒安全。果不其然,和上面的猜想一致。

 

// Once is an object that will perform exactly one action.//// A Once must not be copied after first use.type Once struct {  // done indicates whether the action has been performed.  // It is first in the struct because it is used in the hot path.  // The hot path is inlined at every call site.  // Placing done first allows more compact instructions on some architectures (amd64/386),  // and fewer instructions (to calculate offset) on other architectures.  done uint32  m    Mutex}

 

sync.Once僅提供了一個匯出方法Do(),引數f是隻會被執行一次的函式,一般為物件初始化函式。

 

// go version go1.17 darwin/amd64
// Do calls the function f if and only if Do is being called for the// first time for this instance of Once. In other words, given// var once Once// if once.Do(f) is called multiple times, only the first call will invoke f,// even if f has a different value in each invocation. A new instance of// Once is required for each function to execute.//// Do is intended for initialization that must be run exactly once. Since f// is niladic, it may be necessary to use a function literal to capture the// arguments to a function to be invoked by Do:// config.once.Do(func() { config.init(filename) })//// Because no call to Do returns until the one call to f returns, if f causes// Do to be called, it will deadlock.//// If f panics, Do considers it to have returned; future calls of Do return// without calling f.//func (o *Once) Do(f func()) { // Note: Here is an incorrect implementation of Do: // // if atomic.CompareAndSwapUint32(&o.done, 0, 1) { // f() // } // // Do guarantees that when it returns, f has finished. // This implementation would not implement that guarantee: // given two simultaneous calls, the winner of the cas would // call f, and the second would return immediately, without // waiting for the first's call to f to complete. // This is why the slow path falls back to a mutex, and why // the atomic.StoreUint32 must be delayed until after f returns.
if atomic.LoadUint32(&o.done) == 0 { // Outlined slow-path to allow inlining of the fast-path. o.doSlow(f) }}
func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() }}

 

拋去大段的註釋,可以看到sync.Once實現非常簡潔。Do()函式中,通過對成員變數done的判斷,來決定是否執行傳入的任務函式。執行任務函式前,通過鎖保證任務函式的執行和done的修改是一個互斥操作。在執行任務函式前,對done做一個二次判斷,來保證任務函式只會被執行一次,done只會被修改一次。

 

  • done為什麼是第一個欄位

 

從欄位done前有一段註釋,說明了done為什麼是第一個欄位。

 

done在熱路徑中,done放在第一個欄位,能夠減少CPU指令,也就是說,這樣做能夠提升效能。

 

熱路徑(hot path)是程式非常頻繁執行的一系列指令,sync.Once絕大部分場景都會訪問o.done,在熱路徑上是比較好理解的。如果hot path 編譯後的機器碼指令更少,更直接,必然是能夠提升效能的。

 

為什麼放在第一個欄位就能夠減少指令呢?因為結構體第一個欄位的地址和結構體的指標是相同的,如果是第一個欄位,直接對結構體的指標解引用即可。如果是其他的欄位,除了結構體指標外,還需要計算與第一個值的偏移(calculate offset)。在機器碼中,偏移量是隨指令傳遞的附加值,CPU需要做一次偏移值與指標的加法運算,才能獲取要訪問的值的地址。因為,訪問第一個欄位的機器程式碼更緊湊,速度更快。

 

  • 效能差異

 

我們以一個簡單示例,來說明使用sync.Once保證函式只會被執行一次和多次執行,二者的效能差異。

 

考慮一個簡單的場景,函式ReadConfig需要讀取環境變數,並轉換為對應的配置。環境變數在程式執行前已經確定,執行過程中不會發生改變。ReadConfig可能會被多個協程併發呼叫,為了提升效能(減少執行時間和記憶體佔用),使用sync.Once是一個比較好的方式。

 

type Config struct {  GoRoot string  GoPath string}
var ( once sync.Once config *Config)
func ReadConfigWithOnce() *Config { once.Do(func() { config = &Config{ GoRoot: os.Getenv("GOROOT"), GoPath: os.Getenv("GOPATH"), } }) return config}
func ReadConfig() *Config { return &Config{ GoRoot: os.Getenv("GOROOT"), GoPath: os.Getenv("GOPATH"), }}

 

我們看下二者的效能差異。

 

func BenchmarkReadConfigWithOnce(b *testing.B) {  for i := 0; i < b.N; i++ {    _ = ReadConfigWithOnce()  }}
func BenchmarkReadConfig(b *testing.B) { for i := 0; i < b.N; i++ { _ = ReadConfig() }}

 

執行測試結果如下:

 

go test -bench=. main/oncegoos: darwingoarch: amd64pkg: main/oncecpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHzBenchmarkReadConfigWithOnce-12          670438965                1.732 ns/opBenchmarkReadConfig-12                  13339154                87.46 ns/opPASSok      main/once       3.006s

 

sync.Once中保證了Config初始化函式僅執行了一次,避免了多次重複初始化,在併發環境下很有用。

 

 

(四)使用sync.Cond通知協程

 

  • 簡介

 

sync.Cond是基於互斥鎖/讀寫鎖實現的條件變數,用來協調想要訪問共享資源的那些Goroutine,當共享資源的狀態發生變化的時候,sync.Cond 可以用來通知等待條件發生而阻塞的Goroutine。

 

sync.Cond基於互斥鎖/讀寫鎖,它和互斥鎖的區別是什麼呢?

 

互斥鎖sync.Mutex通常用來保護共享的臨界資源,條件變數sync.Cond 用來協調想要訪問共享資源的Goroutine。當共享資源的狀態發生變化時,sync.Cond可以用來通知被阻塞的Goroutine。

 

  • 使用場景

 

sync.Cond經常用在多個Goroutine等待,一個Goroutine通知(事件發生)的場景。如果是一個通知,一個等待,使用互斥鎖或channel就能搞定了。

 

我們想象一個非常簡單的場景:

 

有一個協程在非同步地接收資料,剩下的多個協程必須等待這個協程接收完資料,才能讀取到正確的資料。在這種情況下,如果單純使用chan或互斥鎖,那麼只能有一個協程可以等待,並讀取到資料,沒辦法通知其他的協程也讀取資料。

 

這個時候,就需要有個全域性的變數來標誌第一個協程資料是否接受完畢,剩下的協程,反覆檢查該變數的值,直到滿足要求。或者建立多個channel,每個協程阻塞在一個channel上,由接收資料的協程在資料接收完畢後,逐個通知。總之,需要額外的複雜度來完成這件事。

 

Go語言在標準庫sync中內建一個sync.Cond用來解決這類問題。

 

  • 原理

 

sync.Cond內部維護了一個等待佇列,佇列中存放的是所有在等待這個 sync.Cond的Go程,即儲存了一個通知列表。sync.Cond可以用來喚醒一個或所有因等待條件變數而阻塞的Go程,以此來實現多個Go程間的同步。

 

sync.Cond的定義如下:

 

// Cond implements a condition variable, a rendezvous point// for goroutines waiting for or announcing the occurrence// of an event.//// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),// which must be held when changing the condition and// when calling the Wait method.//// A Cond must not be copied after first use.type Cond struct {  noCopy noCopy
// L is held while observing or changing the condition L Locker
notify notifyList checker copyChecker}

 

每個Cond例項都會關聯一個鎖L(互斥鎖Mutex,或讀寫鎖RWMutex),當修改條件或者呼叫Wait方法時,必須加鎖。

 

sync.Cond的四個成員函式定義如下:

 

// NewCond returns a new Cond with Locker l.func NewCond(l Locker) *Cond {  return &Cond{L: l}}

 

NewCond建立Cond例項時,需要關聯一個鎖。

 

// Wait atomically unlocks c.L and suspends execution// of the calling goroutine. After later resuming execution,// Wait locks c.L before returning. Unlike in other systems,// Wait cannot return unless awoken by Broadcast or Signal.//// Because c.L is not locked when Wait first resumes, the caller// typically cannot assume that the condition is true when// Wait returns. Instead, the caller should Wait in a loop:////    c.L.Lock()//    for !condition() {//        c.Wait()//    }//    ... make use of condition ...//    c.L.Unlock()//func (c *Cond) Wait() {  c.checker.check()  t := runtime_notifyListAdd(&c.notify)  c.L.Unlock()  runtime_notifyListWait(&c.notify, t)  c.L.Lock()}

 

Wait用於阻塞呼叫者,等待通知。呼叫Wait會自動釋放鎖c.L,並掛起呼叫者所在的goroutine。如果其他協程呼叫了Signal或Broadcast喚醒了該協程,那麼Wait方法在結束阻塞時,會重新給c.L加鎖,並且繼續執行Wait後面的程式碼。

 

對條件的檢查,使用了for !condition()而非if,是因為當前協程被喚醒時,條件不一定符合要求,需要再次Wait等待下次被喚醒。為了保險起,使用for能夠確保條件符合要求後,再執行後續的程式碼。

 

// Signal wakes one goroutine waiting on c, if there is any.//// It is allowed but not required for the caller to hold c.L// during the call.func (c *Cond) Signal() {  c.checker.check()  runtime_notifyListNotifyOne(&c.notify)}
// Broadcast wakes all goroutines waiting on c.//// It is allowed but not required for the caller to hold c.L// during the call.func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify)}

 

Signal只喚醒任意1個等待條件變數c的goroutine,無需鎖保護。Broadcast喚醒所有等待條件變數c的goroutine,無需鎖保護。

 

  • 使用示例

 

我們實現一個簡單的例子,三個協程呼叫Wait()等待,另一個協程呼叫Broadcast()喚醒所有等待的協程。

 

var done = false
func read(name string, c *sync.Cond) { c.L.Lock() for !done { c.Wait() } log.Println(name, "starts reading") c.L.Unlock()}
func write(name string, c *sync.Cond) { log.Println(name, "starts writing") time.Sleep(time.Second) done = true log.Println(name, "wakes all") c.Broadcast()}
func main() { cond := sync.NewCond(&sync.Mutex{})
go read("reader1", cond) go read("reader2", cond) go read("reader3", cond) write("writer", cond)
time.Sleep(time.Second * 3)}

 

done即多個Goroutine 阻塞等待的條件。

 

read()呼叫Wait()等待通知,直到done為true。

 

write()接收資料,接收完成後,將done置為true,呼叫Broadcast()通知所有等待的協程。

 

write()中的暫停了1s,一方面是模擬耗時,另一方面是確保前面的3個 read協程都執行到Wait(),處於等待狀態。main函式最後暫停了3s,確保所有操作執行完畢。

 

執行輸出:

 

go run main.go2022/03/07 17:20:09 writer starts writing2022/03/07 17:20:10 writer wakes all2022/03/07 17:20:10 reader3 starts reading2022/03/07 17:20:10 reader1 starts reading2022/03/07 17:20:10 reader2 starts reading

 

  • 注意事項

 

  • sync.Cond不能被複制

 

sync.Cond不能被複制的原因,並不是因為其內部嵌套了Locker。因為 NewCond時傳入的Mutex/RWMutex指標,對於Mutex指標複製是沒有問題的。

 

主要原因是sync.Cond內部是維護著一個Goroutine通知佇列 notifyList。如果這個佇列被複制的話,那麼就在併發場景下導致不同 Goroutine之間操作的notifyList.wait、notifyList.notify並不是同一個,這會導致出現有些Goroutine會一直阻塞。

 

  • 喚醒順序

 

從等待佇列中按照順序喚醒,先進入等待佇列,先被喚醒。

 

  • 呼叫Wait()前要加鎖

 

呼叫Wait()函式前,需要先獲得條件變數的成員鎖,原因是需要互斥地變更條件變數的等待佇列。在Wait()返回前,會重新上鎖。

 

參考資料:

1.github.com/uber-go/guide

2.go-proverbs

3.github/dgryski/go-perfbook

4.High Performance Go Workshop - Dave Cheney

5.atomic 的原理與使用場景

6.極客兔兔.Go 語言高效能程式設計

7.深度解密Go 語言之sync.Pool-Stefno-部落格園

8.Golang記憶體分配逃逸分析-Gopherzhang

9.Go語言的記憶體逃逸分析-Golang夢工廠

 

 

 作者簡介

 

呂呂

騰訊後臺開發工程師

騰訊後臺開發工程師,畢業於華南理工大學。目前負責NokNok後臺開發工作,有豐富的分散式後臺開發經驗。