Go入門系列(十四) go併發程式設計之Goroutine與channel(上)
- Go入門系列(一) 初識Go語言
- Go入門系列(二) 變數、指標、資料型別簡介和作用域
- Go入門系列(三) 基礎型別——整型、浮點型、布林型別和字串
- Go入門系列(四) 陣列Array和切片Slice
- Go入門系列(五) 雜湊表map
- Go入門系列(六) 結構體struct
- Go入門系列(七) json、時間操作和template模板
- Go入門系列(八) 函式(上)
- Go入門系列(九) 函式 - defer機制(下)
- Go入門系列(十) go中的面向物件程式設計——方法
- Go入門系列(十一) 介面——介面的定義、作用、具體型別和具體值(上)
- Go入門系列(十二) 介面——介面的應用和例子(中)
- Go入門系列(十三) 介面——型別斷言(下)
- Go入門系列(十四) go併發程式設計之Goroutine與channel(上)
- Go入門系列(十五) go併發程式設計之Goroutine與channel(中)
- Go入門系列(十六) go併發程式設計之Goroutine與channel(下)
- Go入門系列(十七) go併發之基於共享變數的併發
- Go入門系列(十八) 反射、包和測試工具
- Go入門系列(十九)io程式設計和檔案操作
現在終於講到了Go中最核心也最重要的部分:Goroutine併發程式設計。
Go的併發有兩種主要方式:基於CSP模型的併發(通過channel實現) 和 基於共享變數的併發。Go的主要併發方式是前者,當然後者也是不可或缺的處理併發的傳統同步機制。本章重點介紹CSP模型的併發。
Goroutine
在Go語言中,每一個併發的執行單元叫作一個goroutine(go中的協程)。我們可以把一個goroutine理解為是一個執行緒(但是他和執行緒是不同的)。一個goroutine通常作用於一個函式,我們可以通過在函式呼叫前加上go關鍵字的方式將一個函式放在一個新建立的goroutine協程上執行,這樣的話這個函式無需等待且會相對於與其他函式併發的執行。
主函式main是在一個單獨的goroutine上執行的,我們叫它main goroutine。
例如:
func main(){ go demo() // 會建立一個goroutine協程,並且讓demo函式在這個新的協程上執行。該行程式碼會立刻返回,不會阻塞。 fmt.Println("主協程執行") } func demo() { fmt.Println("子協程執行") time.Sleep(1*time.Second) fmt.Println("子協程執行結束") }
和多執行緒程式設計相似。當go demo()的時候,系統會開啟一個goroutine協程,使用了go關鍵字的語句會立刻返回,不會阻塞無需等待,之後主協程maingoroutine和在main()中建立的子協程會同時併發執行。但是當main主協程與執行結束時,所有的子協程都會終止並結束。因此上面的列印結果為:
主協程執行
子協程執行
最後一句 fmt.Println("子協程執行結束") 沒來得及執行就已經終止。
除了從主函式退出或者直接終止程式之外,沒有其它的程式設計方法能夠讓一個goroutine來打斷另一個的執行。除非是通過協程與協程之間的通訊,讓一個協程通知另一個協程終止。
==============================================================
網路程式設計是併發大顯身手的一個領域,由於伺服器是最典型的需要同時處理很多連線的程式。
下面通過兩個網路程式設計例子來演示goroutine的更多使用。
第一個例子是一個clock伺服器,客戶端訪問這個服務的時候會每隔一秒列印一次時間。
// 服務端程式碼:
func main(){
// 建立一個tcp服務,這1句就包含了建立套接字,監聽埠等操作,很方便
server, err := net.Listen("tcp", "127.0.0.1:8000")
if err != nil {
log.Fatal(err) // 建立服務失敗則終止程式
}
fmt.Println("Clock服務開啟")
for{ // 死迴圈
c, err := server.Accept() // 接收客戶端連線,這是一個阻塞的接收方法。c是接收得到的客戶端連線
if err != nil { // 有客戶端發過來建立連線的請求了,但是服務端接收連線失敗
log.Print(err)
}
// 為客戶端提供相應的服務
go handleConn(c) // 使用go的話就意味著由主協程負責接收連線,由子協程負責處理每個連線的請求。而且每連進來一個連線都要建立一個goroutine協程來為其提供服務(一個連線對應一個協程)。
}
}
// 處理連線,為客戶端提供服務
func handleConn(c net.Conn) (err error) { // c是客戶端連線(客戶端的socket套接字)
defer func(){
c.Close()
fmt.Println("客戶端關閉連線")
}()
for {
// 獲取當前的時間
now := time.Now().Format("2006-01-02 15:04:05")
_, err = io.WriteString(c, now + "\n") // 向客戶端傳送 now
if err != nil { // 如果傳送失敗,說明客戶端已經關閉連線(但是服務端連線還沒關閉)
return err // 此時會執行上面 defer 中的關閉服務端連線的操作。
}
time.Sleep(1*time.Second)
}
}
// 客戶端程式碼
func main(){
// 連線服務端
conn, err := net.Dial("tcp", "127.0.0.1:8000")
if err != nil{
log.Fatal(err)
}
defer func(){
conn.Close()
fmt.Println("服務端停止服務或客戶端關閉連線")
}()
// 接收來自服務端的響應資訊(由於這是一個時鐘服務,所以客戶端無需向服務端傳送訊息,服務端也無需接收客戶端的訊息。只需客戶端接收服務端訊息即可)
for {
_, err := io.Copy(os.Stdout, conn) // io.Copy方法是一個阻塞方法,有2個引數:第一參是可寫物件,第二參是可讀物件,Copy的作用是將一個讀事件就緒的可讀物件的內容拷貝到一個可寫物件中。這裡是將服務端傳送到客戶端緩衝區的資料拷貝到標準輸出
if err != nil{ // 可能是客戶端主動斷開連線
break
}
}
}
這裡需要注意的是:在使用io.Copy的時候,我們不用寫for死迴圈來接收服務端不斷傳輸過來的資料。因為io.Copy本身內部實現了for死迴圈,會自己不斷的接收訊息。所以上面客戶端的for是多餘的。
第二個例子是一個Encho服務,這個服務也很簡單,客戶端可以往服務端傳送一個訊息,服務端需要回復這個訊息3遍,客戶端接收回復並列印。
這裡我複用了上面第一個例子的程式碼框架,略微作出修改使得 handleConn 能夠接收一個函式引用來決定它提供clock服務還是encho服務。
type serviceFunc func (c net.Conn, args ...interface{})(err error)
func main(){
// 建立一個tcp服務,這1句就包含了建立套接字,監聽埠等操作,很方便
server, err := net.Listen("tcp", "127.0.0.1:8000")
if err != nil {
log.Fatal(err) // 建立服務失敗則終止程式
}
fmt.Println("服務開啟")
for{ // 死迴圈
c, err := server.Accept() // 接收客戶端連線,這是一個阻塞的接收方法。c是接收得到的客戶端連線
if err != nil { // 有客戶端發過來建立連線的請求了,但是服務端接收連線失敗
log.Print(err)
}
// 為客戶端提供相應的服務
go handleConn(c, encho) // 使用go的話就意味著由主協程負責接收連線,由子協程負責處理每個連線的請求。而且每連進來一個連線都要建立一個goroutine協程來為其提供服務(一個連線對應一個協程)。
}
}
// 處理連線,為客戶端提供服務
func handleConn(c net.Conn, fn serviceFunc) (err error) { // c是客戶端連線(客戶端的socket套接字)
defer func(){
c.Close()
fmt.Println("客戶端關閉連線")
}()
err = fn(c) // 呼叫服務函式; fn的返回值只會為error型別。
return err
}
func clock(c net.Conn, args ...interface{}) (err error){
for {
// 獲取當前的時間
now := time.Now().Format("2006-01-02 15:04:05")
_, err = io.WriteString(c, now+"\n") // 向客戶端傳送 now
if err != nil { // 如果傳送失敗,說明客戶端已經關閉連線(但是服務端連線還沒關閉)
return err // 此時會執行上面 defer 中的關閉服務端連線的操作。
}
time.Sleep(1 * time.Second)
}
}
func encho(c net.Conn, args ...interface{}) (err error){
scanner := bufio.NewScanner(c) // NewScanner需要傳入一個可讀物件(io.Reader),返回一個Scanner物件,這是一個帶io緩衝區的掃描器物件,該物件提供從緩衝區讀取資料的方法如Scan()
for scanner.Scan(){ // 不斷從緩衝區讀取資料(接收客戶端訊息),這是個阻塞的方法。
_encho(c, scanner)
}
return err
}
func _encho(c net.Conn, scanner *bufio.Scanner) {
cont := scanner.Text() // 單純的從scanner物件的token變數中獲取資料,讀資料的過程(等待的過程)其實是發生在Scan。Text()返回的是一個String
fmt.Fprintln(c, "\t" + strings.ToUpper(cont)) // 向c(客戶端連線)傳送一個訊息
time.Sleep(1 * time.Second) // 每次回覆隔一秒
fmt.Fprintln(c, "\t" + cont)
time.Sleep(1 * time.Second)
fmt.Fprintln(c, "\t" + strings.ToLower(cont))
}
這個例子中,我聲明瞭一個底層型別為函式 serviceFunc 型別,所有的服務函式(如encho和clock)都要遵循這個型別的規範,需要傳入一個客戶端連線c 和可以接受任意型別任意多個引數的其他引數。
客戶端程式碼如下
package main
import (
"io"
"log"
"net"
"os"
)
// 該方法是對 io.Copy 的簡單封裝,這樣的話這個方法既能用於從服務端接收訊息並輸出到客戶端,又能用於把客戶端的訊息傳送給服務端
// r是訊息傳送端,w是訊息接收端,mustCopy的意思是從一個可讀物件r讀取資料,再發送到可寫物件w中
// 該方法 內部會阻塞,會無限迴圈(io.Copy內部實現了死迴圈)接收Reader的訊息
func mustCopy(w io.Writer, r io.Reader){
if _, err := io.Copy(w, r); err != nil {
log.Fatal(err)
}
}
func main() {
// 連線服務端
conn, err := net.Dial("tcp", "127.0.0.1:8000")
if err != nil{
log.Fatal()
}
defer conn.Close()
// 開一個子協程用於接收服務端的訊息
go mustCopy(os.Stdout, conn) // 將服務端發過來的訊息傳送到客戶端的標準輸出(螢幕上)
// 主協程則向服務端傳送訊息
mustCopy(conn, os.Stdin) // 將客戶端標準輸入拷貝(傳送)到服務端
}
這裡要注意,客戶端程式碼開了兩個goroutine,一個用來向伺服器傳送訊息(maingoroutine中發生),一個用來接收服務端訊息並列印到螢幕(子協程中)。這樣做的好處是:如果接收訊息和傳送訊息都放在一個goroutine中,那麼任何其中一個操作都可能阻塞另一個操作,造成類似於“服務端返回上一次訊息的響應之前客戶端無法傳送下一條訊息”的情況。
上面的程式還有一個小缺陷:雖然伺服器還是對一個連線建立一個goroutine進行服務避免了多使用者連線造成的服務端阻塞問題。但是某個客戶端如果連續發出2條訊息給服務端的話,那麼在這個 goroutine內服務端必須回覆完第1條訊息(花3秒)才能開始回覆第二條訊息,因此會發生這樣的情況:
hello # 客戶端傳送
HELLO # 服務端回覆
Hi # 客戶端傳送
hello
hello
HI
hi
hi
服務端必須回覆完所有的Hello才能回覆hi。
但我希望的是服務端回覆Hello的時候也能抽出空回覆hi。
hello
HELLO
hi
HI
hello
hi
hello
hi
其實很簡單,我們只需要讓服務端針對客戶端每傳送過來的1條訊息建立一個goroutine來處理即可。這意味著多個_encho()之間可以併發執行。
只需對程式作出微小的改動(紅色部分):
func encho(c net.Conn, args ...interface{}) (err error){
scanner := bufio.NewScanner(c)
for scanner.Scan(){
go _encho(c, scanner)
}
return err
}
再做一個小改進,讓客戶端輸出end之後服務端斷開連線:
func encho(c net.Conn, args ...interface{}) (err error){
scanner := bufio.NewScanner(c) // NewScanner需要傳入一個可讀物件(io.Reader),返回一個Scanner物件,這是一個帶io緩衝區的掃描器物件,該物件提供從緩衝區讀取資料的方法如Scan()
for scanner.Scan(){ // 不斷從緩衝區讀取資料,這是個阻塞的方法。返回一個false
cont := scanner.Text()
if cont == "end"{ // 如果客戶端輸入回車表示停止服務
break
}
go _encho(c, cont)
}
return err
}
最後還要說一點:
如果一個程式有多個goroutine在執行,其中任何一個goroutine發生panic異常都會終止所有的goroutine,然後程式結束。
Channels
Goroutine是go中最常見的併發執行體,而Channel是多個goroutine之間最常見的通訊機制。它類似於一個通道,用來傳輸資料,而且傳輸的形式是一個物件的形式而不是流的形式。
Channel有具體的型別。用來傳輸字串的就是字串型別的channel( chanstring),用來傳輸整型的就是整型的channel(chan int)。
1.如何建立一個channel
通過make()
ch := make(chan int) // ch has type 'chan int'
ch = make(chan int) // 建立無快取的channel
ch = make(chan int, 0) // 建立無快取的channel
ch = make(chan int, 3) // 建立一個容量為3的帶快取channel
channel和切片、雜湊表一樣是一種引用型別,它對應著底層的一種資料結構。
當我們把channel賦給一個變數的時候或者作為引數傳給函式時,是僅僅複製多一份這個引用,而不會複製底層型別。Channel的零值是nil。
兩個相同型別的channel可以使用==運算子比較。如果兩個channel引用的是相同的物件,那麼比較的結果為真。一個channel也可以和nil進行比較。
一個channel有傳送和接受兩個主要操作,都是通訊行為。例如:
ch <- x // 傳送x值(相當於put()或者append()操作)
x = <-ch // 從通道接受一個值並賦給x(相當於get()或者acquire()操作)
<-ch // 這樣也可以,相當於彈出一個值但不賦給任何變數
2.關閉一個channel
當不再需要從一個channel接收資料,也不再需要從一個channel傳送資料的時候,就需要關閉這個channel。雖然如果不關閉,go的垃圾回收機制也會回收不再使用的channel,但是我們還是要遵循建立關閉原則手動的關閉一下,否則可能會發送一些不可預料的問題。
對一個未關閉的channel傳送訊息:
A如果goroutine1對一個不帶快取的channel發訊息則會阻塞,直到有另一個goroutine2接收訊息,goroutine1才會被喚醒;
B如果goroutine1對一個帶快取的channel(channel未滿)發訊息,則不會阻塞,訊息會被暫存在channel內部的一個佇列中;如果滿了就阻塞。
對一個已經關閉的channel傳送訊息會導致panic異常
對一個未關閉的channel接收訊息:
A如果goroutine1從一個不帶快取的channel接收訊息則會阻塞,直到有另一個goroutine2向這個channel發訊息,goroutine1才會被喚醒然後接收到訊息。
B如果goroutine1從一個帶快取的channel(channel有訊息的話)接收訊息,則不會阻塞;如果channel為空就會阻塞。
對一個已經關閉的channel接收訊息(對帶不帶快取的channel都相同):
A如果channel中還有訊息,則可以不阻塞的接收到這些訊息;
B如果channel中沒有訊息,則會不阻塞的不停的接收到相應型別的零值,不會引發panic;
使用內建的close函式就可以關閉一個channel:
close(ch)
需要注意的是:
channel的傳送操作會複製一份傳送的資料,在資料複製完之前,接收方的接收操作會被阻塞直到複製完成才開始接收。所以,往channel中傳送的資料最好是不要太大,如果要傳送一個結構體則儘量傳送其指標。這樣既可以減少拷貝的資料量,又節省了接收方等待發送方拷貝的時間,提高channel的傳輸效能(不過這樣的話,接收方修改這個指標也會影響到傳送方的資料)。
對於一個無快取的channel,由於不存在快取,所以傳送方傳送資料的時候,資料只會被複制1次,直接複製給接收方;
對於一個有快取的channel,如果接收方執行接收操作的時候,這個channel是空的,那麼當傳送方傳送資料的時候,資料會繞過channel的緩衝佇列,直接傳送給接收方,因此這種情況下就只會複製1次; 如果接收方接收到的資料是從緩衝佇列得到的(即接收方接收的時候channel中有資料的情況下),則資料總共會被複制2次,一次是傳送方將資料寫入channel的緩衝佇列的拷貝,一次是接收方接收資料時從緩衝佇列到接收方的拷貝。也就是說,當消費者的接收操作快於傳送者的傳送操作時,就只會複製1次,反之則會複製2次。
下面將分別介紹帶快取和不帶快取的channel
關於無快取或帶快取channels之間的選擇,首先兩者都能夠進行資料在goroutine之間的傳輸以達到通訊的作用。而不同點在於(適用場景的不同點):無快取channel更強地保證了每個傳送操作與相應的同步接收操作(更強調協程之間的同步和安排多個goroutine的有序執行),這是基於其無快取而帶來阻塞的特性,通過阻塞可以暫停協程的工作,讓另一個協程完成了任務之後才開始或繼續本協程的工作;帶快取channel更適用於相互解耦操作的goroutine,意味著一個函式中有多個不同型別卻相互依賴的小任務時,我們可以將多個不同型別的小任務解耦到多個函式中,用goroutine併發的跑這些任務,而此時相互依賴的任務之間可以通過帶快取channel來傳輸訊息,最典型的應用就是生產者消費者模型的任務。
下面讓我們看看帶快取和不帶快取的特性。
不帶快取的Channel
一個基於無快取Channels的傳送操作將導致傳送者goroutine阻塞,直到另一個goroutine在相同的Channels上執行接收操作,當傳送的值通過Channels成功傳輸之後,兩個goroutine可以繼續執行後面的語句。反之,如果接收操作先發生,那麼接收者goroutine也將阻塞,直到有另一個goroutine在相同的Channels上執行傳送操作。
如果傳送方通過一個無快取channel傳送資料時被阻塞,接收方會先接收資料(接收方執行<-channel的時候才開始拷貝資料),接收完了之後才去喚醒傳送方而不是先喚醒後接收。
如果接收方接收一個無快取channel被阻塞,傳送方會先發送資料(傳送方執行channel<-時才開始拷貝資料),傳送完了才去喚醒接收方而不是先喚醒後傳送。
也就是說喚醒動作都是在拷貝完成之後發生的。
下面作者給出了一個例子來介紹不帶快取的channel的用法:
下面有3個協程,counter用於不停的生成數字,squarer負責對counter生成的數字計算其平方,printer負責列印squarer的計算結果。3個協程使用2個通道naturals和squares進行通訊。
func main(){
naturals := make(chan int)
squares := make(chan int)
// counter
go func (){
for i:=0;;i++ {
time.Sleep(3 * time.Second / 10) // 限一下迴圈的速度
naturals<-i
}
}()
// squarer
go func (){
for { // 不斷接收counter傳送的i
i := <-naturals
squares <- i * i
}
}()
// printer
go func (){
for{ // 不斷接收squarer的結果
fmt.Println(<-squares)
}
}()
}
這個程式有一個致命的缺點:由於用go宣告執行的函式是非阻塞的,因此main goroutine會立刻結束,連帶著3個子goroutine也馬上結束,根本沒來得及執行。
在python的多執行緒併發程式設計中,主執行緒一般會呼叫子執行緒的join()方法等待所有子執行緒執行完之後才結束。但是在go中貌似沒有這種類似的等待方法。不過我們可以通過channel來阻塞。
我只需要在main的開始多宣告一個block的channel,這個channel的作用只有1個,就是一直阻塞main goroutine,讓主協程永遠都不會結束。
func main(){
naturals := make(chan int)
squares := make(chan int)
block := make(chan struct{})
// counter
go func ()go func (){//...}()
// squarer
go func ()go func (){//...}()
// printer
go func (){//...}()
// 往block傳送一個值,由於沒有任何協程接收這個值,因此這裡會永久阻塞
block<- struct{}{}
}
除此之外我們還可以這樣做,將printer的工作直接放在主協程中做,而無需建立一個子協程來列印平方值,這樣<-squares就會阻塞主協程而無需用block這個channel來阻塞
func main(){
naturals := make(chan int)
squares := make(chan int)
// counter
go func ()go func (){//...}()
// squarer
go func ()go func (){//...}()
// printer
for{ // 不斷接收squarer的結果
fmt.Println(<-squares)
}
}
counter向naturals這個通道發出訊息後squarer從naturals接收訊息前的這段時間(雖然這段時間很短很短,短的可以忽略),counter是被阻塞的,squarer和printer之間同理。也就是說,雖然使用了3個子goroutine,但是整個程式還是一個序列的程式,因為這個例子中一個協程的下一步操作的開始要依賴於另一個協程的某個操作完成。
現在我們要考慮更加多的細節問題,如果我希望計算有限多個(比如100個)i的值而不是無限個,此時我們要考慮資料傳輸完之後channel要怎麼被處理。
// counter
go func (){
for i:=0;i <= 100;i++ { // 讓生產者只生產100個i
time.Sleep(3 * time.Second / 10) // 限一下速度
naturals<-i
}
}()
當i全部被髮送出去之後,counter跳出迴圈結束goroutine,而printer和squarer這2個協程由於接收不到訊息而被阻塞,而本身maingoroutine也是被block這個channel阻塞的。因此,程式中所有的goroutine都被阻塞住了,而且是永遠的被阻塞住了,永遠都不會被喚醒。
這種情況被稱之為 goroutine死鎖,go是不會允許這種情況出現的,因此會報一個錯誤:
fatal error: all goroutines are asleep - deadlock!
(如果所有gouroutine都被阻塞了,但是之後可能被喚醒的話,就不會報goroutine死鎖的錯誤,例如,所有的goroutine由於都呼叫了sleep暫時都被阻塞住)。
但是其實像python等語言中,發生所有執行緒或協程被阻塞也是常有的事,這些語言在這種情況下並不會報錯。
那怎麼解決這個問題?我們只需要在counter傳送完所有的i時給printer傳送一個訊號,讓printer不要再從naturals中取資訊即可。最直接的方式就是關閉naturals,squarer就會在讀取naturals時讀到一堆int的零值。
當一個channel被關閉後,再向該channel傳送資料將導致panic異常
當一個channel被關閉後,如果channel中的資料還沒被接收完(帶快取的channel)依然可以從該channel接收資料。如果channel沒有資料了,再從裡面接收資料可以不阻塞的接收到一個零值。
但是當goroutine從channel中讀到一個零值的話,它無法確定這個channel是關閉了還是傳送者確實傳送了一個對應型別的零值給它。此時我們可以用第二個結果接收,他是一個bool。ture表示成功從channels接收到值,false表示channels已經被關閉並且裡面沒有值可接收。(如 i, ok := <-naturals)
我們甚至可以使用forrange來接收,當channel被關閉並且沒有值可接收時會自動跳出迴圈。(如果channel沒有被關閉,但是channel也沒有值可以接收時,for range不會結束,而是會阻塞)。
忘記說很重要的一點:
channel的關閉一般都是在傳送訊息的goroutine,而不是在接收訊息的goroutine。原因很簡單,如果在接收訊息的goroutine關閉channel,傳送訊息的goroutine繼續往channel放資料會引發panic從而結束整個程序。
改進後的程式碼如下:
package main
import (
"fmt"
"time"
)
func main(){
naturals := make(chan int)
squares := make(chan int)
block := make(chan struct{})
// counter
go func (){
for i:=0;i <= 10;i++ {
time.Sleep(3 * time.Second / 10) // 限一下速度
naturals<-i
}
close(naturals)
}()
// squarer
go func (){
for i := range naturals{ // 不斷接收counter傳送的i
squares <- i * i
}
close(squares)
}()
// printer
go func (){
for res := range squares{ // 不斷接收squarer的結果
fmt.Println(res)
}
block <- struct{}{}
}()
<-block
}
不管一個channel是否被關閉,當它沒有被引用時將會被Go語言的垃圾自動回收器回收。(不要將關閉一個開啟檔案的操作和關閉一個channel操作相提並論。對於每個開啟的檔案,都需要在不使用的時候呼叫對應的Close方法來關閉檔案。)
試圖重複關閉一個channel將導致panic異常,試圖關閉一個nil值的channel也將導致panic異常。關閉一個channels還會觸發一個廣播機制,這個我們在後面再說.
帶快取的channel
帶快取的channel內部持有一個佇列
向快取Channel的傳送操作就是向內部快取佇列的尾部插入元素,接收操作則是從佇列的頭部彈出元素。如果內部快取佇列是滿的,那麼傳送操作將阻塞直到因另一個goroutine執行接收操作而釋放了新的佇列空間。反之亦然。
可以使用內建的cap和len函式獲取一個帶快取的channel的容量和長度。
下面我們看一個比較典型的例子:
這個程式會向3個映象站點發出請求,三個映象站點分散在不同的地理位置。這個程式的作用是返回響應最快的映象的響應給使用者。
func main() {
response := make(chan string, 3)
// 同時向3個站點發起請求,request是一個阻塞的方法,需要等待服務的響應
go func () { response <- request("mirror1")}()
go func () { response <- request("mirror2")}()
go func () { response <- request("mirror3")}()
// main goroutine則接受最快的響應,其他響應不接收
fmt.Println(<-response)
}
func request(mirror string) string{
if mirror == "mirror1"{
time.Sleep(5 * time.Second / 10) // 通過sleep模擬請求和響應在網路中傳輸的時間和伺服器處理請求的時間
}else if mirror == "mirror2"{
time.Sleep(3 * time.Second / 10)
}else if mirror == "mirror3"{
time.Sleep(7 * time.Second / 10)
}
return "response from " + mirror
}
注意,如果上面換成使用無快取的channel,那麼兩個慢的goroutines將會因為沒有人接收而被永遠卡住。這種情況,稱為goroutines洩漏,這將是一個BUG。和垃圾變數不同,洩漏的goroutines並不會被自動回收(因為這兩個goroutine永遠都不會執行結束,所以永遠都不會被回收,除非main goroutine結束,整個程式結束)。因此確保每個不再需要的goroutine能正常退出是重要的(也就是說不再使用的goroutine要確保不被channel給卡死而無法退出)。
Channel的一種錯誤用法是在一個goroutine中對同一個channel既傳送又接收(除非是特殊場景,例如使用channel作為訊號量限制最大的併發goroutine數量)。這麼一來channel的通訊作用就失去意義,單純的變為一個切片,而且還有發生死鎖的風險。
只發送的channel和只接收的channel
作者在這一節取的標題是“單方向的channel”,但我個人覺得這樣是有歧義的,因為單方向是指從一個通道的首端入尾端出,而不能從尾端入首端出。而channel本身確實遵循這個原則,也就是說channel本身就是一個單向的通道而不是雙向的。
作者所說的“單向的channel”其實是想表達只能傳送的channel或只能接收的channel,但是這應該叫做“半開閉的channel”,即通道的一端是開放的一端是封閉的,因此我這裡把標題做了修改以避免歧義。
在很多情況下,一個goroutine只負責對一個channel傳送或只接收,而不會既對一個同一個channel又傳送又接收。
因此我們可以通過宣告的時候限制一個channel是隻傳送或只接收的。
型別chan<- int表示一個只發送int的channel,只能傳送不能接收。相反,型別<-chan int表示一個只接收int的channel,只能接收不能傳送。這種限制將在編譯期檢測。
對一個只接收的channel呼叫close將是一個編譯錯誤(因為從邏輯上說,一個只接收資訊的goroutine無法知道channel什麼時候會發送完畢訊息,這是由傳送端的goroutine決定的,所以接收者goroutine和只接收的channel應該無權呼叫close)。
現在,我們對上面的counter/squarer/printer例子進行最後的改進,將channel在特定的goroutine中變為半開閉的channel:
func main(){
naturals := make(chan int)
squares := make(chan int)
// 已知資料都是從channel的右邊進左邊出(單向)
go counter(naturals) // naturals在counter是右開左閉的
go squarer(squares, naturals) // naturals在squarer中是左開右閉,squares在squarer是右開左閉
printer(squares) // printer直接在main goroutine中跑,而不另開協程
}
// counter
func counter(in chan<-int){
for i:=0;i <= 10;i++ {
time.Sleep(3 * time.Second / 10) // 限一下速度
in<-i
}
close(in)
}
// squarer
func squarer(in chan<-int, out <-chan int){
for i := range out{ // 不斷接收counter傳送的i
in <- i * i
}
close(in)
}
// printer
func printer(in <-chan int){
for res := range in{ // 不斷接收squarer的結果
fmt.Println(res)
}
}
這裡涉及到 channel型別的轉換。我們在main中宣告naturals的時候是宣告的一個右開右閉的雙開口channel。但是當將它傳給counter的時候,函式會做一次 in:= naturals的隱式賦值,而in又是 chan<-int 型別的單開口channel,所以在counter函式中naturals的型別將隱式地從chan int轉換成chan<- int。
雙開口channel向單開口channel變數的賦值操作都將導致該隱式轉換。雙開口的channel可以轉為單開口,但是反之單開口的channel不能轉雙開口的。
本文轉載自: 張柏沛IT技術部落格 > Go入門系列(十四) go併發程式設計之Goroutine與channel(上)