1. 程式人生 > >由淺入深剖析 go channel

由淺入深剖析 go channel

channel 是 golang 中最核心的 feature 之一,因此理解 Channel 的原理對於學習和使用 golang 非常重要。

channel 是 goroutine 之間通訊的一種方式,可以類比成 Unix 中的程序的通訊方式管道。

CSP 模型

在講 channel 之前,有必要先提一下 CSP 模型,傳統的併發模型主要分為 Actor 模型和 CSP 模型,CSP 模型全稱為 communicating sequential processes,CSP 模型由併發執行實體(程序,執行緒或協程),和訊息通道組成,實體之間通過訊息通道傳送訊息進行通訊。和 Actor 模型不同,CSP 模型關注的是訊息傳送的載體,即通道,而不是傳送訊息的執行實體。關於 CSP 模型的更進一步的介紹,有興趣的同學可以閱讀論文 Communicating Sequential Processes,Go 語言的併發模型參考了 CSP 理論,其中執行實體對應的是 goroutine, 訊息通道對應的就是 channel。

channel 介紹

channel 提供了一種通訊機制,通過它,一個 goroutine 可以想另一 goroutine 傳送訊息。channel 本身還需關聯了一個型別,也就是 channel 可以傳送資料的型別。
例如: 傳送 int 型別訊息的 channel 寫作 chan int 。

channel 建立

channel 使用內建的 make 函式建立,下面聲明瞭一個 chan int 型別的 channel:

ch := make(chan int)

c和 map 類似,make 建立了一個底層資料結構的引用,當賦值或引數傳遞時,只是拷貝了一個 channel 引用,指向相同的 channel 物件。
和其他引用型別一樣,channel 的空值為 nil 。
使用 == 可以對型別相同的 channel 進行比較,只有指向相同物件或同為 nil 時,才返回 true。

channel 的讀寫操作

ch := make(chan int)
// write to channel
ch <- x
// read from channel
x <- ch
// another way to read
x = <- ch

channel 一定要初始化後才能進行讀寫操作,否則會永久阻塞。

關閉 channel

golang 提供了內建的 close 函式對 channel 進行關閉操作。

ch := make(chan int)
close(ch)

有關 channel 的關閉,你需要注意以下事項:
關閉一個未初始化(nil) 的 channel 會產生 panic
重複關閉同一個 channel 會產生 panic
向一個已關閉的 channel 中傳送訊息會產生 panic
從已關閉的 channel 讀取訊息不會產生 panic,且能讀出 channel 中還未被讀取的訊息,若訊息均已讀出,則會讀到型別的零值。從一個已關閉的 channel 中讀取訊息永遠不會阻塞,並且會返回一個為 false 的 ok-idiom,可以用它來判斷 channel 是否關閉
關閉 channel 會產生一個廣播機制,所有向 channel 讀取訊息的 goroutine 都會收到訊息

ch := make(chan int, 10)
ch <- 11
ch <- 12
close(ch)
for x := range ch {
    fmt.Println(x)
}
x, ok := <- ch
fmt.Println(x, ok)


-----
output:

11
12
0 false

channel 的型別

channel 分為不帶快取的 channel 和帶快取的 channel。

無快取的 channel

從無快取的 channel 中讀取訊息會阻塞,直到有 goroutine 向該 channel 中傳送訊息;同理,向無快取的 channel 中傳送訊息也會阻塞,直到有 goroutine 從 channel 中讀取訊息。
通過無快取的 channel 進行通訊時,接收者收到資料 happens before 傳送者 goroutine 喚醒。

package main 
import (
    "fmt"
    "time"
)
func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
        fmt.Println("Send:", i)
    }
}
func consumer(ch <-chan int) {
    for i := 0; i < 10; i++ {
        v := <-ch
        fmt.Println("Receive:", v)
    }
}
 
/**
// 因為channel沒有緩衝區,所以當生產者給channel賦值後,
// 生產者執行緒會阻塞,直到消費者執行緒將資料從channel中取出
// 消費者第一次將資料取出後,進行下一次迴圈時,消費者的執行緒
// 也會阻塞,因為生產者還沒有將資料存入,這時程式會去執行
// 生產者的執行緒。程式就這樣在消費者和生產者兩個執行緒間不斷切換,直到迴圈結束。
 */
func main2() {
    ch := make(chan int)
    go producer(ch)
    go consumer(ch)
    time.Sleep(1 * time.Second)
}

有快取的 channel

有快取的 channel 的宣告方式為指定 make 函式的第二個引數,該引數為 channel 快取的容量

ch := make(chan int, 10)
package main 
import (
    "fmt"
    "time"
)
func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
        fmt.Println("Send:", i)
    }
}
func consumer(ch <-chan int) {
    for i := 0; i < 10; i++ {
        v := <-ch
        fmt.Println("Receive:", v)
    }
}

/**
在這個程式中,緩衝區可以儲存10個int型別的整數,
在執行生產者執行緒的時候,執行緒就不會阻塞,
一次性將10個整數存入channel,在讀取的時候,也是一次性讀取。
 */
func main() {
    ch := make(chan int, 10)
    go producer(ch)
    go consumer(ch)
    time.Sleep(1 * time.Second)
}
 

有快取的 channel 類似一個阻塞佇列(採用環形陣列實現)。
當快取未滿時,向 channel 中傳送訊息時不會阻塞,當快取滿時,傳送操作將被阻塞,直到有其他 goroutine 從中讀取訊息;
相應的,當 channel 中訊息不為空時,讀取訊息不會出現阻塞,當 channel 為空時,讀取操作會造成阻塞,直到有 goroutine 向 channel 中寫入訊息。

ch := make(chan int, 3)

// blocked, read from empty buffered channel
<- ch
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

// blocked, send to full buffered channel
ch <- 4

通過 len 函式可以獲得 chan 中的元素個數,通過 cap 函式可以得到 channel 的快取長度。

channel 的用法

1.goroutine 通訊

看一個 effective go 中的例子:

c := make(chan int)  // Allocate a channel.

// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()

doSomethingForAWhile()
<-c
主 goroutine 會阻塞,直到執行 sort 的 goroutine 完成。

2.range 遍歷

channel 也可以使用 range 取值,並且會一直從 channel 中讀取資料,直到有 goroutine 對改 channel 執行 close 操作,迴圈才會結束。

// consumer worker
ch := make(chan int, 10)
for x := range ch{
    fmt.Println(x)
}
等價於

for {
    x, ok := <- ch
    if !ok {
        break
    }
    
    fmt.Println(x)
}

3.配合 select 使用

select 用法類似與 IO 多路複用,可以同時監聽多個 channel 的訊息狀態,看下面的例子

select {
    case <- ch1:
    ...
    case <- ch2:
    ...
    case ch3 <- 10;
    ...
    default:
    ...
}

select 可以同時監聽多個 channel 的寫入或讀取
執行 select 時,若只有一個 case 通過(不阻塞),則執行這個 case 塊
若有多個 case 通過,則隨機挑選一個 case 執行
若所有 case 均阻塞,且定義了 default 模組,則執行 default 模組。若未定義 default 模組,則 select 語句阻塞,直到有 case 被喚醒。
使用 break 會跳出 select 塊。

4. 設定超時時間

ch := make(chan struct{})

// finish task while send msg to ch
go doTask(ch)

timeout := time.After(5 * time.Second)
select {
    case <- ch:
        fmt.Println("task finished.")
    case <- timeout:
        fmt.Println("task timeout.")
}

5. quite channel

有一些場景中,一些 worker goroutine 需要一直迴圈處理資訊,直到收到 quit 訊號

msgCh := make(chan struct{})
quitCh := make(chan struct{})
for {
    select {
    case <- msgCh:
        doWork()
    case <- quitCh:
        finish()
        return
}

6.單向 channel

即只可寫入或只可讀的channel,事實上 channel 只讀或只寫都沒有意義,所謂的單向 channel 其實知識宣告時用,比如

func foo(ch chan<- int) <-chan int {...}

chan<- int 表示一個只可寫入的 channel,<-chan int 表示一個只可讀取的 channel。
上面這個函式約定了 foo 內只能從向 ch 中寫入資料,返回只一個只能讀取的 channel,雖然使用普通的 channel 也沒有問題,但這樣在方法宣告時約定可以防止 channel 被濫用,這種預防機制發生再編譯期間。