理解Go協程與併發
協程
Go語言裡建立一個協程很簡單,使用go
關鍵字就可以讓一個普通方法協程化:
package main import ( "fmt" "time" ) func main(){ fmt.Println("run in main coroutine.") for i:=0; i<10; i++ { go func(i int) { fmt.Printf("run in child coroutine %d.\n", i) }(i) } //防止子協程還沒有結束主協程就退出了 time.Sleep(time.Second * 1) }
下面這些概念可能不太好理解,需要慢慢理解。可以先跳過,回頭再來看。
概念:
協程
可以理解為純使用者態的執行緒,其通過協作而不是搶佔來進行切換。相對於程序或者執行緒,協程所有的操作都可以在使用者態完成,建立和切換的消耗更低。
- 一個程序內部可以執行多個執行緒,而每個執行緒又可以執行很多協程。執行緒要負責對協程進行排程,保證每個協程都有機會得到執行。當一個協程睡眠時,它要將執行緒的執行權讓給其它的協程來執行,而不能持續霸佔這個執行緒。同一個執行緒內部最多隻會有一個協程正在執行。
- 協程可以簡化為三個狀態:
執行態
、就緒態
和休眠態
。同一個執行緒中最多隻會存在一個處於執行態的協程。就緒態協程
是指那些具備了執行能力但是還沒有得到執行機會的協程,它們隨時會被排程到執行態;休眠態的協程
- 子協程的異常退出會將異常傳播到主協程,直接會導致主協程也跟著掛掉。
協程一般用 TCP/HTTP/RPC服務、訊息推送系統、聊天系統等。使用協程,我們可以很方便的搭建一個支援高併發的TCP或HTTP服務端。
通道
通道的英文是Channels,簡稱chan
。什麼時候要用到通道呢?可以先簡單的理解為:協程
在需要協作通訊的時候就需要用通道。
在GO裡,不同的並行協程之間交流的方式有兩種,一種是通過共享變數,另一種是通過通道。Go 語言鼓勵使用通道的形式來交流。
舉個簡單的例子,我們使用協程實現併發呼叫遠端介面,最終我們需要把每個協程請求回來的資料進行彙總一起返回,這個時候就用到通道了。
建立通道
建立通道
(channel)只能使用make
函式:
c := make(chan int)
通道
是區分型別的,如這裡的int
。
Go 語言為通道的讀寫設計了特殊的箭頭語法糖 <-
,讓我們使用通道時非常方便。把箭頭寫在通道變數的右邊就是寫通道,把箭頭寫在通道的左邊就是讀通道。一次只能讀寫一個元素。
c := make(chan bool)
c <- true //寫入
<- c //讀取
緩衝通道
上面我們介紹了預設的非快取型別的channel,不過Go也允許指定channel的緩衝大小,很簡單,就是channel可以儲存多少元素:
c := make(chan int, value)
當 value = 0
時,通道
是無緩衝阻塞讀寫的,等價於make(chan int)
;當value > 0
時,通道
有緩衝、是非阻塞的,直到寫滿 value
個元素才阻塞寫入。具體說明下:
非緩衝通道
無論是傳送操作還是接收操作,一開始執行就會被阻塞,直到配對的操作也開始執行才會繼續傳遞。由此可見,非緩衝通道是在用同步的方式傳遞資料。也就是說,只有收發雙方對接上了,資料才會被傳遞。資料是直接從傳送方複製到接收方的,中間並不會用非緩衝通道做中轉。
緩衝通道
緩衝通道可以理解為訊息佇列,在有容量的時候,傳送和接收是不會互相依賴的。用非同步的方式傳遞資料。
下面我們用一個例子來理解一下:
package main
import "fmt"
func main() {
var c = make(chan int, 0)
var a string
go func() {
a = "hello world"
<-c
}()
c <- 0
fmt.Println(a)
}
這個例子輸出的一定是hello world
。但是如果你把通道的容量由0改為大於0的數字,輸出結果就不一定是hello world
了,很可能是空。為什麼?
當通道是無緩衝通道時,執行到c <- 0
,通道滿了,寫操作會被阻塞住,直到執行<-c
解除阻塞,後面的語句接著執行。
要是改成非阻塞通道,執行到c <- 0
,發現還能寫入,主協程就不會阻塞了,但這時候輸出的是空字串還是hello world
,取決於是子協程和主協程哪個執行的速度快。
通道作為容器,它可以像切片一樣,使用
cap()
和len()
全域性函式獲得通道的容量和當前內部的元素個數。
模擬訊息佇列
上一節"協程"的例子裡,我們在主協程里加了個time.Sleep()
,目的是防止子協程還沒有結束主協程就退出了。但是對於實際生活的大多數場景來說,1秒是不夠的,並且大部分時候我們都無法預知for迴圈內程式碼執行時間的長短。這時候就不能使用time.Sleep()
來完成等待操作了。下面我們用通道來改寫:
package main
import (
"fmt"
)
func main() {
fmt.Println("run in main coroutine.")
count := 10
c := make(chan bool, count)
for i := 0; i < count; i++ {
go func(i int) {
fmt.Printf("run in child coroutine %d.\n", i)
c <- true
}(i)
}
for i := 0; i < count; i++ {
<-c
}
}
單向通道
預設的通道是支援讀寫的,我們可以定義單向通道:
//只讀
var readOnlyChannel = make(<-chan int)
//只寫
var writeOnlyChannel = make(chan<- int)
下面是一個示例,我們模擬訊息佇列的消費者、生產者:
package main
import (
"fmt"
"time"
)
func Producer(c chan<- int) {
for i := 0; i < 10; i++ {
c <- i
}
}
func Consumer1(c <-chan int) {
for m := range c {
fmt.Printf("oh, I get luckly num: %v\n", m)
}
}
func Consumer2(c <-chan int) {
for m := range c {
fmt.Printf("oh, I get luckly num too: %v\n", m)
}
}
func main() {
c := make(chan int, 2)
go Consumer1(c)
go Consumer2(c)
Producer(c)
time.Sleep(time.Second)
}
對於生產者,我們希望通道是隻寫屬性,而對於消費者則是隻讀屬性,這樣避免對通道進行錯誤的操作。當然,如果你將本例裡消費者、生產者的通道單向屬性去掉也是可以的,沒什麼問題:
func Producer(c chan int) {}
func Consumer1(c chan int) {}
func Consumer2(c chan int) {}
事實上
channel
只讀或只寫都沒有意義,所謂的單向channel
其實只是方法裡宣告時用,如果後續程式碼裡,向本來用於讀channel
裡寫入了資料,編譯器會提示錯誤。
關閉通道
讀取一個已經關閉的通道會立即返回通道型別的零值
,而寫一個已經關閉的通道會拋異常。如果通道里的元素是整型的,讀操作是不能通過返回值來確定通道是否關閉的。
1、如何安全的讀通道,確保不是讀取的已關閉通道的零值
?
答案是使用for...range
語法。當通道為空時,迴圈會阻塞;當通道關閉,迴圈會停止。通過迴圈停止,我們可以認為通道已經關閉。示例:
package main
import "fmt"
func main() {
var c = make(chan int, 3)
//子協程寫
go func() {
c <- 1
close(c)
}()
//直接讀取通道,存在不知道子協程是否已關閉的情況
//fmt.Println(<-c)
//fmt.Println(<-c)
//主協程讀取:使用for...range安全的讀取
for value := range c {
fmt.Println(value)
}
}
輸出:
1
2、如何安全的寫通道,確保不會寫入已關閉的通道?
Go 語言並不存在一個內建函式可以判斷出通道是否已經被關閉。確保通道寫安全的最好方式是由負責寫通道的協程自己來關閉通道,讀通道的協程不要去關閉通道。
但是這個方法只能解決單寫多讀的場景。如果遇到多寫單讀的情況就有問題了:無法知道其它寫協程什麼時候寫完,那麼也就不能確定什麼時候關閉通道。這個時候就得額外使用一個通道專門做這個事情。
我們可以使用內建的 sync.WaitGroup
,它使用計數來等待指定事件完成:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var ch = make(chan int, 8)
//寫協程
var wg = new(sync.WaitGroup)
for i := 1; i <= 4; i++ {
wg.Add(1)
go func(num int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- num
ch <- num * 10
}(i, ch, wg)
}
//讀
go func(ch chan int) {
for num := range ch {
fmt.Println(num)
}
}(ch)
//Wait阻塞等待所有的寫通道協程結束,待計數值變成零,Wait才會返回
wg.Wait()
//安全的關閉通道
close(ch)
//防止讀取通道的協程還沒有完畢
time.Sleep(time.Second)
fmt.Println("finish")
}
輸出:
3
30
2
20
1
10
4
40
finish
多路通道
有時候還會遇到多個生產者,只要有一個生產者就緒,消費者就可以進行消費的情況。這個時候可以使用go語言提供的select
語句,它可以同時管理多個通道讀寫,如果所有通道都不能讀寫,它就整體阻塞,只要有一個通道可以讀寫,它就會繼續。示例:
package main
import (
"fmt"
"time"
)
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
fmt.Println(time.Now().Format("15:04:05"))
go func(ch chan int) {
time.Sleep(time.Second)
ch <- 1
}(ch1)
go func(ch chan int) {
time.Sleep(time.Second * 2)
ch <- 2
}(ch2)
for {
select {
case v := <-ch1:
fmt.Println(time.Now().Format("15:04:05") + ":來自ch1:", v)
case v := <-ch2:
fmt.Println(time.Now().Format("15:04:05") + ":來自ch2:", v)
//default:
//fmt.Println("channel is empty !")
}
}
}
輸出:
13:39:56
13:39:57:來自ch1: 1
13:39:58:來自ch2: 2
fatal error: all goroutines are asleep - deadlock!
預設select
處於阻塞狀態,1s後,子協程1完成寫入,主協程讀出了資料;接著子協程2完成寫入,主協程讀出了資料;接著主協程掛掉了,原因是主協程發現在等一個永遠不會來的資料,這顯然是沒有結果的,乾脆就直接退出了。
如果把註釋的部分開啟,那麼程式在打印出來自ch1、ch2的資料後,就會一直執行default
裡面的程式。這個時候程式不會退出。原因是當 select
語句所有通道都不可讀寫時,如果定義了 default
分支,那就會執行 default
分支邏輯。
注:
select{}
程式碼塊是一個沒有任何case
的select
,它會一直阻塞。
Chan的應用場景
golang中chan的應用場景總結
https://github.com/nange/blog/issues/9
Go語言之Channels實際應用
https://www.s0nnet.com/archives/go-channels-practice
- 訊息佇列
- 併發請求
- 模擬鎖的功能
- 模擬sync.WaitGroup
- 平行計算
通道原理部分可以根據文末給出的參考連結
《快學 Go 語言》第 12 課 —— 通道
去檢視。
併發鎖
互斥所
go語言裡的map
是執行緒不安全的:
package main
import "fmt"
func write(d map[string]string) {
d["name"] = "yujc"
}
func read(d map[string]string) {
fmt.Println(d["name"])
}
func main() {
d := map[string]string{}
go read(d)
write(d)
}
Go 語言內建了資料結構競態檢查
工具來幫我們檢查程式中是否存線上程不安全的程式碼,只要在執行的時候加上-race
引數即可:
$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c0000a8180 by goroutine 6:
...
yujc
Found 2 data race(s)
exit status 66
可以看出,上面的程式碼存在安全隱患。
我們可以使用sync.Mutex
來保護map
,原理是在每次讀寫操作之前使用互斥鎖進行保護,防止其他執行緒同時操作:
package main
import (
"fmt"
"sync"
)
type SafeDict struct {
data map[string]string
mux *sync.Mutex
}
func NewSafeDict(data map[string]string) *SafeDict {
return &SafeDict{
data: data,
mux: &sync.Mutex{},
}
}
func (d *SafeDict) Get(key string) string {
d.mux.Lock()
defer d.mux.Unlock()
return d.data[key]
}
func (d *SafeDict) Set(key string, value string) {
d.mux.Lock()
defer d.mux.Unlock()
d.data[key] = value
}
func main(){
dict := NewSafeDict(map[string]string{})
go func(dict *SafeDict) {
fmt.Println(dict.Get("name"))
}(dict)
dict.Set("name", "yujc")
}
執行檢測:
$ go run -race main.go
yujc
上面的程式碼如果不使用-race
執行,不一定會有結果,取決於主協程、子協程哪個先執行。
注意:
sync.Mutex
是一個結構體物件,這個物件在使用的過程中要避免被淺拷貝,否則起不到保護作用。應儘量使用它的指標型別。
上面的程式碼裡我們多處使用了d.mux.Lock()
,能否簡化成d.Lock()
呢?答案是可以的。我們知道,結構體可以自動繼承匿名內部結構體的所有方法:
type SafeDict struct {
data map[string]string
*sync.Mutex
}
func NewSafeDict(data map[string]string) *SafeDict {
return &SafeDict{data, &sync.Mutex{}}
}
func (d *SafeDict) Get(key string) string {
d.Lock()
defer d.Unlock()
return d.data[key]
}
這樣就完成了簡化。
讀寫鎖
對於讀多寫少的場景,可以使用讀寫鎖
代替互斥鎖
,可以提高效能。
讀寫鎖提供了下面4個方法:
Lock()
寫加鎖Unlock()
寫釋放鎖RLock()
讀加鎖RUnlock()
讀釋放鎖
寫鎖
是排它鎖
,加寫鎖
時會阻塞其它協程再加讀鎖
和寫鎖
;讀鎖
是共享鎖
,加讀鎖還可以允許其它協程再加讀鎖
,但是會阻塞加寫鎖
。讀寫鎖
在寫併發高的情況下效能退化為普通的互斥鎖
。
我們把上節中的互斥鎖換成讀寫鎖:
package main
import (
"fmt"
"sync"
)
type SafeDict struct {
data map[string]string
*sync.RWMutex
}
func NewSafeDict(data map[string]string) *SafeDict {
return &SafeDict{data, &sync.RWMutex{}}
}
func (d *SafeDict) Get(key string) string {
d.RLock()
defer d.RUnlock()
return d.data[key]
}
func (d *SafeDict) Set(key string, value string) {
d.Lock()
defer d.Unlock()
d.data[key] = value
}
func main(){
dict := NewSafeDict(map[string]string{})
go func(dict *SafeDict) {
fmt.Println(dict.Get("name"))
}(dict)
dict.Set("name", "yujc")
}
改完後,使用競態檢測工具檢測還是能通過的。
參考
1、make(chan int) 和 make(chan int, 1) 的區別
https://www.jianshu.com/p/f12e1766c19f
2、channel
https://www.jianshu.com/p/4d97dc032730
3、《快學 Go 語言》第 12 課 —— 通道
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ==&mid=2247484601&idx=1&sn=97c0de2acc3127c9e913b6338fa65737
4、《快學 Go 語言》第 13 課 —— 併發與安全
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ==&mid=2247484683&idx=1&sn=966cb818f034ffd4538eae7a61cd0