Golang 入門 : channel(通道)
筆者在《Golang 入門 : 競爭條件》一文中介紹了 Golang 併發程式設計中需要面對的競爭條件。本文我們就介紹如何使用 Golang 提供的 channel(通道) 消除競爭條件。
Channel 是 Golang 在語言級別提供的 goroutine 之間的通訊方式,可以使用 channel 在兩個或多個 goroutine 之間傳遞訊息。Channel 是程序內的通訊方式,因此通過 channel 傳遞物件的過程和呼叫函式時的引數傳遞行為比較一致,比如也可以傳遞指標等。使用通道傳送和接收所需的共享資源,可以在 goroutine 之間消除競爭條件。
當一個資源需要在 goroutine 之間共享時,channel 在 goroutine 之間架起了一個管道,並提供了確保同步交換資料的機制。Channel 是型別相關的,也就是說,一個 channel 只能傳遞一種型別的值,這個型別需要在宣告 channel 時指定。可以通過 channel 共享內建型別、命名型別、結構型別和引用型別的值或者指標。
基本語法
宣告 channel 的語法格式為:
var ChannelName chan ElementType
與一般變數宣告的不同之處僅僅是在型別前面添加了一個 chan 關鍵字。ElementType 則指明這個 channel 能夠傳遞的資料的型別。比如宣告一個傳遞 int 型別的 channel:
var ch chan int
或者是宣告一個 map,其元素是 bool 型的 channel:
var m map[string] chan bool
在 Golang 中需要使用內建的 make 函式類建立 channel 的例項:
ch := make(chan int)
這樣就宣告並初始化了一個名為 ch 的 int 型 channel。使用 channel 傳送和接收資料的語法也很直觀,比如下面的程式碼把資料傳送到 channel 中:
ch <- value
向 channel 中寫入資料通常會導致程式阻塞,直到有其它 goroutine 從這個 channel 中讀取資料。下面的程式碼把資料從 channel 讀取到變數中:
value := <-ch
注意,如果 channel 中沒有資料,那麼從 channel 中讀取資料也會導致程式阻塞,直到 channel 中被寫入資料為止。
根據 channel 是否有緩衝區可以簡單地把 channel 分為無緩衝區的 channel 和帶緩衝區的 channel,在本文接下來的篇幅中會詳細的介紹這兩類 channel 的用法。
select
Linux 系統中的 select 函式用來監控一系列的檔案控制代碼,一旦其中一個檔案控制代碼發生了 I/O 動作,select 函式就會返回。該函式主要被用來實現高併發的 socket 伺服器程式。Golang 中的 select 關鍵字和 linux 中的 select 函式功能有點相似,它主要用於處理非同步 I/O 問題。
select 的語法與 switch 的語法非常相似,由 select 開始一個新的選擇塊,每個選擇條件有 case 語句來描述。與 switch 語句可以選擇任何可使用相等比較的條件相比,select 有比較多的限制,其中最大的一條限制就是每個 case 語句裡必須是一個 I/O 操作。其大致的結構如下:
select { case <-chan1: // 如果 chan1 成功讀取到資料,則執行該 case 語句 case chan2 <- 1: // 如果成功向 chan2 寫入資料,則執行該 case 語句 default: // 如果上面的條件都沒有成功,則執行 default 流程 }
可以看出,select 不像 switch,後面並沒有條件判斷,而是直接去檢視 case 語句。每個 case 語句都必須是一個面向 channel 的操作。比如上面的例子中,第一個 case 試圖從 chan1 讀取一個數據並直接忽略讀取到的資料,而第二個 case 則試圖向 chan2 中寫入一個整數 1,如果這兩者都沒有成功,則執行 default 語句。
無緩衝的 channel
無緩衝的 channel(unbuffered channel) 是指在接收前沒有能力儲存任何值的 channel。這種型別的 channel 要求傳送 goroutine 和接收 goroutine 同時準備好,才能完成傳送和接收操作。如果兩個 goroutine 沒有同時準備好,channel 會導致先執行傳送或接收操作的 goroutine 阻塞等待。這種對通道進行傳送和接收的互動行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。我們可以通過下面的圖示形象地理解兩個 goroutine 如何利用無緩衝的 channel 來共享一個值(下圖來自網際網路):
下面詳細地解釋一下上圖:
- 在第 1 步,兩個 goroutine 都到達通道,但兩個都沒有開始執行資料的傳送或接收。
- 在第 2 步,左側的 goroutine 將它的手伸進了通道,這模擬了向通道傳送資料的行為。這時,這個 goroutine 會在通道中被鎖住,直到交換完成。
- 在第 3 步,右側的 goroutine 將它的手放入通道,這模擬了從通道里接收資料。這個 goroutine一樣也會在通道中被鎖住,直到交換完成。
- 在第 4 步和第 5 步,進行資料交換。
- 在第 6 步,兩個 goroutine 都將它們的手從通道里拿出來,這模擬了被鎖住的 goroutine 得到釋放。兩個 goroutine 現在都可以去做別的事情了。
下面的例子模擬一場網球比賽。在網球比賽中,兩位選手會把球在兩個人之間來回傳遞。選手總是處在以下兩種狀態之一:要麼在等待接球,要麼將球打向對方。可以使用兩個goroutine來模擬網球比賽,並使用無緩衝的通道來模擬球的來回:
// 這個示例程式展示如何用無緩衝的通道來模擬 //2個goroutine間的網球比賽 package main import( "math/rand" "sync" "time" "fmt" ) // wg用來等待程式結束 var wg sync.WaitGroup func init() { rand.Seed(time.Now().UnixNano()) } // main是所有Go程式的入口 func main() { // 建立一個無緩衝的通道 court := make(chan int) // 計數加2,表示要等待兩個goroutine wg.Add(2) // 啟動兩個選手 go player("Nick", court) go player("Jack", court) // 發球 court <- 1 // 等待遊戲結束 wg.Wait() } // player 模擬一個選手在打網球 func player(name string, court chan int) { // 在函式退出時呼叫Done來通知main函式工作已經完成 defer wg.Done() for{ // 等待球被擊打過來 ball, ok := <-court if !ok { // 如果通道被關閉,我們就贏了 fmt.Printf("Player %s Won\n", name) return } // 選隨機數,然後用這個數來判斷我們是否丟球 n := rand.Intn(100) if n%5 == 0 { fmt.Printf("Player %s Missed\n", name) // 關閉通道,表示我們輸了 close(court) return } // 顯示擊球數,並將擊球數加1 fmt.Printf("Player %s Hit %d\n", name, ball) ball++ // 將球打向對手 court <- ball } }
執行上面的程式碼,會輸出類似下面的資訊:
Player Jack Hit 1 Player Nick Hit 2 Player Jack Hit 3 Player Nick Hit 4 Player Jack Missed Player Nick Won
簡單解釋一下上面的程式碼:
在 main 函式中建立了一個 int 型別的無緩衝的通道,使用該通道讓兩個 goroutine 在擊球時能夠互相同步。然後建立了參與比賽的兩個 goroutine。在這個時候,兩個 goroutine 都阻塞住等待擊球。court <- 1 模擬發球,將球發到通道里,程式開始執行這個比賽,直到某個 goroutine 輸掉比賽。
在 player 函式裡,主要是執行一個無限迴圈的 for 語句。在這個迴圈裡,是玩遊戲的過程。goroutine 從通道接收資料,用來表示等待接球。這個接收動作會鎖住 goroutine,直到有資料傳送到通道里。通道的接收動作返回時,會檢測 ok 標誌是否為 false。如果這個值是 false,表示通道已經被關閉,遊戲結束。在這個模擬程式中,使用隨機數來決定 goroutine 是否擊中了球。如果擊中了球,就把 ball 的值遞增 1,並將 ball 作為球重新放入通道,傳送給另一位選手。在這個時刻,兩個 goroutine 都會被鎖住,直到交換完成。最終,引某個 goroutine 沒有打中球會把通道關閉。之後兩個 goroutine 都會返回,通過 defer 宣告的 Done 會被執行,程式終止。
帶緩衝的 channel
帶緩衝的 channel(buffered channel) 是一種在被接收前能儲存一個或者多個值的通道。這種型別的通道並不強制要求 goroutine 之間必須同時完成傳送和接收。通道會阻塞傳送和接收動作的條件也會不同。只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,傳送動作才會阻塞。這導致有緩衝的通道和無緩衝的通道之間的一個很大的不同:無緩衝的通道保證進行傳送和接收的 goroutine 會在同一時間進行資料交換;有緩衝的通道沒有這種保證。可以通過下面的圖示形象地理解兩個 goroutine 分別向帶緩衝的通道里增加一個值和從帶緩衝的通道里移除一個值(下圖來自網際網路):
下面詳細地解釋一下上圖:
- 在第 1 步,右側的 goroutine 正在從通道接收一個值。
- 在第 2 步,右側的這個 goroutine 獨立完成了接收值的動作,而左側的 goroutine 正在傳送一個新值到通道里。
- 在第 3 步,左側的 goroutine 還在向通道傳送新值,而右側的 goroutine 正在從通道接收另外一個值。這個步驟裡的兩個操作既不是同步的,也不會互相阻塞。
- 最後,在第 4 步,所有的傳送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。
建立帶緩衝區的 channel 非常簡單,只需要再新增一個緩衝區的大小就可以了,比如建立一個傳遞 int 型別資料,緩衝區為 10 的 channel:
ch := make(chan int, 10)
下面的 demo 使用一組 goroutine 來接收並完成任務,帶緩衝區的通道提供了一種清晰而直觀的方式來實現這個功能:
// 這個示例程式展示如何使用 // 有緩衝的通道和固定數目的 // goroutine來處理一堆工作 package main import( "math/rand" "sync" "time" "fmt" ) const( numberGoroutines = 2 // 要使用的goroutine的數量 taskLoad = 5 // 要處理的工作的數量 ) // wg用來等待程式結束 var wg sync.WaitGroup func init() { rand.Seed(time.Now().UnixNano()) } // main是所有Go程式的入口 func main() { // 建立一個有緩衝的通道來管理工作 tasks := make(chan string, taskLoad) // 啟動goroutine來處理工作 wg.Add(numberGoroutines) for gr := 1; gr <= numberGoroutines; gr++ { go worker(tasks, gr) } // 增加一組要完成的工作 for post := 1; post <= taskLoad; post++ { tasks <- fmt.Sprintf("Task: %d", post) } // 當所有工作都處理完時關閉通道 // 以便所有goroutine退出 close(tasks) // 等待所有工作完成 wg.Wait() } // worker作為goroutine啟動來處理 // 從有緩衝的通道傳入的工作 func worker(tasks chan string, worker int) { // 通知函式已經返回 defer wg.Done() for{ // 等待分配工作 task, ok := <-tasks if !ok{ // 這意味著通道已經空了,並且已被關閉 fmt.Printf("Worker: %d: Shutting Down\n", worker) return } // 顯示我們開始工作了 fmt.Printf("Worker: %d: Started %s\n", worker, task) // 隨機等一段時間來模擬工作 sleep := rand.Int63n(100) time.Sleep(time.Duration(sleep)* time.Millisecond) // 顯示我們完成了工作 fmt.Printf("Worker: %d: Completed %s\n", worker, task) } }
執行上面的程式,輸出結果大致如下:
Worker: 2: Started Task: 1 Worker: 1: Started Task: 2 Worker: 1: Completed Task: 2 Worker: 1: Started Task: 3 Worker: 1: Completed Task: 3 Worker: 1: Started Task: 4 Worker: 2: Completed Task: 1 Worker: 2: Started Task: 5 Worker: 1: Completed Task: 4 Worker: 1: Shutting Down Worker: 2: Completed Task: 5 Worker: 2: Shutting Down
程式碼裡有很詳細的註釋,因此不再贅言,只解釋一下通道的關閉:
關閉通道的程式碼非常重要。當通道關閉後,goroutine 依舊可以從通道接收資料,但是不能再向通道里傳送資料。能夠從已經關閉的通道接收資料這一點非常重要,因為這允許通道關閉後依舊能取出其中緩衝的全部值,而不會有資料丟失。從一個已經關閉且沒有資料的通道里獲取資料,總會立刻返回,並返回一個通道型別的零值。如果在獲取通道時還加入了可選的標誌,就能得到通道的狀態資訊。
處理超時
使用 channel 時需要小心,比如對於下面的簡單用法:
i := <-ch
碰到永遠沒有往 ch 中寫入資料的情況,那麼這個讀取動作將永遠也無法從 ch 中讀取到資料,導致的結果就是整個 goroutine 永遠阻塞並且沒有挽回的機會。如果 channel 只是被同一個開發者使用,那樣出問題的可能性還低一些。但如果一旦對外公開,就必須考慮到最差情況並對程式進行維護。
Golang 沒有提供直接的超時處理機制,但可以利用 select 機制變通地解決。因為 select 的特點是隻要其中一個 case 已經完成,程式就會繼續往下執行,而不會考慮其它的 case。基於此特性我們來實現一個 channel 的超時機制:
ch := make(chan int) // 首先實現並執行一個匿名的超時等待函式 timeout := make(chan bool, 1) go func() { time.Sleep(1e9) // 等待 1 秒 timeout <- true }() // 然後把 timeout 這個 channel 利用起來 select { case <-ch: // 從 ch 中讀取到資料 case <- timeout: // 一直沒有從 ch 中讀取到資料,但從 timeout 中讀取到了資料 fmt.Println("Timeout occurred.") }
執行上面的程式碼,輸出的結果為:
Timeout occurred.
關閉 channel
關閉 channel 非常簡單,直接呼叫 Golang 內建的 close() 函式就可以了:
close(ch)
在關閉了 channel 之後我們要面對的問題是:如何判斷一個 channel 是否已關閉?
其實在從 channel 中讀取資料的同時,還可以獲得一個布林型別的值,該值表示 channel 是否已關閉:
x, ok := <-ch
如果 ok 的值為 false,則表示 ch 已經被關閉。
參考:
《Go語言實戰》
《Go語言程式設計入門與實戰技