Go併發程式設計
阿新 • • 發佈:2021-07-08
Go併發程式設計
目錄本文是作者學習Go併發程式設計的筆記總結,主要內容是Go併發程式設計的示例程式碼,下面是與本文相關的連結:
本文中包含的程式碼段的完整程式碼可以去作者的 Github下載
說明:
在Go併發程式設計的學習過程中,除了具體如何使用Go實現併發程式設計外,還包括程序、執行緒、協程、生產者消費者模型、互斥量、鎖、條件變數等,下文並不會詳細說明這些概念,如果有想要詳細瞭解這些內容,可以去看Unix系統程式設計和Unix網路程式設計這兩本書。
Go程
Go在語言級別支援協程,叫goroutine。Go語言標準庫提供的所有系統呼叫操作(包括所有同步IO操作),都會出讓CPU給其他goroutine。這讓輕量級執行緒的切換管理不依賴於系統的執行緒和程序,也不需要依賴於CPU的核心數量。
Go程的建立與使用
建立時只需要使用關鍵字 go
func sing() { for i := 0; i < 5; i++ { fmt.Println("Sing something...") time.Sleep(100 * time.Millisecond) } } func dance() { for i := 0; i < 5; i++ { fmt.Println("Someone dancing...") time.Sleep(100 * time.Millisecond) } } func main() { fmt.Println("順序執行") sing() dance() // 併發執行 fmt.Println("併發執行") go sing() go dance() for { ; } }
Go程使用的相關函式說明:
Gosched()、GOMAXPROCS()
// runtime.Gosched() 出讓當前go程所佔用的cpu時間片
// runtime.Goexit() 結束呼叫該函式的當前go程
func main() {
go func() {
for i := 0; i < 10; i++ {
fmt.Println("this is a goroutine test")
//time.Sleep(100 * time.Microsecond)
}
}()
for {
runtime.Gosched()
fmt.Println("this is a main test")
//time.Sleep(100 * time.Microsecond)
}
}
func main() {
// 建立一個子go程
go func () {
for i := 0; i < 5; i++ {
fmt.Println("-----I am Goroutine-----")
time.Sleep(time.Second)
}
}()
// 主Goroutine
for i := 0; i < 5; i++ {
fmt.Println("-----I am main-----")
time.Sleep(time.Second)
if i == 2 {
break
}
}
// 主Goroutine退出 子go程也會退出
/*
-----I am main-----
-----I am Goroutine-----
-----I am Goroutine-----
-----I am main-----
-----I am main-----
-----I am Goroutine-----
-----I am Goroutine-----
*/
// runtime.Gosched() 出讓當前go程所佔用的cpu時間片
}
// runtime.GOMAXPROCS(n) 設定當前程序使用的最大cpu核心數 返回上一次呼叫成功的設定值 首次呼叫返回預設值
func main() {
n := runtime.GOMAXPROCS(2)// 將CPU設為雙核
fmt.Println(n)
for {
go fmt.Print(0)// 子go程
fmt.Print(1)// 主go程
}
}
Channel
channel是一種資料型別(管道),主要用於解決go程同步問題以及協程之間資料共享的問題。
特點:一端寫一端讀
Channel的定義與使用
/*
make(chan 在channel中傳遞的資料型別, 容量)
容量為0表示無緩衝
容量大於0表示有緩衝
*/
// 全域性定義channel 完成資料同步
var channel = make(chan int)
func printer(s string) {
for _, ch := range s {
fmt.Printf("%c", ch)
time.Sleep(300 * time.Millisecond)
}
}
// 先執行
func person1() {
printer("Hello")
channel <- 1// 向channel寫資料 如果寫的資料沒有被讀走 channel阻塞
}
// 後執行
func person2() {
<- channel// 從channel讀
printer("World")
}
func main() {
go person1()
go person2()
// 輸出WHeorllldo person1 person2 交替使用標準輸出 導致輸出結果亂序
for {
;
}
}
Channel同步傳遞資料
func main() {
ch := make(chan string)
// len() 得到channel中剩餘未讀取資料個數
// cap() 得到通道容量
//fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
go func () {
for i := 0; i < 5; i++ {
fmt.Println("i = ", i)
}
// 通知主go列印完畢
ch <- "Completed..."
fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
}()
//fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
str := <- ch
//fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
fmt.Println("主go", str)
}
無緩衝Channel和有緩衝Channel
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
fmt.Println("子go程寫, i =", i)
ch <- i
}
}()
//time.Sleep(time.Second * 2)
for i := 0; i < 5; i++ {
num := <- ch
fmt.Println("主go程讀,i = ", num)
}
}
func main() {
ch := make(chan int, 3)// 存滿3個元素之前不會阻塞
fmt.Println("len =", len(ch), "cap =", cap(ch))
go func() {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("子go程:", i)
fmt.Println("len =", len(ch), "cap =", cap(ch))
}
}()
time.Sleep(time.Second * 3)
for i := 0; i < 5; i++ {
num := <- ch
fmt.Println("主go程:", num)
fmt.Println("len =", len(ch), "cap =", cap(ch))
}
}
Channel的關閉
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 8; i++ {
ch <- i
}
close(ch)// 寫端寫完資料 主動關閉channel
}()
for {
// 檢測對端是否關閉
if num, ok := <- ch; ok == true {// ok == true, 讀到資料
fmt.Println("Read num =", num)
} else {// channel已經關閉
break
}
}
// 或者換種寫法
// for num := range ch {}
}
/*
資料未傳送完不應該關閉channel
無緩衝channel 無法向已經關閉的channel中寫資料 但是還可以讀
*/
單向Channel的使用
有時在函式中,我們只需要從channel中讀取資料或者寫入資料,這時我們可以使用單向channel
/*
func main() {
// 雙向channel 預設
ch := make(chan int)
var sendCh chan <- int// 單向寫channel
// 可以將雙向channel轉換為單向channel 但是反之不行
sendCh = ch
sendCh <- 754
// 出錯 單向寫channel不能讀
//num := <- sendCh
var recvCh <- chan int = ch// 單向讀channel
num := <- recvCh
fmt.Println(num)
// 反向賦值 出錯
//var ch2 chan int = sendCh
}
*/
func send(out chan <- int) {
out <- 88
close(out)
}
func recv(in <- chan int) {
n := <- in
fmt.Println("Recv num =", n)
}
func main() {
ch := make(chan int)// 雙向channel
go func(){
send(ch)// 雙向channel轉為寫channel
}()
recv(ch)
}
使用Channel實現生產者消費者模型
生產者:傳送端
消費者:接收端
緩衝區作用:
解耦(降低生產者與消費者之間的耦合度)
併發(生產者與消費者數量不對等時 能保持正常通訊)
快取(生產者與消費者資料處理速度不一致時 暫存資料)
func producer(out chan <- int) {
for i := 0; i < 10; i++ {
fmt.Println("producer send", i * i)
out <- i * i
}
close(out)
}
func consumer(in <- chan int) {
for num := range in {
fmt.Println("consumer recv", num)
}
}
func main() {
// 無緩衝channel實現生產者消費者
//ch := make(chan int)
// 有緩衝
ch := make(chan int, 5)
go producer(ch)// 子go程作為生產者
consumer(ch)
}
定時器
建立定時器
func main() {
fmt.Println("Now time:", time.Now())
// 建立定時器
myTimer := time.NewTimer(time.Second * 2)
nowTime := <- myTimer.C// 當系統向定時器中的channel寫完後 再從中讀
fmt.Println("Now time:", nowTime)
}
/*
time.Timer
定時器,由channel實現,當設定的時間到達時,系統會向定時器中的channel寫
type Timer struct {
C <- chan Time
r runtimeTimer
}
*/
定時的3種方式
// 定時器的停止和重置
func main() {
myTimer := time.NewTimer(time.Second * 3)
myTimer.Reset(1 * time.Second)// 重置定時器
go func() {
<- myTimer.C
fmt.Println("子go程讀取定時完畢")
}()
//myTimer.Stop()// 設定定時器停止 子go程無法從定時器讀到任何資料
for {
;
}
}
週期定時
func main() {
// 建立一個是否終止的channel
quit := make(chan bool)
fmt.Println("now:", time.Now())
// 週期定時 每隔一秒 系統會向Ticker.C寫一次
myTicker := time.NewTicker(time.Second)
i := 0
go func() {
for {
nowTime := <- myTicker.C
i++
fmt.Println("nowTime:", nowTime)
// 定時器迴圈了五次後 向quit寫資料
// 主go程從quit讀到資料後 程式退出
if i == 5 {
quit <- true
}
}
}()
<- quit
}
select
使用select監聽每個channel
select的使用
func main() {
ch := make(chan int)// 用於資料通訊的channel
quit := make(chan bool)// 用於判斷是否退出的channel
// 子go寫資料
go func() {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
close(ch)// ch 雖然關閉 但是還可以讀到0
quit <- true
}()
// 主go讀資料
for {
// select下的case中 若果某個case可讀 則執行
// 如果所有case都不可讀 則阻塞在select
// case中有多個滿足監聽條件 任選一個執行
// 可以使用default來處理所有case都不滿足監聽條件的狀況 通常不會這麼使用 會產生忙等待
// select自身不帶有迴圈機制 需要藉助外層for迴圈來監聽
// break只能跳出select
select {
case num := <- ch:
fmt.Println("Read:", num)
case <- quit:// quit 可讀 退出for
fmt.Println("quit")
// break跳出的是select
//break
return
}
// select執行後執行
fmt.Println("-----------------")
}
}
select超時處理
func main() {
ch := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case num := <- ch:
fmt.Println("Read:", num)
case <- time.After(3 * time.Second):// 超過3秒還沒讀到資料
quit <- true
}
}
}()
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
<- quit
fmt.Println("quit")
}
同步相關
使用channel產生死鎖
// 單go程自己死鎖
// channel應該在至少兩個以上go程中進行通訊 否則死鎖
func main1() {
ch := make(chan int)
// fatal error: all goroutines are asleep - deadlock
ch <- 748// 程式死鎖 卡在這一步 等待ch被讀取 而不會執行下面讀取ch的那一步
num := <- ch
fmt.Println("Read:", num)
}
// go程間channel訪問順序導致死鎖
// 使用channel時 讀寫兩端要有同時有機會執行
func main2() {
ch := make(chan int)
num := <- ch// 死鎖 等待讀 導致子go程不會執行 即寫操作不會執行
fmt.Println("Read:", num)
go func() {
ch <- 789
}()
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
//
go func() {
for {
select {
case num := <- ch1:
ch2 <- num
}
}
}()
for {
select {
case num := <- ch2:
ch1 <- num
}
}
}
互斥鎖
// 藉助channel完成資料同步
//var ch := make(chan int)
// 通過鎖完成資料同步
var mutex sync.Mutex// 建立互斥鎖 新建互斥鎖狀態為未加鎖0
func printer(str string) {
mutex.Lock()// 訪問共享資料之前加鎖
for _, ch := range str {
fmt.Printf("%c", ch)
time.Sleep(time.Millisecond * 300)
}
mutex.Unlock()// 共享資料訪問結束 解鎖
}
func person1() {
printer("Hello")
//ch <- 111
}
func person2() {
//<- ch
printer("World")
}
func main() {
go person1()
go person2()
for {
;
}
}
讀寫鎖
讀寫鎖的使用
// 讀寫鎖
//var rwMutex sync.RWMutex
// 在go中儘量不要將互斥鎖 讀寫鎖 與 channel混用 可能造成隱形死鎖
// 下面程式會死鎖
// 不使用channel 而是用全域性變數
func readGo(in <- chan int, idx int) {
for {
rwMutex.RLock()// 讀 加鎖
num := <- in
fmt.Println("Id", idx, "Read", num)
rwMutex.RUnlock()// 讀 解鎖
}
}
func writeGo(out chan <- int, idx int) {
for {
// 生成隨機數
num := rand.Intn(1000)
rwMutex.Lock()// 寫 加鎖
out <- num
fmt.Println("Id", idx, "Write", num)
//time.Sleep(time.Millisecond * 300)
rwMutex.Unlock()
time.Sleep(time.Millisecond * 300)
}
}
func main() {
// 隨機數種子
rand.Seed(time.Now().UnixNano())
ch := make(chan int)
//quit := make(chan bool)
// 5個讀go程 5個寫go程
for i := 0; i < 5; i++ {
go readGo(ch, i)
}
for i := 0; i < 5; i++ {
go writeGo(ch, i)
}
//<- quit
for {
;
}
}
// 使用全域性變數
var value int// 定義全域性變數 模擬共享資料
func readGo(idx int) {
for {
rwMutex.RLock()// 讀 加鎖
num := value
fmt.Println("Id", idx, "Read", num)
time.Sleep(time.Millisecond * 300)
rwMutex.RUnlock()// 讀 解鎖
}
}
func writeGo(idx int) {
for {
// 生成隨機數
num := rand.Intn(1000)
rwMutex.Lock()// 寫 加鎖
value = num
fmt.Println("Id", idx, "Write", num)
time.Sleep(time.Millisecond * 300)
rwMutex.Unlock()
}
}
func main() {
// 隨機數種子
rand.Seed(time.Now().UnixNano())
//ch := make(chan int)
//quit := make(chan bool)
// 5個讀go程 5個寫go程
for i := 0; i < 5; i++ {
go readGo(i)
}
for i := 0; i < 5; i++ {
go writeGo(i)
}
//<- quit
for {
;
}
}
使用channel模擬讀寫鎖
var value int
func readGo(in <- chan int, idx int) {
for {
num := <- in
fmt.Println("Id", idx, "Read", num)
time.Sleep(time.Millisecond * 300)
}
}
func writeGo(out chan <- int, idx int) {
for {
// 生成隨機數
num := rand.Intn(1000)
out <- num
fmt.Println("Id", idx, "Write", num)
time.Sleep(time.Millisecond * 300)
}
}
func main() {
// 隨機數種子
rand.Seed(time.Now().UnixNano())
ch := make(chan int)
//quit := make(chan bool)
// 5個讀go程 5個寫go程
for i := 0; i < 5; i++ {
go readGo(ch, i)
}
for i := 0; i < 5; i++ {
go writeGo(ch, i)
}
//<- quit
for {
;
}
}
條件變數
var cond sync.Cond// 全域性條件變數
func producer(out chan <- int, idx int) {
for {
// 加鎖
cond.L.Lock()
// 判斷緩衝區是否滿
for len(out) == 5 {
cond.Wait()// 等待緩衝區有位置可寫
}
num := rand.Intn(800)
out <- num
fmt.Println("Idx", idx, "Write", num)
// 解鎖
cond.L.Unlock()
// 喚醒對端 即消費者
cond.Signal()
time.Sleep(time.Millisecond * 200)
}
}
func consumer(in <- chan int, idx int) {
for {
cond.L.Lock()
for len(in) == 0 {
cond.Wait()
}
num := <- in
fmt.Println("idx", idx, "Read", num)
cond.L.Unlock()
cond.Signal()
time.Sleep(time.Millisecond * 200)
}
}
func main() {
ch := make(chan int, 5)
//quit := make(chan int)
rand.Seed(time.Now().UnixNano())
// 指定條件變數使用的鎖
cond.L = new(sync.Mutex)
for i := 0; i < 5; i++ {
go producer(ch, i)
}
for i := 0; i < 5; i++ {
go consumer(ch, i)
}
//<- quit
for {
;
}
}
轉載請註明出處