Go併發程式設計--通過channel實現生產者消費者模型
概述
生產者消費者模型是多執行緒設計的經典模型,該模型被廣泛的應用到各個系統的多執行緒/程序模型設計中。本文介紹了Go語言中channel的特性,並通過Go語言實現了兩個生產者消費者模型。
channel的一些特性
在Go中channel是非常重要的協程通訊的手段,channel是雙向的通道,通過channel可以實現協程間資料的傳遞,通過channel也可以實現協程間的同步(後面會有介紹)。本文介紹的生產者消費者模型主要用到了channel的以下特性:任意時刻只能有一個協程能夠對channel中某一個item進行訪問。
單生產者單消費者模型
把生產者和消費者都放到一個無線迴圈中,這個和我們的伺服器端的任務處理非常相似。生產者不斷的向channel中放入資料,而消費者不斷的從channel中取出資料,並對資料進行處理(列印)。由於生產者的協程不會退出,所以channel的寫入會永久存在,這樣當channel中沒有放入資料時,消費者端將會阻塞,等待生產者端放入資料。
程式碼的實現如下:
package main
import (
"fmt"
"time"
)
var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan int = make(chan int)
func sum(a int, b int) {
ch1 <- a + b
}
// write data to channel
func writer(max int) {
for {
for i := 0; i < max; i++ { // 簡單的向channel中放入一個整數
bufChan <- i
time.Sleep(1 * time.Millisecond) //控制放入的頻率
}
}
}
// read data fro m channel
func reader(max int) {
for {
r := <-bufChan
fmt.Printf("read value: %d\n", r)
}
// 通知主執行緒,工作結束了,這一步可以省略
msgChan <- 1
}
func testWriterAndReader(max int ) {
go writer(max)
go reader(max)
// writer 和reader的任務結束了,主執行緒會得到通知
res := <-msgChan
fmt.Printf("task is done: value=%d\n", res)
}
func main() {
testWriterAndReader(100)
}
多生產者消費者模型
我們可以利用channel在某個時間點只能有一個協程能夠訪問其中的某一個數據,的特性來實現生產者消費者模型。由於channel具有這樣的特性,我們在放資料和消費資料時可以不需要加鎖。
package main
import (
"time"
"fmt"
"os"
)
var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan string = make(chan string)
func sum(a int, b int) {
ch1 <- a + b
}
// write data to channel
func writer(max int) {
for {
for i := 0; i < max; i++ {
bufChan <- i
fmt.Fprintf(os.Stderr, "%v write: %d\n", os.Getpid(), i)
time.Sleep(10 * time.Millisecond)
}
}
}
// read data fro m channel
func reader(name string) {
for {
r := <-bufChan
fmt.Printf("%s read value: %d\n", name, r)
}
msgChan <- name
}
func testWriterAndReader(max int) {
// 開啟多個writer的goroutine,不斷地向channel中寫入資料
go writer(max)
go writer(max)
// 開啟多個reader的goroutine,不斷的從channel中讀取資料,並處理資料
go reader("read1")
go reader("read2")
go reader("read3")
// 獲取三個reader的任務完成狀態
name1 := <-msgChan
name2 := <-msgChan
name3 := <-msgChan
fmt.Println("%s,%s,%s: All is done!!", name1, name2, name3)
}
func main() {
testWriterAndReader(100)
}
輸出如下:
read3 read value: 0
80731 write: 0
80731 write: 0
read1 read value: 0
80731 write: 1
read2 read value: 1
80731 write: 1
read3 read value: 1
80731 write: 2
read2 read value: 2
80731 write: 2
... ...
總結
本文通過channel實現了經典的生產者和消費者模型,利用了channel的特性。但要注意,當消費者的速度小於生產者時,channel就有可能產生擁塞,導致佔用記憶體增加,所以,在實際場景中需要考慮channel的緩衝區的大小。設定了channel的大小,當生產的資料大於channel的容量時,生產者將會阻塞,這些問題都是要在實際場景中需要考慮的。
一個解決辦法就是使用一個固定的陣列或切片作為環形緩衝區,而非channel,通過Sync包的機制來進行同步,實現生產者消費者模型,這樣可以避免由於channel滿而導致消費者端阻塞。但,對於環形緩衝區而言,可能會覆蓋老的資料,同樣需要考慮具體的使用場景。關於環形緩衝區的原理和實現,在分析Sync包的使用時再進一步分析。