1. 程式人生 > >Go的channel常見使用方式

Go的channel常見使用方式

go關鍵字可以用來開啟一個goroutine(協程))進行任務處理,而多個任務之間如果需要通訊,就需要用到channel了。

func testSimple(){
	intChan := make(chan int)

	go func() {
		intChan <- 1
	}()

	value := <- intChan
	fmt.Println("value : ", value)
}

上面這個簡單的例子就是新開啟的goroutine向intChan傳送了一個1的值,那麼在主執行緒的intChan就會收到這個值的資訊。

channel型別:無緩衝和緩衝型別

channel有兩種形式的,一種是無緩衝的,一個執行緒向這個channel傳送了訊息後,會阻塞當前的這個執行緒,知道其他執行緒去接收這個channel的訊息。無緩衝的形式如下:

intChan := make(chan int)

帶緩衝的channel,是可以指定緩衝的訊息數量,當訊息數量小於指定值時,不會出現阻塞,超過之後才會阻塞,需要等待其他執行緒去接收channel處理,帶緩衝的形式如下:

//3為緩衝數量
intChan := make(chan int, 3)

傳輸struct結構資料

channel可以傳輸基本型別的資料如int, string,同時也可以傳輸struct資料


type Person struct {
	Name    string
	Age     uint8
	Address Addr
}

type Addr struct {
	city     string
	district string
}

/*
測試channel傳輸複雜的Struct資料
 */
func testTranslateStruct() {
	personChan := make(chan Person, 1)

	person := Person{"xiaoming", 10, Addr{"shenzhen", "longgang"}}
	personChan <- person

	person.Address = Addr{"guangzhou", "huadu"}
	fmt.Printf("src person : %+v \n", person)

	newPerson := <-personChan
	fmt.Printf("new person : %+v \n", newPerson)
}

這裡可以看到可以通過channel傳輸自定義的Person物件,同時一端修改了資料,不影響另一端的資料,也就是說通過channel傳遞後的資料是獨立的。

關閉channel

channel可以進行關閉,例如寫的一段關閉了channel,那麼讀的一端讀取時就可以檢測讀取失敗

/*
測試關閉channel
 */
func testClose() {
	ch := make(chan int, 5)
	sign := make(chan int, 2)

	go func() {
		for i := 1; i <= 5; i++ {
			ch <- i
			time.Sleep(time.Second)
		}

		close(ch)

		fmt.Println("the channel is closed")

		sign <- 0

	}()

	go func() {
		for {
			i, ok := <-ch
			fmt.Printf("%d, %v \n", i, ok)

			if !ok {
				break
			}

			time.Sleep(time.Second * 2)
		}

		sign <- 1

	}()

	<-sign
	<-sign
}

合併多個channel的輸出

可以將多個channel的資料合併到一個channel進行輸出,形成一個訊息佇列

/**
將多個輸入的channel進行合併成一個channel
 */
func testMergeInput() {
	input1 := make(chan int)
	input2 := make(chan int)
	output := make(chan int)

	go func(in1, in2 <-chan int, out chan<- int) {
		for {
			select {
			case v := <-in1:
				out <- v
			case v := <-in2:
				out <- v
			}
		}
	}(input1, input2, output)

	go func() {
		for i := 0; i < 10; i++ {
			input1 <- i
			time.Sleep(time.Millisecond * 100)
		}
	}()

	go func() {
		for i := 20; i < 30; i++ {
			input2 <- i
			time.Sleep(time.Millisecond * 100)
		}
	}()

	go func() {
		for {
			select {
			case value := <-output:
				fmt.Println("輸出:", value)
			}
		}
	}()

	time.Sleep(time.Second * 5)
	fmt.Println("主執行緒退出")
}

通過channel實現退出的通知

定義一個用於退出的channel比如quit,不斷執行任務的執行緒通過select監聽quit的讀取,當讀取到quit中的訊息時,退出當前的任務執行緒,這裡是主執行緒通知任務執行緒退出。

/*
測試channel用於通知中斷退出的問題
 */
func testQuit() {
	g := make(chan int)
	quit := make(chan bool)

	go func() {
		for {
			select {
			case v := <-g:
				fmt.Println(v)
			case <-quit:
				fmt.Println("B退出")
				return
			}
		}
	}()

	for i := 0; i < 3; i++ {
		g <- i
	}
	quit <- true
	fmt.Println("testAB退出")
}

生產者消費者問題

通過channel可以比較方便的實現生產者消費者模型,這裡開啟一個生產者執行緒,一個消費者執行緒,生產者執行緒往channel中傳送訊息,同時阻塞,消費者執行緒輪詢獲取channel中的訊息,
進行處理,然後阻塞,這時生產者執行緒喚醒繼續後面的邏輯,如此便形成了簡單的生產者消費者模型。同時生產者在完成了所有的訊息傳送後,可以通過quit這個channel通知消費者執行緒退出,
而消費者執行緒退出時,通知主執行緒退出,整個程式完成退出。

/**
生產者消費者問題
 */
func testPCB() {
	fmt.Println("test PCB")

	intchan := make(chan int)
	quitChan := make(chan bool)
	quitChan2 := make(chan bool)

	value := 0

	go func() {
		for i := 0; i < 3; i++ {

			value = value + 1
			intchan <- value

			fmt.Println("write finish, value ", value)

			time.Sleep(time.Second)
		}
		quitChan <- true
	}()
	go func() {
		for {
			select {
			case v := <-intchan:
				fmt.Println("read finish, value ", v)
			case <-quitChan:
				quitChan2 <- true
				return
			}
		}

	}()

	<-quitChan2
	fmt.Println("task is done ")
}

輸出順序問題

/*
這個結果輸出是1,2, 也可能是2,1, 也可能是2,順序是不一定的
 */
func testSequnse() {
	ch := make(chan int)

	go func() {
		v := <-ch
		fmt.Println(v)
	}()
	ch <- 1
	fmt.Println("2")
}

上面這個輸出結果是什麼呢?執行一下會發現,可能是1,2,也可能是2,1, 也可能是2,順序是不一定的,那麼為什麼會是這樣的,我覺得因為這是兩個不同的執行緒,
它們是獨立執行的,當v := <-ch 執行之後,主執行緒和當前執行緒都是執行狀態(非阻塞),先執行主執行緒還是新執行緒的輸出就看cpu運行了,所以結果是不確定的。

channel的超時處理

通過time可以實現channel的超時處理,當一個channel讀取超過一定時間沒有訊息到來時,就可以得到超時通知處理,防止一直阻塞當前執行緒

/*
檢查channel讀寫超時,並做超時的處理
 */
func testTimeout() {
	g := make(chan int)
	quit := make(chan bool)

	go func() {
		for {
			select {
			case v := <-g:
				fmt.Println(v)
			case <-time.After(time.Second * time.Duration(3)):
				quit <- true
				fmt.Println("超時,通知主執行緒退出")
				return
			}
		}
	}()

	for i := 0; i < 3; i++ {
		g <- i
	}

	<-quit
	fmt.Println("收到退出通知,主執行緒退出")
}

channel的輸入輸出型別指定

channel可以在顯示指定它是輸入型還是輸出型的,指定為輸入型,則不能使用它輸出訊息,否則出錯編譯不通過,同理,輸出型不能接受訊息輸入,
這樣可以在編寫程式碼時防止手誤寫錯誤輸入輸出型別而導致程式錯誤的問題。指定輸入輸出型別可以在方法引數時設定,那麼它只在當前方法中會做輸入輸出限制,
可看下面實現。

/*
指定channel是輸入還是輸出型的,防止編寫時寫錯誤輸入輸出,指定了的話,可以在編譯時期作錯誤的檢查
 */
func testInAndOutChan() {
	ch := make(chan int)
	quit := make(chan bool)

	//輸入型的chan是這種格式的:inChan chan<- int,如果換成輸出型的,則編譯時會報錯
	go func(inChan chan<- int) {
		for i := 0; i < 10; i++ {
			inChan <- i
			time.Sleep(time.Millisecond * 500)
		}
		quit <- true
		quit <- true
	}(ch)

	go func(outChan <-chan int) {
		for {
			select {
			case v := <-outChan:
				fmt.Println("print out value : ", v)
			case <-quit:
				fmt.Println("收到退出通知,退出")
				return
			}
		}
	}(ch)

	<-quit
	fmt.Println("收到退出通知,主執行緒退出")
}

channel實現併發數量控制

通過設定一個帶緩衝數量的的channel來實現最大併發數量,最大併發數量即為緩衝數量,任務開始時想limit這個channel傳送訊息,
任務執行完成後從這個limit讀取訊息,這樣就可以保證當併發數量達到limit的緩衝數量時,limit <- true 這裡會發生阻塞,停止
建立新的執行緒,知道某個執行緒執行完成任務後,從limit讀取資料,這樣就能保證最大併發數量控制在緩衝數量。

/*
測試通過channel來控制最大併發數,來處理事件
 */
func testMaxNumControl()  {
	maxNum := 3
	limit := make(chan bool, maxNum)
	quit := make(chan bool)

	for i:=0; i<100; i++{
		fmt.Println("start worker : ", i)

		limit <- true

		go func(i int) {
			fmt.Println("do worker start: ", i)
			time.Sleep(time.Millisecond * 20)
			fmt.Println("do worker finish: ", i)

			<- limit

			if i == 99{
				fmt.Println("完成任務")
				quit <- true
			}

		}(i)
	}

	<-quit
	fmt.Println("收到退出通知,主程式退出")
}

監聽中斷訊號的channel

可以建立一個signal訊號的channel,同時通過signal.Notify來監聽os.Interrupt這個中斷訊號,因此執行到<- quit時就會阻塞在這裡,
直到收到了os.Interrupt這個中斷訊號,比如按Ctrl+C中斷程式的時候,主程式就會退出了。當然還可以監聽其他訊號,例如os.Kill等。
/*
監聽中斷訊號的channel
*/
func testSignal() {
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)

go func() {
	time.Sleep(time.Second * 2)

	number := 0;
	for{
		number++
		println("number : ", number)
		time.Sleep(time.Second)
	}
}()

fmt.Println("按Ctrl+C可退出程式")
<- quit
fmt.Println("主程式退出")

}

channel實現同步控制,生產者消費者模型

開啟多個執行緒做賺錢和花錢的操作,共享讀寫remainMoney這個剩餘金額變數,實現生產者消費者模型

//同步控制模型,生產者模型
var lockChan = make(chan int, 1)
var remainMoney = 1000
func testSynchronize()  {
	quit := make(chan bool, 2)

	go func() {
		for i:=0; i<10; i++{
			money := (rand.Intn(12) + 1) * 100
			go testSynchronize_expense(money)

			time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
		}

		quit <- true
	}()

	go func() {
		for i:=0; i<10; i++{
			money := (rand.Intn(12) + 1) * 100
			go testSynchronize_gain(money)

			time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
		}

		quit <- true
	}()

	<- quit
	<- quit

	fmt.Println("主程式退出")
}

func testSynchronize_expense(money int)  {
	lockChan <- 0

	if(remainMoney >= money){
		srcRemainMoney := remainMoney
		remainMoney -= money
		fmt.Printf("原來有%d, 花了%d,剩餘%d\n", srcRemainMoney, money, remainMoney)
	}else{
		fmt.Printf("想消費%d錢不夠了, 只剩%d\n", money, remainMoney)
	}

	<- lockChan
}

func testSynchronize_gain(money int)  {
	lockChan <- 0

	srcRemainMoney := remainMoney
	remainMoney += money
	fmt.Printf("原來有%d, 賺了%d,剩餘%d\n", srcRemainMoney, money, remainMoney)

	<- lockChan
}