1. 程式人生 > 其它 >go-channel

go-channel

1.channel簡介

Channel是Go中的一個核心型別,你可以把它看成一個管道,通過它併發核心單元就可以傳送或者接收資料進行通訊(communication)。

它的操作符是箭頭 <- 。用來協程間傳遞資料。

ch <- v    // 傳送值v到Channel ch中
v := <-ch  // 從Channel ch中接收資料,並將資料賦值給v
ch := make(chan int, 100)  //容量(capacity)代表Channel容納的最多的元素的數量,代表Channel的快取的大小。
如果沒有設定容量,或者容量設定為0, 說明Channel沒有快取,只有sender和receiver都準備好了後它們的通訊(communication)才會發生(Blocking)。如果設定了快取,就有可能不發生阻塞, 只有buffer滿了後 send才會阻塞, 而只有快取空了後receive才會阻塞。一個nil channel不會通訊。
close(ch)  //關閉channel

無緩衝阻塞場景:1.沒有協程在寫卻讀 2.沒有協程在讀卻寫

有緩衝阻塞場景:1.緩衝中無資料卻讀 2.緩衝已滿卻寫

無緩衝的channel,不管是入還是出,都會阻塞,所以在同一個goroutine中,不能同時對同一個無緩衝channel進行入和出操作;

帶緩衝的channel,在佇列滿之前,不會阻塞;佇列滿之後,依然會阻塞。

2.應用1:作為一個FIFO佇列

package main
//用channel實現佇列,檢視是否原子性
import (
	"errors"
	"fmt"
	"sync"
)

func main(){
	ch := make(chan int, 20)
	fmt.Println(len(ch))
	fmt.Println(cap(ch))
    for i:=0; i<20; i++{
    	ch <- i+1
	}
	wg := sync.WaitGroup{}
	wg.Add(30)
	var err error
	for j:=0; j<30; j++{
		go get(&ch, &wg, &err)
	}
	wg.Wait()
}

func get(ch *chan int, wg *sync.WaitGroup, err *error){
	defer wg.Done()
	select {
	case x := <- *ch:   //如果可以讀出,就讀
		fmt.Printf("get number: %d, channel size: %d\n", x, len(*ch))
		return
	default:     //失敗返回
	    fmt.Println("no number")
		*err = errors.New("channel has no data")
		return
	}
}

3.應用2:實現類似sync.WaitGroup的同步

package main

import (
	"fmt"
	"time"
)

func main() {
	//用sleep實現定時器
	fmt.Println(time.Now())
	time.Sleep(time.Second)
	fmt.Println(time.Now())
	//用timer實現定時器
	timer := time.NewTimer(time.Second)
	fmt.Println(<-timer.C)
	//用after實現定時器
	fmt.Println(<-time.After(time.Second))
	//週期定時
	tiker := time.NewTicker(time.Second)
	for i :=1; i<4; i++{
			fmt.Println(<-tiker.C)
	}
	fmt.Println("------------------")
	//定時完成的操作寫在協程裡
	ticker := time.NewTicker(time.Second * 3)
	ch := make(chan int)  //該channel完成同步,實現下面的協程執行完主執行緒才結束
	go func() {
		var x int
		for x < 10 {
			select {
			case <-ticker.C:
				x++
				fmt.Printf("%d\n", x)
				fmt.Println(time.Now())
				time.Sleep(time.Second * 1)  //case裡的操作不要超過定時時間,不然會不符合預期
			}
		}
		ticker.Stop()
		ch <- 0
	}()
	<-ch
}   
//這裡的channel 實現了一個 sync.WaitGroup的效果,主程序直接到<-ch,無擁塞channel,裡面沒東西,直接拿是會阻塞的,直到裡面有東西
//也就是上面協程裡的10次定時都結束,往channel寫一個東西,就終止阻塞,使程式結束
// ch := make(chan int)  和   ch := make(chan int, 1) 理解:
// 有1的緩衝channel,一個協程寫入拿到這個數,其他再拿都拿不到,配合select,拿不到就走,那個協程執行完操作之後,再往裡寫個數,其他協程就可以拿了。
//其實不管是有緩衝還是無緩衝,單純用都會阻塞,無非是有一個buffer
//但是,加上select,都可以實現不擁塞,直接過。

4.應用3:搭配select實現鎖效果

mutex實現的鎖:擁塞的,一個獲鎖,其他需要一直等待,直到上一個操作釋放,再執行操作

channel實現的鎖:可以選擇擁塞不擁塞,得不到可以直接放棄,也可以等待一段時間執行其他操作。

搭配select 實現的是io操作。

package main
//實現初始化一臺例項的鎖,不同appid都可以更新map
//實現channle的擁塞鎖
import (
	"errors"
	"fmt"
	"sync"
	"time"
)

func main(){
	ch := make(chan int, 1)
	ch <- 1
	wg := sync.WaitGroup{}
	var err error
	wg.Add(10)
	for i:=0; i<10; i++{
		go get_blockf(&ch, &wg, &err)  //一直擁塞
	}
	wg.Wait()
	wg.Add(10)
	for i:=0; i<10; i++{
		go get_block(&ch, &wg, &err)  //擁塞2s
	}
	wg.Wait()
	wg.Add(10)
	for i:=0; i<10; i++{
		go get_noblock(&ch, &wg, &err)  //無擁塞
	}
	wg.Wait()
}

func get_blockf(ch *chan int, wg *sync.WaitGroup, err *error){
	defer wg.Done()
	select {
	case x := <- *ch:   //如果可以讀出,就讀
		fmt.Printf("Block get number: %d, channel size: %d\n", x, len(*ch))
		time.Sleep(time.Second*1)
		*ch <-x+1
	}
	return
}

func get_block(ch *chan int, wg *sync.WaitGroup, err *error){
	defer wg.Done()
	select {
	case x := <- *ch:   //如果可以讀出,就讀
		fmt.Printf("Block 2s get number: %d, channel size: %d\n", x, len(*ch))
		time.Sleep(time.Second*1)
		*ch <-x+1
	case <- time.After(time.Second *20):  //等待過程中一直嘗試,嘗試成功即從ch取數,超時還取不到執行下面操作
		fmt.Println("no number")
		*err = errors.New("channel has no data")
	}
	return
}

func get_noblock(ch *chan int, wg *sync.WaitGroup, err *error){
	defer wg.Done()
	select {
	case x := <- *ch:   //如果可以讀出,就讀
		fmt.Printf("Block 2s get number: %d, channel size: %d\n", x, len(*ch))
		time.Sleep(time.Second*1)
		*ch <-x+1
	default:       //等待過程中一直嘗試,嘗試成功即從ch取數,超時還取不到執行下面操作
		fmt.Println("no number")
		*err = errors.New("channel has no data")
	}
	return
}