Go 語言併發程式設計介紹
Go 語言併發程式設計介紹
作者:閃電豹貓 碼字不易,轉載需註明出處
1. 併發與並行
現代 CPU 1 秒內能執行數十億條指令,而人類的反應速度最快也才是毫秒級,所以計算機才看起來是在同時執行多項任務。一句話概括就是 “巨集觀並行,微觀序列”。
序列 (Serial) 是指一個一個來處理任務。
併發 (Concurrency) 是指把任務在不同的時間點交給單個處理器進行處理。從處理器的角度來看,處理器沒有 “分身” 去同時執行多項任務,任務不會在微觀層面同時執行。
並行 (Parallelism) 是把每一個任務分給每一個處理器獨立完成,多個任務之間一定是同時執行的。
舉個許多人去打水的栗子:
-
序列:只有一個水龍頭,所有人排隊打水,一個人打滿後下一個人才可以開始打。排在隊伍末尾的人可能需要很長的等待時間才能打上水。
-
併發:仍然只有一個水龍頭,不過每個人一次最多打 5 秒鐘的水,用完水才能再過來接著排隊打。這樣的好處是相比於序列,每個人都能在需要水的時候很快打上一點點水,這點水用完了,還可以排隊過來打,大大縮短了每個人的平均等待時間。
-
並行:多少人過來打水就有多少個水龍頭,這樣能讓所有人同時打夠自己需要的水,沒有排隊時間。這樣的情況並不多見,一般還是併發見得多。並行可以看成 “多個序列”。
2. 程序與執行緒
程式都是存在磁碟上的。當我們雙擊執行這個程式時,這個程式就會被複制到記憶體裡,成為活躍的程式。每個活躍的程式都會佔用系統資源,成為一個程序,為了管理它們對系統資源的使用情況,作業系統會為這個程序分配一個 PID (Process ID),哪怕是多次雙擊執行同一個程式,系統也會把它們視為不同的程序,分配不同的 PID 。程序是作業系統分配資源的基本單位。
執行緒又稱輕量級程序,是包含於程序裡的實體概念,一個程序通常包含若干個執行緒。執行緒可以呼叫程序所擁有的系統資源,所以執行緒是獨立執行和獨立排程的基本單位。比如一個音樂 APP ,在聽歌的同時可以刷評論區,對應的就是音樂 APP 程序裡有兩個執行緒:播放執行緒和網頁瀏覽執行緒。
3. Goroutine
3.1 協程 Coroutine
協程 (Coroutine),又稱微執行緒,是一種比執行緒還輕量級的存在。一個程序可擁有多個執行緒,一個執行緒可以擁有多個協程。協程是編譯器級的,程序和執行緒是作業系統級的。因此,協程是有程式自己管理,不由作業系統核心直接管理;程序和協程則直接接受作業系統核心管理。因此協程的切換沒有執行緒的切換那樣的開銷。協程的輕量性可以允許程式建立上萬個協程而不會導致系統資源枯竭。
3.2 Coroutine 與 Goroutine
Go 語言的協程叫 Goroutine
,由 Go runtime 排程管理。Go 程式會智慧的排程各 Goroutine 給 每個 CPU,建立 Goroutine 的堆疊開銷很小。
Coroutine 只能順序執行,而 Goroutine 可以並行執行。前者只能發生在單執行緒中,而後者可以發生在多個執行緒中。Goroutine 是協程的深度抽象。
Coroutine 程式只有主動交出控制權後,系統才能把控制權轉給其他 Coroutine;在 CPU 閒置時,若 Coroutine 拒絕交出控制權,作業系統也無能為力,只能乾等著,也就是失去響應和宕機。Goroutine 屬於搶佔式處理,也就不存在這樣的問題了。
3.3 建立 Goroutine
Goroutine 語法規則:
go 函式名( 引數列表 )
比如:
go NewVector( x, y, z )
同一程式 (程序) 的所有 Goroutine 都共享同一個地址空間。
需要注意的是,所有 Goroutine 都會在 main() 函式終止時停止,不論這些 Goroutine 本身是否執行完畢。Goroutine 內還可以開啟 “子 Goroutine”。main() 自己就是一個協程,叫主協程。
比如如下程式碼:
package main
import (
"fmt"
"time"
)
func main() {
for i := 0; i <= 5; i++ {
go f(i)
}
time.Sleep(2 * time.Second) // 等上兩秒鐘,讓所有 Goroutine 執行完再退出
}
func f(a int) {
go fmt.Println(a + 1000)
fmt.Println(a)
}
它的一種可能輸出為
5
1
1001
3
1003
0
2
1000
1002
1005
4
1004
如果沒有那行 “等兩秒鐘” 的程式碼,在我這裡試了十次,都是沒有輸出就退出了。這是因為 main() 比各 Goroutine 都要先終止的緣故。但是,使用time.Sleep()
的方法以阻塞主協程來讓其他協程執行完畢的方式並不可靠。後面會將到什麼樣的阻塞方式更可靠。
下面這個示例允許程式在列印數字的同時,一直等待使用者的輸入,給了輸入後輸出所輸入的並退出程式。
package main
import (
"fmt"
"time"
)
func ticking() {
var sec int
for {
fmt.Printf("%d\n", sec)
sec++
time.Sleep(1 * time.Second)
}
}
func main() {
go ticking() // 將列印數字交給 Goroutine
var str string
for {
fmt.Scanf("%s", &str)
fmt.Println(str)
return
}
}
可能的輸出:
0
1
2
waht // 這行是鍵盤輸入
waht
3.4 匿名函式建立 Goroutine
匿名函式和閉包也是可以用來作為 Goroutine 的物件的。比如如下程式碼:
package main
import (
"fmt"
"time"
)
func main() {
go func() {
var sec int
for {
fmt.Printf("%d\n", sec)
sec++
time.Sleep(1 * time.Second)
}
}()
var str string
for {
fmt.Scanf("%s", &str)
fmt.Println(str)
return
}
}
這個就是 3.3 節最後一個示例程式碼的匿名函式寫法,故不重複了。
3.5 啟動多個 Goroutine
多個 Goroutine 是搶佔式執行,隨機排程。比如:
package main
import (
"fmt"
"time"
)
func Num() {
for i := 0; i <= 9; i++ {
fmt.Printf("%d", i)
time.Sleep(250 * time.Millisecond) // 250 ms
}
}
func Letter() {
for i := 'a'; i <= 'k'; i++ {
fmt.Printf("%c", i)
time.Sleep(250 * time.Millisecond) // 250 ms
}
}
func main() {
go Num()
go Letter()
time.Sleep(3 * time.Second)
}
一種可能的輸出:
a01bc2d3e4f5g6h7i8j9k
3.6 調整併發執行效能
在多個 Goroutine 的情況下,可以通過runtime.Gosched()
交出控制權。不過一般不需要這個函式,實踐中一般也用不著。
在傳統併發程式設計中,開發者需要維護執行緒池中的執行緒數量與 CPU 核心數的對應關係。Go 併發是由 Go runtime 實現智慧排程的,不過開發者也可以通過runtime.GOMAXPROCS( 邏輯處理器數量 )
調整。
Go 1.5 版本 (2015 年中期的更新) 開始,預設讓程式碼併發執行,最大效率的利用 CPU。
我們可以通過for
迴圈來建立多個 Goroutine,下面的例子就是一個同時建立多個 Goroutine 的程式程式碼:
package main
import (
"fmt"
"runtime"
)
func main() {
runtime.GOMAXPROCS(16)
var counter = 0
func() {
for i := 0; i < 1000; i++ {
go func() {
counter++ // 這句不是一個原子操作
}()
}
}()
fmt.Println("Please ENTER to continue...")
var s string
fmt.Scanln(&s) // 阻塞程式,比 time.Sleep() 的阻塞方式更可靠
fmt.Println(counter)
}
一種可能的輸出是:
Please ENTER to continue...
885
一般來講,我們預期的結果應該是1000
,但是實際上,每次執行這個程式,輸出的結果都是不確定的,結果範圍在閉區間 [0, 1000] 內,這就是併發程式的不確定性,同時這也反應出併發程式經常會遇到的資料同步問題。為什麼?
在上面的程式碼中,counter++
並不是一個原子操作。所謂的原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序不可以被打亂,也不可以被切割而只執行其中的一部分。語句counter++
不是原子操作,它由多個步驟組成:
- 將記憶體中 counter 的值載入到 CPU 中
- 在 CPU 中對 counter 加 1
- 將 CPU 中的 counter 值儲存到記憶體中
因此,當在多核 CPU 的計算機上時,若沒有顯式地呼叫語句runtime.GOPAXPROCS( 1 )
將併發程式只允許執行在一個 CPU 核心上的話,該程式將可能按照預設設定,執行在多個 CPU 上。由於 CPU 執行的速度非常快,可能會出現資料不同步的情況,為了更清晰的說明,我舉個栗子,可能會發生這樣的現象:
- 假設現在記憶體中 counter = 123
- CPU 3 將記憶體中 counter 的值 [123] 載入進來
- CPU 3 對 123 + 1 進行運算,得到 [124]
- CPU 1 將記憶體中 counter 的值 [123] 載入進來
- CPU 3 將運算結果 [124] 儲存在 counter 所佔的記憶體空間中
- CPU 6 將記憶體中 counter 的值 [124] 載入進來
- CPU 6 對 124 + 1 進行運算,得到 [125]
- CPU 1 對 123 + 1 進行運算,得到 [124]
- CPU 6 將運算結果 [125] 儲存在 counter 所佔的記憶體空間中
- CPU 1 將運算結果 [124] 儲存在 counter 所佔的記憶體空間中
- 最終記憶體中 counter = 124
我們發現,三個 CPU 對變數 counter 獨立且同時執行了counter++
的指令,但是最後結果卻不是我們期望的 126,這就是因為變數 counter 的記憶體空間對各個 CPU 是共享的,如果我們在程式中限制GOMAXPROCS
為 1 或者設定互斥鎖,在一個 CPU 對這塊記憶體區域訪問時,禁止其它 CPU 對該塊的訪問,讓它們等待上一個 CPU 的對該記憶體區域的操作完成就可以解決問題了。具體後面會介紹。
3.7 Goroutine 異常處理
在我們的併發程式執行的過程中,可能會遇到某個 Goroutine 遇到錯誤並導致整個程式崩潰的情況,這種一個子協程異常導致整個程式崩潰的情況是在是因小失大,因此有了下面的方法來解決:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func div(num int) {
defer func() { // defer 關鍵詞後面的這個匿名函式是在 div() 執行完畢返回(或者崩潰)時才開始執行的
err := recover()
if err != nil {
fmt.Println(err)
}
wg.Done() // 給協程計數器減1,在defer裡面,保證能被執行
}()
fmt.Printf("10/%d = %d\n", num, 10/num)
}
func main() {
for i := 0; i <= 5; i++ {
wg.Add(1) // 給協程計數器加1
go div(i)
}
wg.Wait() // 確保所有協程執行完畢再退出程式
}
可能的輸出結果:
10/5 = 2
10/1 = 10
10/3 = 3
10/2 = 5
runtime error: integer divide by zero
10/4 = 2
這樣,一個協程崩了,就不再會導致整個程式崩掉。
4. Go 排程器
Go 語言的併發排程 (Schedule) 模型叫做 G-M-P 模型:
-
G: 是一個 Goroutine 協程物件,每次呼叫一個 Goroutine 時,都會建立一個 G 物件。它是對併發執行的任務的封裝,G 物件是一個非常輕量級的物件,屬於使用者級資源,對作業系統透明,上下文切換消耗低。
-
M: 是 Machine 的簡稱,代表一個執行緒物件。每建立一個 M 時,都會有一個底層執行緒被建立。M 的作用就是允許 G 中包裝的併發任務,Go 排程器的目標是把 G 高效的繫結到 M 上去執行。M 屬於系統資源,建立數量受系統的限制,一般來說,G 的數量要遠高於 M 的數量。M 最高的數量是 10000,這個數可由
runtime/debug.SetMaxThreads()
來更改。 -
P: 是 Processor 的簡稱,是一個邏輯處理器物件。每個 M 都得繫結到一個 P 上去執行,就像一個執行緒得繫結到一個 CPU 核心上去一樣。每個 P 都有個全域性 G 佇列,主要用於管理 G 物件,併為 G 在 M 上的執行提供本地化條件。P 物件的最大數量就是
GOMAXPROCS
(也就是 256),啟動程式後是固定的,一般不修改 P 的數量;M 和 P 的數量一般不相等,因為可能會有休眠的 M 。 -
本地佇列 (Local Queue): 每個 P 維護一個本地佇列,與 P 繫結的 M 中如果有新的 G 需要執行,一般會放到 P 的本地佇列裡儲存;除非本地佇列已滿,才會擷取本地佇列中特定數量的 G 放入全域性佇列裡,然後把剛才因滿而沒放入本地佇列的 G 放入本地佇列裡。
-
全域性佇列 (Global Queue): 可儲存本地佇列儲存不下的 G 物件。為了保證排程的公平性,排程器在排程過程中會有特定的概率 (如 1/61) 優先從全域性佇列裡獲取 G。
-
竊取 (Stealing): 為了提高資源的利用率,使得空閒的 M 有活幹,排程器會隨機地從其他 P 的本地佇列裡竊取特定數量的 G 到空閒的 M 上去執行。
在程式開始執行的時候,Go 排程器會首先初始化執行緒 m0、棧、垃圾回收,以及建立和初始化由runtime.GOMAXPROCS( N )
設定的 N 個邏輯處理器的 P 物件。具體地,首先啟動執行緒 m0,將 m0 繫結到某一個邏輯處理器 P 上,並從 P 的本地佇列裡獲取需要執行的 G。最先開始獲取到的是 main 協程。G 是擁有棧的,M 便會根據 G 中的棧資訊和排程資訊設定執行環境。
執行緒 M 允許 G,當執行的 G 退出後,M 會被再次獲取可執行的 G,這樣一直重複,知道 main 協程退出。
G 只能執行在 M 上,一個 M 必須持有一個 P,P:非休眠的M = 1:1。M 會從 P 的本地佇列裡彈出一個處於可執行狀態的 G。如果 P 的本地佇列為空,就會執行竊取。
每次使用關鍵詞go
時,都會:
- 建立一個 G 物件,加入到本地佇列或全域性佇列。
- 如果還有空閒的 P,那麼建立一個 M
- M 會啟動一個底層執行緒,迴圈執行能找到的 G 任務。
- G 任務的執行順序是:先從本地佇列找,沒有的話再從全域性佇列找
當 M 執行某一個 G 時,如果發生了syscall
或者其他的阻塞操作,M 就會阻塞。如果當前 M 有一些 G 在執行,排程器會把 這個執行緒 M 和 P 解除繫結關係,然後建立一個新的執行緒 M 服務於 P。
5. 協程間通訊 之 共享變數
共享變數是 Go 程式的各個 Goroutine 之間傳遞資料的兩大方法之一。為了保證資料同步,會配合鎖機制,利用讀寫鎖或互斥鎖對共享變數加鎖,從而保證多個協程的資料共享安全。
在 3.6 節的示例程式中,沒有實現不同協程對共享變數counter
的安全訪問。下面的程式碼是為 3.6 節的程式碼引入鎖機制,實現資料安全訪問。
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
runtime.GOMAXPROCS(16)
var counter = 0
var mlock sync.Mutex // 定義一個互斥鎖
func() {
for i := 0; i < 1000; i++ {
go func() {
mlock.Lock() // 給該句後面遇到的變數加鎖
counter++ // 這句不是一個原子操作
mlock.Unlock() // 結束加鎖
}()
}
}()
fmt.Println("Please ENTER to continue...")
var s string
fmt.Scanln(&s) // 阻塞程式,比 time.Sleep() 的阻塞方式更可靠
fmt.Println(counter)
}
按理來說,控制檯輸出應該是:
Please ENTER to continue...
1000
一旦對counter
進行加鎖操作,其他 Goroutine 就無法在它釋放前從記憶體中載入counter
的值進行處理,而必須等待,從而保證資料的安全訪問。
6. 協程間通訊 之 通道 (channel)
通道 (channel) 又稱通道,是協程間通訊的兩大方法的另一種方法。通道本質是一個 FIFO (先進先出,First In Fisrt Out) 的佇列,這種協程間通訊方式比共享變數的方式更簡單高效。Go 語言提倡使用 channel 替代共享記憶體。
6.1 建立 channel 型別
channel 也是一種資料型別,宣告一個 channel 型別的語法格式如下:
var 通道名 chan 通道具體型別
建立一個 channel 格式如下:
通道名 := make( 通道具體型別 )
比如:
var a chan int // 只是個宣告
var b chan *float64 // 只是個宣告
var c chan nil // 只是個宣告
d := make( chan string ) // 例項化
e := make( chan interface{} ) // 例項化
其中的通道e
是一個空介面型通道,可以傳入任意型別的資料。
channel 不是值型別,而是引用型別,所以需要make()
建立了 (例項化了) 才能使用。
6.2 向 channel 寫入資料
讓 channel 接收資料需要一個特殊的操作符<-
,比如
a := make( chan int16 )
a <- 5
表示建立了一個型別為 int16 的通道,並且向其傳送了資料 5。
預設情況下,channel 都是無緩衝的。也就是說,在上面的例子中,如果我們傳送了 5,卻沒有後續的語句從管道 a 進行接收,那麼所有協程 (包括 main 協程) 將會持續阻塞。
因此,如下程式碼會使程式出現死鎖 (deadlock),程式無響應,Go runtime 會識別出來並報錯退出:fatal error: all goroutines are asleep - deadlock!
:
func main() {
a := make(chan int)
a <- 5
}
這樣修改也會死鎖,程式報錯退出:
func main() {
a := make(chan int)
a <-5
fmt.Println( <-a )
}
這樣修改才能保證並不是所有的協程處於阻塞狀態:
func main() {
a := make(chan int)
go func() {
a <- 5
}()
fmt.Println(<-a)
}
具體為什麼第二種修改方式是可行的,後面會介紹到。
無緩衝通道本身並不儲存資料,只是負責資料的流通。
-
從無緩衝通道取資料,必須要有資料流進來才可以,否則當前 Goroutine 阻塞,直到有協程傳入資料
-
資料流入無緩衝通道, 如果沒有其他 Goroutine 來拿走這個資料,那麼當前 Goroutine 阻塞,直到有協程過來拿走這個資料
因此,使用無緩衝 channel,必定涉及兩個協程。上面兩個死鎖報錯的程式全程式使用了無緩衝 channel 卻只有一個協程,這就一定會導致死鎖。可以通過改成有緩衝 channel 或者將讀寫通道的操作扔到新協程裡去來改進。
6.3 從 channel 讀取資料
無緩衝 channel 的收發操作應該發生在兩個協程中。
假定我們有個叫做ch
的 int16 通道,讀資料的語法有下面幾種:
- 簡易阻塞接收資料
a := <-ch
執行該語句時,該句所在 Goroutine 將阻塞,直到有資料傳進來為止
- 完整的阻塞接收資料
aa, ok := <-ch // ok 是個 bool 型變數,用於判斷當前 channel 是否關閉
第二關ok
變數是用來判斷通道ch
是否關閉的變數。為 true 時,通道開啟;為 false 時,通道關閉。
- 忽略接收的資料
<-ch
這種讀取通道資料卻又忽略所讀的資料的方法,其意圖不在於資料的傳遞,而是用來阻塞該語句所在的協程。
- for ... range 迴圈接收資料
通過for ... range
語句,可以將一個有緩衝通道的資料一股腦按順序全讀出來,這種for ... range
會在所有資料讀取完畢畢後自動停止迴圈,比如:
for aaa := range ch {
fmt.Printf("從通道讀取資料:%v\n", aaa)
}
- for 迴圈接收資料
如果只是用了for
迴圈而沒有配合上range
,那麼需要手動使用break
退出迴圈:
for {
aaaa, ojbk := <-ch
if !ojbk {
fmt.Println("資料讀取完畢")
break
}
fmt.Printf("讀取到資料:%v\n", aaaa)
}
下面給出這 5 方法的示例程式:
package main
import (
"fmt"
)
func SendData(ch chan int16) {
defer close(ch) // bie wang le guan diao tong dao, fou ze zhe ge goroutine hui yi zhi kaiz he
ch <- 1
ch <- 3
ch <- 5
}
func reader1(ch chan int16) {
defer fmt.Println("One is done.")
for aa := range ch {
fmt.Printf("Method One: %v\n", aa)
}
}
func reader2(ch chan int16) {
defer fmt.Println("Two is done.")
for {
aaa, ojbk := <-ch
if !ojbk {
break
}
fmt.Printf("Method Two: %v\n", aaa)
}
}
func reader3(ch chan int16) {
defer fmt.Println("Three is done.")
for {
aaa := <-ch
if aaa == 0 {
break
}
fmt.Printf("Method Three: %v\n", aaa)
}
}
func reader4(ch chan int16) {
defer fmt.Println("Four is done.")
fmt.Printf("Method Four: %v %v %v\n", <-ch, <-ch, <-ch)
}
func main() {
// 1: for ... range 方法
ch := make(chan int16)
go SendData(ch)
reader1(ch)
// 2: for break
ch2 := make(chan int16)
go SendData(ch2)
reader2(ch2)
// 3: for break 不推薦,因為有可能傳入的就是型別預設值,比如 0 或者 "" 或者 false
ch3 := make(chan int16)
go SendData(ch3)
reader3(ch3)
// 4: 一個一個讀取
ch4 := make(chan int16)
go SendData(ch4)
reader4(ch4)
}
for ... range 迴圈遍歷通道後,必須關閉通道。如果未關閉,則會引發死鎖。
6.4 無緩衝通道
在各類書籍文獻中,一旦提到 Go 的通道 (channel),那麼沒有特殊說明,一定說的是無緩衝通道。本文也是如此,提到通道或者 channel 一定指的是無緩衝通道。
我們都知道如何建立一個通道:
ch := make(chan 型別)
通道可分成 3 種類型:
-
只讀通道:只允許讀取,不允許寫入的通道。建立一個只讀通道的語法如下:
ch := make(<-chan 型別)
-
只寫通道:只允許寫入,不允許讀取的通道。建立一個只寫通道的語法如下:
ch := make(chan<- 型別)
-
雙向通道:就是一個普通的通道。建立一個雙向通道的語法如下:
ch := make(chan 型別)
一個通道預設就是個雙向通道,而對於<-
,可以理解為資料的流動方向。<-chan
就是隻讀,chan<-
就是隻寫。
有的人可能會問,一個只讀或只寫的通道有個啥子存在的意義呢?實際上,它們還是很有用的,比如用於傳遞函式引數時,這樣子開發可以提高程式的安全性和可靠性。一個函式專門用於向通道寫資料,另一個函式專門用於讀資料,這樣可以避免在函式內誤操作帶來的錯誤。
在 6.2 和 6.3 節中,我們已經初步介紹了通道的讀寫方法。接下來,我們還要進一步地介紹無緩衝通道的讀寫。
func main() {
ch := make(chan int16)
go func() {
ch <- 7 // 這裡只發生了一個數據
}()
fmt.Println(<-ch) // 從管道取一個數據,沒問題
//fmt.Println(<-ch) // 再取一個出來會遭遇死鎖
time.Sleep(2 * time.Second) // 讓所有子協程在 main 協程退出前完成
}
控制檯輸出:
7
該示例中,傳送資料與讀取資料不在同一個協程中,因為在只有一個主協程的程式裡對無緩衝通道讀寫資料會遭遇死鎖。其次,如果把第二次讀取資料那行取消註釋,則也會遭遇死鎖,這是因為程式執行到這一行之後,由於沒有其他協程向通道寫入資料,所以該句所在的協程將會持續阻塞。由於程式此時只剩一個協程在執行,所以會死鎖崩掉。
通道可以通過close(通道名)
關閉。如果在向通道寫入資料後手動關閉了這個通道,那麼沒有任何其它協程往信道里寫資料,協程也可以正常從這個關閉了的通道讀取資料,這個資料是型別零值 (如 0 、"" 、false) 。但是記住,往一個關閉了的信道里寫資料會導致報錯崩掉。
不能關閉一個只讀的通道,可以關閉一個只寫和雙向通道。
一般不推薦使用 “for 和判斷零值” 來迴圈讀通道,因為如果一個通道已被關閉,那麼再迴圈讀取時,從信道里讀出來的將總會是零值,也就永遠無法退出迴圈了:
for {
aaa := <-ch
if aaa == 0 {
break // 如果通道 ch 已關閉,那麼這將是個死迴圈
}
}
用for ... range
就不會遇到這個零值問題,不過要記住,傳送方一定要在傳送完所有資料後來一句close(通道名)
,否則讀取方協程這邊會一直等著傳送方新資料的傳送,也就是無法結束讀取。這就是說,若使用 for ... range 來接收資料,那麼傳送方傳送的資料必須是有限個,傳送方不能使用無限 for{} 迴圈。
如果傳送方使用無限 for{} 迴圈,那麼接收方絕不能使用 for ... range 迴圈,只能使用無限 for{} 迴圈來接收資料。當然,接收方使用無限 for{} 接收資料時,最好設定個退出條件,要不然傳送方的無限傳送協程都結束了,接收方這邊是無法知道的,接收方這邊可能又是一個退不出的死迴圈。至於這個退出條件怎麼寫比較好,下面會介紹。
6.5 select 語句塊
select 語言塊是專門為通道而設計的,它和 switch 的語法規則非常相近,相同點是可以有多個 case 和一個 default,不同點是在 select 中,所有的 case 語句要麼執行通道的寫入,要麼執行通道的讀取。select 與 {
之間不能有任何表示式。select 裡的 case 是隨機執行的,不是順序執行的。fallthrough
不能用於 select 語句塊裡。
select 語句塊本身不帶有迴圈監聽機制,需要通過外層 for 迴圈來啟用迴圈監聽機制。在監聽 case 分支時,如果沒有任何分支滿足監聽條件,則進入阻塞狀態。如果有多個分支符合條件,則隨機選擇一個執行。default
分支是用來處理所有分支都不符合條件的情況。
下面給出示例程式碼:
package main
import (
"fmt"
"time"
)
func send1(ch chan<- int) {
time.Sleep(1500 * time.Millisecond) // 設定該協程先休眠 1.5 秒再寫入資料
ch <- 7
}
func send2(ch chan<- string) {
time.Sleep(400 * time.Millisecond) // 設定該協程先休眠 0.4 秒再向通道寫入資料
ch <- "Upside down T"
}
func main() {
ch1 := make(chan int)
ch2 := make(chan string)
go send1(ch1)
go send2(ch2)
for {
select {
case aaa := <-ch1:
fmt.Printf("Got it from ch1: %v\n", aaa)
case aaa := <-ch2:
fmt.Printf("Got it from ch2: %v\n", aaa)
case <-time.After(1 * time.Second): // 這裡設定的超時時間為 1 秒
fmt.Println("Time out.")
goto byebye
}
}
byebye:
fmt.Println("See you.")
}
控制檯輸出:
Got it from ch2: Upside down T
Time out.
See you.
在該示例程式碼中,都是先休眠一段時間再發送資料。我們用了無限 for 迴圈來配合 select 開啟迴圈監聽 case 分支,其中第三個 case 分支case <-time.After(1 * time.Second)
中的time.After(1 * time.Second)
函式返回一個型別為<-chan Time
的只讀通道,我們選擇對這個通道丟棄資料接收的方式接受通道資料,這實際是個超時處理。如果超時,就需要跳出 for 迴圈,不需要再繼續監聽了,此處goto byebye
來實現。如果用 break,只能跳出 select 中的一個 case 選項,而不能跳出 for 迴圈。
該例中,由於 send2() 可以看成花費了 0.4 秒向通道傳送資料,在 1 秒的超時時間內,所以通道 ch2 正常的接收了;而 send1() 裡我們設定的休眠 1.5 秒大於了超時時間,所以通道 ch1 沒有接收到資料。
如果我們把超時分支去掉,換成一個 default 分支,分支內用 goto 跳出 for 迴圈,其他不變:
package main
import (
"fmt"
"time"
)
func send1(ch chan<- int) {
ch <- 7
}
func send2(ch chan<- string) {
ch <- "Upside down T"
}
func main() {
ch1 := make(chan int)
ch2 := make(chan string)
go send1(ch1)
go send2(ch2)
time.Sleep(1 * time.Second) // 這裡設定成 1 s 後才開始接收資料,避免資料還沒傳送,接收方啥也沒收到而直接 default 退出
for {
select {
case aaa := <-ch1:
fmt.Printf("Got it from ch1: %v\n", aaa)
case aaa := <-ch2:
fmt.Printf("Got it from ch2: %v\n", aaa)
default:
fmt.Println("Literally nothing more here.")
goto byebye
}
}
byebye:
fmt.Println("Cya.")
}
控制檯輸出:
Got it from ch2: Upside down T
Got it from ch1: 7
Literally nothing more here.
Cya.
這份修改過的程式碼中,如果直接去掉 default 分支的話,switch 所在協程會阻塞,而我們的程式一共就一個協程,這會導致死鎖:
Got it from ch1: 7
Got it from ch2: Upside down T
fatal error: all goroutines are asleep - deadlock!
6.6 有緩衝通道
有緩衝通道 (Buffered channel),是在通道自身帶有一定快取空間用於接收資料的通道。如果說無緩衝通道的資料收發雙方必須同步操作的話,那麼有緩衝通道資料收發操作是可以非同步完成的。有緩衝通道可以看成一個有容量上限的佇列用於暫時儲存資料,如果緩衝滿了,就會發生通道阻塞,這時除非接收方從通道讀取資料,否則通道將就這麼一直阻塞,無法寫入新的資料。
舉個有緩衝通道的栗子吧。菜就是資料,後廚們不同炒菜可看成不同資料的傳送,上菜就是資料的接收,飯店的上菜視窗可看成緩衝區,假設這個視窗最多放得下 3 個碟子,這樣一個飯店就可以看成一個有緩衝通道了。咱們管後廚叫 Goroutine 1,服務生為 Goroutine 2,服務生會一直盯著出菜視窗,一有菜出來就立刻端走一碟去把菜上給顧客的桌上去,然後回來看是否還有待上的碟子;而後廚只管做菜就好。這兩個角色是獨立進行的,就好比兩個 Goroutine 一個只發送一個只接受,也是獨立執行的。
在 Go 語言中,建立一個緩衝 channel 的語法如下:
ch := make(chan 型別, 容量)
任何時候,我們都可以通過len(通道名)
函式獲得通道的當前有多少個元素、通過cap(通道名)
獲得通道的容量。
簡單來幾個示例程式碼吧:
示例 1
func main() {
ch := make(chan int, 1000)
ch <- 6
ch <- 9
fmt.Println(<-ch, <-ch)
}
控制檯輸出6 9
沒問題。
示例 2
func main() {
ch := make(chan int, 1)
ch <- 6
ch <- 9
fmt.Println(<-ch, <-ch)
}
示例 2 會發生 deadlock 崩潰。
示例 3 :
func main() {
ch := make(chan int, 3)
ch <- 6
fmt.Println(<-ch, <-ch)
}
同樣的,示例 3 會死鎖崩潰。
下表給出了通道不同狀態的操作規則:
操作 | 通道狀態 | 結果 |
---|---|---|
讀取操作 | nil | 阻塞 |
讀取操作 | 活躍且非空 | 成功獲取到值 |
讀取操作 | 活躍且空 | 阻塞 |
讀取操作 | 已關閉 | 型別預設值 |
讀取操作 | 只寫通道 | 錯誤 |
操作 | 通道狀態 | 結果 |
---|---|---|
寫入操作 | nil | 阻塞 |
寫入操作 | 活躍且已滿 | 阻塞 |
寫入操作 | 活躍且不滿 | 成功寫入 |
寫入操作 | 已關閉 | panic 異常 |
寫入操作 | 只讀通道 | 錯誤 |
操作 | 通道之前狀態 | 結果 |
---|---|---|
關閉操作 | nil | panic |
關閉操作 | 活躍且非空 | 成功關閉。若通道內還有值時,可繼續成功讀取,直到無值後,可讀取型別預設值 |
關閉操作 | 活躍且空 | 成功關閉。可繼續成功讀取型別預設值 |
關閉操作 | 已關閉 | panic |
關閉操作 | 只讀通道 | 錯誤 |
7. sync 包介紹
7.1 同步等待組 sync.WaitGroup
WaitGroup 的定義方式如下:
type WaitGroup struct {
noCopy noCopy
state1 [12]byte
sema uint32
}
sync.WaitGroup 中有三個方法Add()
Done()
Wait()
。
-
Add()
Add方法的定義是func (wg *WaitGroup) Add(delta int)
其中 delta 可正可負,一般為正。通過該方法來增加應等待 Goroutine 的數量。 -
Done()
func (wg *WaitGroup) Done()
用來減小 WaitGroup 計數器的值,應在 Goroutine 的最後執行。 -
Wait()
func (wg *WaitGroup) Wait()
用於阻塞 Goroutine 知道 WaitGroup 的計數器的值為 0。
7.2 互斥鎖 sync.Mutex
互斥鎖的定義如下:
type Mutex struct {
state int32
sema uint32
}
Mutex 可以作為其它結構體的欄位,一般用於保護那些包含了 Mutex 屬性的自定義結構體。像 int、float64、string 等內建型別都不包含 Mutex 屬性,用互斥鎖來保護其實是不合理的。
type mytype struct {
m sync.Mutex
var int
}
x := new(mytype)
像上述程式碼一樣,將任意型別與一個互斥鎖封裝成一個新結構體型別,這樣當 m 上鎖後,var 也是一併“上鎖”的。這樣的用法是推薦的,也是用來保護任意型別的好辦法。
Mutex 中就倆方法:
-
Lock()
原型是func (m *Mutex) Lock()
通過 Lock() 來鎖住 m,如果 m 已加鎖,則阻塞直到 m 解鎖。 -
Unlock()
func (m *Mutex) Unlock()
Unlock() 用於解鎖 m,如果 m 未加鎖則會導致執行時錯誤。
互斥鎖不與 Goroutine 繫結,也就是說,Lock() 和 Unlock() 可以發生在不同的 Goroutine 中。
實踐中,Mutex 多用於寫多讀少的情況。
7.3 讀寫互斥鎖 sync.RWMutex
讀寫互斥鎖的定義:
type RWMutex struct {
w Mutex // held if there are pending writers
writeSem unit32 // 寫鎖需要等待讀鎖釋放的訊號量
readerSem uint32 // 讀鎖需要等待寫鎖釋放的訊號量
readerCount int32 // 讀鎖計數器
readerWait int32 // 獲取寫鎖時需要等待的讀鎖釋放數量
}
讀寫互斥鎖簡稱讀寫鎖 (RWMutex),分為讀鎖和讀寫鎖。讀寫鎖在使用過程中,讀與讀不互斥,讀與寫互斥,寫與寫互斥。因為 RWMutex 內部由 Mutex 實現,所以讀寫鎖也是與 Goroutine 無關,而且 RWMutex 也可以作為其他結構體的欄位。
當讀鎖上鎖時,所有 Goroutine 都可以進行讀取操作而不能寫入。當讀寫上鎖時,同一時刻只能存在一個 Goroutine 進行讀寫操作,其他協程沒有讀和寫的許可權。
RWMutex 中有 5 個方法:
-
Lock()
func (rw *RWMutex) Lock()
,將 rw 鎖定為寫入狀態,禁止其他協程讀取和寫入。 -
Unlock()
func (rw *RWMutex) Unlock()
,解除 rw 的寫入鎖,如果 rw 未加寫入鎖則會遇到執行時錯誤。 -
RLock()
func (rw *RWMutex) RLock()
,將 rw 鎖定為讀取狀態,禁止其他協程寫入,但不禁止讀取 -
RUnlock()
func (rw *RWMutex) RUnlock()
,接觸 rw 的讀取鎖,如果 rw 未加讀取鎖則會遇到執行時錯誤。 -
RLocker()
func (rw *RWMutex) RLocker()
,返回一個讀寫鎖,通過呼叫 rw.RLock() 和 rw.RUnlock() 實現了 Locker 介面。
下面給出一個使用到了 WaitGroup 和 RWMutex 的一個示例程式碼,該程式模擬售票大廳的多視窗並行查詢餘票和搶票功能。
package main
import (
"fmt"
"sync"
"time"
)
type ticketS struct {
m sync.RWMutex
num int
}
func BuyTicket(wg *sync.WaitGroup, lck *ticketS, window int) {
defer (*wg).Done()
//(*wg).Add(1)
for {
(*lck).m.Lock() // other goroutines cant read or write
if (*lck).num > 0 {
(*lck).num--
fmt.Printf("[%d]: just got one ticket\n", window)
} else {
fmt.Printf("[%d]: sold out\n", window)
(*lck).m.Unlock()
break
}
(*lck).m.Unlock()
time.Sleep(70 * time.Millisecond) // 設定該視窗 0.07 秒後才接受下一個查餘票請求
}
}
func QueryTicket(wg *sync.WaitGroup, lck *ticketS, window int) {
defer (*wg).Done()
for {
(*lck).m.RLock()
fmt.Printf("[%d]: There's %d tickets left\n", window, (*lck).num)
(*lck).m.RUnlock()
if (*lck).num <= 0 {
break
} else {
time.Sleep(60 * time.Millisecond) // 設定該視窗 0.06 秒後才接受下一個查餘票請求
}
}
}
func main() {
defer fmt.Println("main done")
var wg sync.WaitGroup
var ticket ticketS
ticket.num = 55 // 一共 55 張票
wg.Add(6) // 3 個協程用於查餘票,3 個協程用於搶票
go QueryTicket(&wg, &ticket, 1)
go QueryTicket(&wg, &ticket, 2)
go QueryTicket(&wg, &ticket, 3)
go BuyTicket(&wg, &ticket, 1)
go BuyTicket(&wg, &ticket, 2)
go BuyTicket(&wg, &ticket, 3)
wg.Wait()
}
控制檯可能的一種輸出為:
[3]: just got one ticket
[1]: There's 54 tickets left
[3]: There's 54 tickets left
此處省略很多行...
[2]: There's 1 tickets left
[1]: just got one ticket
[3]: sold out
[1]: There's 0 tickets left
[3]: There's 0 tickets left
[2]: sold out
[2]: There's 0 tickets left
[1]: sold out
main done
RWMutex 適用於讀取量大的情況,因為 RWMutex 多個讀可以並存。
碼字不易,轉載需註明出處:https://www.cnblogs.com/hhzm/p
本文在撰寫時難免會有錯誤和不足之處,歡迎在評論區裡留言討論,一起進步。