Golang---高併發
併發的啟動
這篇文章關於併發的啟動我就一概而過了,如果要讓一個函式併發執行,只需一個關鍵字"go":
go的併發啟動非常簡單,幾乎沒有什麼額外的準備工作,要併發的函式和一般的函式沒有什麼區別,引數隨意,啟動的時候只需要加一個go關鍵之即可。func Afuntion(para1, para2, para3, ...) { // Do some process // ... } func main() { go Afuntion(para1, para2, para3, ...) //只需加一個go字首,Afunction()就會併發執行 }
當然,併發的啟動沒什麼好講的,併發最精髓的部分在於這些協程(協程類似於執行緒,但是是更輕量的執行緒)的排程。
我沒法以一個資深的老專家向你全方位的講解排程的各個方面,但是我可以把我遇到過的一些場景和我所用過的排程方法(所以絕對是能用的)分享給你。
go提供了sync包和channel機制來解決協程間的同步與通訊。channel的用法非常靈活,使用的方式多種多樣,而且官網的Effective Go中給出了channel的一種併發以外的方式。我們先來介紹sync包提供的排程支援吧。
sync.WaitGroup
sync包中的WaitGroup實現了一個類似任務佇列的結構,你可以向佇列中加入任務,任務完成後就把任務從佇列中移除,如果佇列中的任務沒有全部完成,佇列就會觸發阻塞以阻止程式繼續執行,具體用法參考如下程式碼:
我們可以利用sync.WaitGroup來滿足這樣的情況:// 程式碼粘上就可以跑通 package main import ( "fmt" "sync" ) var waitgroup sync.WaitGroup func Afunction(shownum int) { fmt.Println(shownum) waitgroup.Done() //任務完成,將任務佇列中的任務數量-1,其實.Done就是.Add(-1) } func main() { for i := 0; i < 10; i++ { waitgroup.Add(1) //每建立一個goroutine,就把任務佇列中任務的數量+1 go Afunction(i) } waitgroup.Wait() //.Wait()這裡會發生阻塞,直到佇列中所有的任務結束就會解除阻塞 }
▲某個地方需要建立多個goroutine,並且一定要等它們都執行完畢後再繼續執行接下來的操作。
是的,WaitGroup最大的優點就是.Wait()可以阻塞到佇列中的任務都完畢後才解除阻塞。
channel
channel是一種golang內建的型別,英語的直譯為"通道",其實,它真的就是一根管道,而且是一個先進先出的資料結構。
我們能對channel進行的操作只有4種:
(1) 建立chennel (通過make()函式)
(2) 放入資料 (通過 channel <- data 操作)
(3) 取出資料 (通過 <-channel 操作)
(4) 關閉channel (通過close()函式)
但是channel有一些非常給力的性質需要你牢記,請一定要記住並理解好它們:
(1) channel是一種阻塞管道,是自動阻塞的。意思就是,如果管道滿了,一個對channel放入資料的操作就會阻塞,直到有某個routine從channel中取出資料,這個放入資料的操作才會執行。相反同理,如果管道是空的,一個從channel取出資料的操作就會阻塞,直到某個routine向這個channel中放入資料,這個取出資料的操作才會執行。這事channel最重要的一個性質,沒有之一。
package main
func main() {
ch := make(chan int, 3)
ch <- 1
ch <- 1
ch <- 1
ch <- 1 //這一行操作就會發生阻塞,因為前三行的放入資料的操作已經把channel填滿了
}
package main
func main() {
ch := make(chan int, 3)
<-ch //這一行會發生阻塞,因為channel才剛建立,是空的,沒有東西可以取出
}
(2)channel分為有緩衝的channel和無緩衝的channel。兩種channel的建立方法如下:
ch := make(chan int) //無緩衝的channel,同等於make(chan int, 0)
ch := make(chan int, 5) //一個緩衝區大小為5的channel
操作一個channel時一定要注意其是否帶有緩衝,因為有些操作會觸發channel的阻塞導致死鎖。下面就來解釋這些需要注意的情景。
首先來看一個一個例子,這個例子是兩段只有主函式不同的程式碼:
package main
import "fmt"
func Afuntion(ch chan int) {
fmt.Println("finish")
<-ch
}
func main() {
ch := make(chan int) //無緩衝的channel
go Afuntion(ch)
ch <- 1
// 輸出結果:
// finish
}
package main
import "fmt"
func Afuntion(ch chan int) {
fmt.Println("finish")
<-ch
}
func main() {
ch := make(chan int) //無緩衝的channel
//只是把這兩行的程式碼順序對調一下
ch <- 1
go Afuntion(ch)
// 輸出結果:
// 死鎖,無結果
}
前一段程式碼最終會輸出"finish"並正常結束,但是後一段程式碼會發生死鎖。為什麼會出現這種現象呢,咱們把上面兩段程式碼的邏輯跑一下。
第一段程式碼:
1. 建立了一個無緩衝channel
2. 啟動了一個goroutine,這個routine中對channel執行取出操作,但是因為這時候channel為空,所以這個取出操作發生阻塞,但是主routine可沒有發生阻塞,它還在繼續執行呢
3. 主goroutine這時候繼續執行下一行,往channel中放入了一個數據
4. 這時阻塞的那個routine檢測到了channel中存在資料了,所以接觸阻塞,從channel中取出資料,程式就此完畢
第二段程式碼:
1. 建立了一個無緩衝的channel
2. 主routine要向channel中放入一個數據,但是因為channel沒有緩衝,相當於channel一直都是滿的,所以這裡會發生阻塞。可是下面的那個goroutine還沒有建立呢,主routine在這裡一阻塞,整個程式就只能這麼一直阻塞下去了,然後。。。然後就沒有然後了。。死鎖!
※從這裡可以看出,對於無緩衝的channel,放入操作和取出操作不能再同一個routine中,而且應該是先確保有某個routine對它執行取出操作,然後才能在另一個routine中執行放入操作。
對於帶緩衝的channel,就沒那麼多講究了,因為有緩衝空間,所以只要緩衝區不滿,放入操作就不會阻塞,同樣,只要緩衝區不空,取出操作就不會阻塞。而且,帶有緩衝的channel的放入和取出可以用在同一個routine中。
但是,並不是說有了緩衝就可以隨意使用channel的放入和取出了,我們一定要注意放入和取出的速率問題。下面我們就舉個例子來說明這種問題:
我們經常會用利用channel自動阻塞的性質來控制當前執行的goroutine的總數量,如下:
package main
import (
"fmt"
)
func Afunction(ch chan int) {
fmt.Println("finish")
<-ch //goroutine執行完了就從channel取出一個數據
}
func main() {
ch := make(chan int, 10)
for i := 0; i < 1000; i++ {
//每當建立goroutine的時候就向channel中放入一個數據,如果裡面已經有10個數據了,就會
//阻塞,由此我們將同時執行的goroutine的總數控制在<=10個的範圍內
ch <- 1
go Afunction(ch)
}
// 這裡只是示範個例子,當然,接下來應該有些更加周密的同步操作
}
上面這種channel的使用方式幾乎經常會用到,但是再看一下接下來這段程式碼,它和上面這種使用channel的方式幾乎一樣,但是它會造成問題:package main
func Afunction(ch chan int) {
ch <- 1
ch <- 1
ch <- 1
ch <- 1
ch <- 1
<-ch
}
func main() {
//主routine的操作同上面那段程式碼
ch := make(chan int, 10)
for i := 0; i < 100; i++ {
ch <- 1
go Afunction(ch)
}
// 這段程式碼執行的結果為死鎖
}
上面這段執行和之前那一段基本上原理是一樣的,但是執行後卻會發生死鎖。為什麼呢?其實總結起來就一句話,"放得太快,取得太慢了"。
按理說,我們應該在我們主routine中建立子goroutine並每次向channel中放入資料,而子goroutine負責從channel中取出資料。但是我們的這段程式碼在建立了子goroutine後,每個routine會向channel中放入5個數據。這樣,每向channel中放入6個數據才會執行一次取出操作,這樣一來就可能會有某一時刻,channel已經滿了,但是所有的routine都在執行放入操作(因為它們當前執行放入操作的概率是執行取出操作的6倍),這樣一來,所有的routine都阻塞了,從而導致死鎖。
在使用帶緩衝的channel時一定要注意放入與取出的速率問題。
(3)關閉後的channel可以取資料,但是不能放資料。而且,channel在執行了close()後並沒有真的關閉,channel中的資料全部取走之後才會真正關閉。
package main
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 1
close(ch)
ch <- 1 //不能對關閉的channel執行放入操作
// 會觸發panic
}
package main
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 1
close(ch)
<-ch //只要channel還有資料,就可能執行取出操作
//正常結束
}
package main
import "fmt"
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 1
ch <- 1
ch <- 1
close(ch) //如果執行了close()就立即關閉channel的話,下面的迴圈就不會有任何輸出了
for {
data, ok := <-ch
if !ok {
break
}
fmt.Println(data)
}
// 輸出:
// 1
// 1
// 1
// 1
//
// 呼叫了close()後,只有channel為空時,channel才會真的關閉
}
使用channel控制goroutine數量
channel的性質到這裡就介紹完了,但是看上去,channel的使用似乎比WaitGroup要注意更多的細節,那麼有什麼理由一定要用channel來實現同步呢?channel相比WaitGroup有一個很大的優點,就是channel不僅可以實現協程的同步,而且可以控制當前正在執行的goroutine的總數。
下面就介紹幾種利用channel控制goroutine數量的方法:
一.如果任務數量是固定的:
package main
func Afunction(ch chan int) {
ch <- 1
}
func main() {
var (
ch chan int = make(chan int, 20) //可以同時執行的routine數量為20
dutycount int = 500
)
for i := 0; i < dutycount; i++ {
go Afunction(ch)
}
//知道了任務總量,可以像這樣利用固定迴圈次數的迴圈檢測所有的routine是否工作完畢
for i := 0; i < dutycount; i++ {
<-ch
}
}
二.如果任務的數量不固定
package main
import (
"fmt"
)
func Afunction(routineControl chan int, feedback chan string) {
defer func() {
<-routineControl
feedback <- "finish"
}()
// do some process
// ...
}
func main() {
var (
routineCtl chan int = make(chan int, 20)
feedback chan string = make(chan string, 10000)
msg string
allwork int
finished int
)
for i := 0; i < 1000; i++ {
routineCtl <- 1
allwork++
go Afunction(routineCtl, feedback)
}
for {
msg = <-feedback
if msg == "finish" {
finished++
}
if finished == allwork {
break
}
}
}
轉載:http://blog.csdn.net/gophers/article/details/24665419