go-channel
阿新 • • 發佈:2021-08-12
1.channel簡介
Channel是Go中的一個核心型別,你可以把它看成一個管道,通過它併發核心單元就可以傳送或者接收資料進行通訊(communication)。
它的操作符是箭頭 <- 。用來協程間傳遞資料。
ch <- v // 傳送值v到Channel ch中 v := <-ch // 從Channel ch中接收資料,並將資料賦值給v ch := make(chan int, 100) //容量(capacity)代表Channel容納的最多的元素的數量,代表Channel的快取的大小。 如果沒有設定容量,或者容量設定為0, 說明Channel沒有快取,只有sender和receiver都準備好了後它們的通訊(communication)才會發生(Blocking)。如果設定了快取,就有可能不發生阻塞, 只有buffer滿了後 send才會阻塞, 而只有快取空了後receive才會阻塞。一個nil channel不會通訊。 close(ch) //關閉channel
無緩衝阻塞場景:1.沒有協程在寫卻讀 2.沒有協程在讀卻寫
有緩衝阻塞場景:1.緩衝中無資料卻讀 2.緩衝已滿卻寫
無緩衝的channel,不管是入還是出,都會阻塞,所以在同一個goroutine中,不能同時對同一個無緩衝channel進行入和出操作;
帶緩衝的channel,在佇列滿之前,不會阻塞;佇列滿之後,依然會阻塞。
2.應用1:作為一個FIFO佇列
package main //用channel實現佇列,檢視是否原子性 import ( "errors" "fmt" "sync" ) func main(){ ch := make(chan int, 20) fmt.Println(len(ch)) fmt.Println(cap(ch)) for i:=0; i<20; i++{ ch <- i+1 } wg := sync.WaitGroup{} wg.Add(30) var err error for j:=0; j<30; j++{ go get(&ch, &wg, &err) } wg.Wait() } func get(ch *chan int, wg *sync.WaitGroup, err *error){ defer wg.Done() select { case x := <- *ch: //如果可以讀出,就讀 fmt.Printf("get number: %d, channel size: %d\n", x, len(*ch)) return default: //失敗返回 fmt.Println("no number") *err = errors.New("channel has no data") return } }
3.應用2:實現類似sync.WaitGroup的同步
package main import ( "fmt" "time" ) func main() { //用sleep實現定時器 fmt.Println(time.Now()) time.Sleep(time.Second) fmt.Println(time.Now()) //用timer實現定時器 timer := time.NewTimer(time.Second) fmt.Println(<-timer.C) //用after實現定時器 fmt.Println(<-time.After(time.Second)) //週期定時 tiker := time.NewTicker(time.Second) for i :=1; i<4; i++{ fmt.Println(<-tiker.C) } fmt.Println("------------------") //定時完成的操作寫在協程裡 ticker := time.NewTicker(time.Second * 3) ch := make(chan int) //該channel完成同步,實現下面的協程執行完主執行緒才結束 go func() { var x int for x < 10 { select { case <-ticker.C: x++ fmt.Printf("%d\n", x) fmt.Println(time.Now()) time.Sleep(time.Second * 1) //case裡的操作不要超過定時時間,不然會不符合預期 } } ticker.Stop() ch <- 0 }() <-ch } //這裡的channel 實現了一個 sync.WaitGroup的效果,主程序直接到<-ch,無擁塞channel,裡面沒東西,直接拿是會阻塞的,直到裡面有東西 //也就是上面協程裡的10次定時都結束,往channel寫一個東西,就終止阻塞,使程式結束 // ch := make(chan int) 和 ch := make(chan int, 1) 理解: // 有1的緩衝channel,一個協程寫入拿到這個數,其他再拿都拿不到,配合select,拿不到就走,那個協程執行完操作之後,再往裡寫個數,其他協程就可以拿了。 //其實不管是有緩衝還是無緩衝,單純用都會阻塞,無非是有一個buffer //但是,加上select,都可以實現不擁塞,直接過。
4.應用3:搭配select實現鎖效果
mutex實現的鎖:擁塞的,一個獲鎖,其他需要一直等待,直到上一個操作釋放,再執行操作
channel實現的鎖:可以選擇擁塞不擁塞,得不到可以直接放棄,也可以等待一段時間執行其他操作。
搭配select 實現的是io操作。
package main
//實現初始化一臺例項的鎖,不同appid都可以更新map
//實現channle的擁塞鎖
import (
"errors"
"fmt"
"sync"
"time"
)
func main(){
ch := make(chan int, 1)
ch <- 1
wg := sync.WaitGroup{}
var err error
wg.Add(10)
for i:=0; i<10; i++{
go get_blockf(&ch, &wg, &err) //一直擁塞
}
wg.Wait()
wg.Add(10)
for i:=0; i<10; i++{
go get_block(&ch, &wg, &err) //擁塞2s
}
wg.Wait()
wg.Add(10)
for i:=0; i<10; i++{
go get_noblock(&ch, &wg, &err) //無擁塞
}
wg.Wait()
}
func get_blockf(ch *chan int, wg *sync.WaitGroup, err *error){
defer wg.Done()
select {
case x := <- *ch: //如果可以讀出,就讀
fmt.Printf("Block get number: %d, channel size: %d\n", x, len(*ch))
time.Sleep(time.Second*1)
*ch <-x+1
}
return
}
func get_block(ch *chan int, wg *sync.WaitGroup, err *error){
defer wg.Done()
select {
case x := <- *ch: //如果可以讀出,就讀
fmt.Printf("Block 2s get number: %d, channel size: %d\n", x, len(*ch))
time.Sleep(time.Second*1)
*ch <-x+1
case <- time.After(time.Second *20): //等待過程中一直嘗試,嘗試成功即從ch取數,超時還取不到執行下面操作
fmt.Println("no number")
*err = errors.New("channel has no data")
}
return
}
func get_noblock(ch *chan int, wg *sync.WaitGroup, err *error){
defer wg.Done()
select {
case x := <- *ch: //如果可以讀出,就讀
fmt.Printf("Block 2s get number: %d, channel size: %d\n", x, len(*ch))
time.Sleep(time.Second*1)
*ch <-x+1
default: //等待過程中一直嘗試,嘗試成功即從ch取數,超時還取不到執行下面操作
fmt.Println("no number")
*err = errors.New("channel has no data")
}
return
}