Go基礎程式設計:併發程式設計—channel
goroutine執行在相同的地址空間,因此訪問共享記憶體必須做好同步。goroutine 奉行通過通訊來共享記憶體,而不是共享記憶體來通訊。
引⽤型別 channel 是 CSP 模式的具體實現,用於多個 goroutine 通訊。其內部實現了同步,確保併發安全。
1 channel型別
和map類似,channel也一個對應make建立的底層資料結構的引用。
當我們複製一個channel或用於函式引數傳遞時,我們只是拷貝了一個channel引用,因此呼叫者何被呼叫者將引用同一個channel物件。和其它的引用型別一樣,channel的零值也是nil。
定義一個channel時,也需要定義傳送到channel的值的型別。channel可以使用內建的make()函式來建立:
make(chan Type) //等價於make(chan Type, 0)
make(chan Type, capacity)
當 capacity= 0 時,channel 是無緩衝阻塞讀寫的,當capacity> 0 時,channel 有緩衝、是非阻塞的,直到寫滿 capacity個元素才阻塞寫入。
channel通過操作符<-來接收和傳送資料,傳送和接收資料語法:
channel <- value //傳送value到channel
<-channel //接收並將其丟棄
x := <-channel //從channel中接收資料,並賦值給x
x, ok := <-channel //功能同上,同時檢查通道是否已關閉或者是否為空
預設情況下,channel接收和傳送資料都是阻塞的,除非另一端已經準備好,這樣就使得goroutine同步變的更加的簡單,而不需要顯式的lock。
示例程式碼:
func main() {
c := make(chan int)
go func() {
defer fmt.Println("子協程結束")
fmt.Println("子協程正在執行……")
c <- 666 //666傳送到c
}()
num := <-c //從c中接收資料,並賦值給num
fmt.Println("num = ", num)
fmt.Println("main協程結束")
}
程式執行結果:
2 無緩衝的channel
無緩衝的通道(unbuffered channel)是指在接收前沒有能力儲存任何值的通道。
這種型別的通道要求傳送 goroutine 和接收 goroutine 同時準備好,才能完成傳送和接收操作。如果兩個goroutine沒有同時準備好,通道會導致先執行傳送或接收操作的 goroutine 阻塞等待。
這種對通道進行傳送和接收的互動行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。
下圖展示兩個 goroutine 如何利用無緩衝的通道來共享一個值:
- 在第 1 步,兩個 goroutine 都到達通道,但哪個都沒有開始執行傳送或者接收。
- 在第 2 步,左側的 goroutine 將它的手伸進了通道,這模擬了向通道傳送資料的行為。這時,這個 goroutine 會在通道中被鎖住,直到交換完成。
- 在第 3 步,右側的 goroutine 將它的手放入通道,這模擬了從通道里接收資料。這個 goroutine 一樣也會在通道中被鎖住,直到交換完成。
- 在第 4 步和第 5 步,進行交換,並最終,在第 6 步,兩個 goroutine 都將它們的手從通道里拿出來,這模擬了被鎖住的 goroutine 得到釋放。兩個 goroutine 現在都可以去做別的事情了。
無緩衝的channel建立格式:
make(chan Type) //等價於make(chan Type, 0)
如果沒有指定緩衝區容量,那麼該通道就是同步的,因此會阻塞到傳送者準備好傳送和接收者準備好接收。
示例程式碼:
func main() {
c := make(chan int, 0) //無緩衝的通道
//內建函式 len 返回未被讀取的緩衝元素數量, cap 返回緩衝區大小
fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))
go func() {
defer fmt.Println("子協程結束")
for i := 0; i < 3; i++ {
c <- i
fmt.Printf("子協程正在執行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))
}
}()
time.Sleep(2 * time.Second) //延時2s
for i := 0; i < 3; i++ {
num := <-c //從c中接收資料,並賦值給num
fmt.Println("num = ", num)
}
fmt.Println("main協程結束")
}
程式執行結果:
3 有緩衝的channel
有緩衝的通道(buffered channel)是一種在被接收前能儲存一個或者多個值的通道。
這種型別的通道並不強制要求 goroutine 之間必須同時完成傳送和接收。通道會阻塞傳送和接收動作的條件也會不同。只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,傳送動作才會阻塞。
這導致有緩衝的通道和無緩衝的通道之間的一個很大的不同:無緩衝的通道保證進行傳送和接收的 goroutine 會在同一時間進行資料交換;有緩衝的通道沒有這種保證。
示例圖如下:
- 在第 1 步,右側的 goroutine 正在從通道接收一個值。
- 在第 2 步,右側的這個 goroutine獨立完成了接收值的動作,而左側的 goroutine 正在傳送一個新值到通道里。
- 在第 3 步,左側的goroutine 還在向通道傳送新值,而右側的 goroutine 正在從通道接收另外一個值。這個步驟裡的兩個操作既不是同步的,也不會互相阻塞。
- 最後,在第 4 步,所有的傳送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。
有緩衝的channel建立格式:
make(chan Type, capacity)
如果給定了一個緩衝區容量,通道就是非同步的。只要緩衝區有未使用空間用於傳送資料,或還包含可以接收的資料,那麼其通訊就會無阻塞地進行。
示例程式碼:
func main() {
c := make(chan int, 3) //帶緩衝的通道
//內建函式 len 返回未被讀取的緩衝元素數量, cap 返回緩衝區大小
fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))
go func() {
defer fmt.Println("子協程結束")
for i := 0; i < 3; i++ {
c <- i
fmt.Printf("子協程正在執行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))
}
}()
time.Sleep(2 * time.Second) //延時2s
for i := 0; i < 3; i++ {
num := <-c //從c中接收資料,並賦值給num
fmt.Println("num = ", num)
}
fmt.Println("main協程結束")
}
程式執行結果:
4 range和close
如果傳送者知道,沒有更多的值需要傳送到channel的話,那麼讓接收者也能及時知道沒有多餘的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內建的close函式來關閉channel實現。
示例程式碼:
func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
//把 close(c) 註釋掉,程式會一直阻塞在 if data, ok := <-c; ok 那一行
close(c)
}()
for {
//ok為true說明channel沒有關閉,為false說明管道已經關閉
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("Finished")
}
程式執行結果:
注意點:
- channel不像檔案一樣需要經常去關閉,只有當你確實沒有任何傳送資料了,或者你想顯式的結束range迴圈之類的,才去關閉channel;
- 關閉channel後,無法向channel 再發送資料(引發 panic 錯誤後導致接收立即返回零值);
- 關閉channel後,可以繼續向channel接收資料;
- 對於nil channel,無論收發都會被阻塞。
可以使用 range 來迭代不斷操作channel:
func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
//把 close(c) 註釋掉,程式會一直阻塞在 for data := range c 那一行
close(c)
}()
for data := range c {
fmt.Println(data)
}
fmt.Println("Finished")
}
5 單方向的channel
預設情況下,通道是雙向的,也就是,既可以往裡面傳送資料也可以同裡面接收資料。
但是,我們經常見一個通道作為引數進行傳遞而值希望對方是單向使用的,要麼只讓它傳送資料,要麼只讓它接收資料,這時候我們可以指定通道的方向。
單向channel變數的宣告非常簡單,如下:
var ch1 chan int // ch1是一個正常的channel,不是單向的
var ch2 chan<- float64 // ch2是單向channel,只用於寫float64資料
var ch3 <-chan int // ch3是單向channel,只用於讀取int資料
- chan<- 表示資料進入管道,要把資料寫進管道,對於呼叫者就是輸出。
- <-chan 表示資料從管道出來,對於呼叫者就是得到管道的資料,當然就是輸入。
可以將 channel 隱式轉換為單向佇列,只收或只發,不能將單向 channel 轉換為普通 channel:
c := make(chan int, 3)
var send chan<- int = c // send-only
var recv <-chan int = c // receive-only
send <- 1
//<-send //invalid operation: <-send (receive from send-only type chan<- int)
<-recv
//recv <- 2 //invalid operation: recv <- 2 (send to receive-only type <-chan int)
//不能將單向 channel 轉換為普通 channel
d1 := (chan int)(send) //cannot convert send (type chan<- int) to type chan int
d2 := (chan int)(recv) //cannot convert recv (type <-chan int) to type chan int
示例程式碼:
// chan<- //只寫
func counter(out chan<- int) {
defer close(out)
for i := 0; i < 5; i++ {
out <- i //如果對方不讀 會阻塞
}
}
// <-chan //只讀
func printer(in <-chan int) {
for num := range in {
fmt.Println(num)
}
}
func main() {
c := make(chan int) // chan //讀寫
go counter(c) //生產者
printer(c) //消費者
fmt.Println("done")
}
6 定時器
6.1 Timer
Timer是一個定時器,代表未來的一個單一事件,你可以告訴timer你要等待多長時間,它提供一個channel,在將來的那個時間那個channel提供了一個時間值。
示例程式碼:
import "fmt"
import "time"
func main() {
//建立定時器,2秒後,定時器就會向自己的C位元組傳送一個time.Time型別的元素值
timer1 := time.NewTimer(time.Second * 2)
t1 := time.Now() //當前時間
fmt.Printf("t1: %v\n", t1)
t2 := <-timer1.C
fmt.Printf("t2: %v\n", t2)
//如果只是想單純的等待的話,可以使用 time.Sleep 來實現
timer2 := time.NewTimer(time.Second * 2)
<-timer2.C
fmt.Println("2s後")
time.Sleep(time.Second * 2)
fmt.Println("再一次2s後")
<-time.After(time.Second * 2)
fmt.Println("再再一次2s後")
timer3 := time.NewTimer(time.Second)
go func() {
<-timer3.C
fmt.Println("Timer 3 expired")
}()
stop := timer3.Stop() //停止定時器
if stop {
fmt.Println("Timer 3 stopped")
}
fmt.Println("before")
timer4 := time.NewTimer(time.Second * 5) //原來設定3s
timer4.Reset(time.Second * 1) //重新設定時間
<-timer4.C
fmt.Println("after")
}
6.2 Ticker
Ticker是一個定時觸發的計時器,它會以一個間隔(interval)往channel傳送一個事件(當前時間),而channel的接收者可以以固定的時間間隔從channel中讀取事件。
示例程式碼:
func main() {
//建立定時器,每隔1秒後,定時器就會給channel傳送一個事件(當前時間)
ticker := time.NewTicker(time.Second * 1)
i := 0
go func() {
for { //迴圈
<-ticker.C
i++
fmt.Println("i = ", i)
if i == 5 {
ticker.Stop() //停止定時器
}
}
}() //別忘了()
//死迴圈,特地不讓main goroutine結束
for {
}
}