併發並行與Go併發程式設計
併發與並行
-
併發(concurrency) 併發的關注點在於任務切分。舉例來說,你是一個創業公司的CEO,開始只有你一個人,你一人分飾多角,一會做產品規劃,一會寫程式碼,一會見客戶,雖然你不能見客戶的同時寫程式碼,但由於你切分了任務,分配了時間片,表現出來好像是多個任務一起在執行。
-
並行(parallelism) 並行的關注點在於同時執行。還是上面的例子,你發現你自己太忙了,時間分配不過來,於是請了工程師,產品經理,市場總監,各司一職,這時候多個任務可以同時執行了。
GreenThread
-
使用者空間 首先是在使用者空間,避免核心態和使用者態的切換導致的成本。
-
由語言或者框架層排程
-
更小的棧空間允許建立大量例項(百萬級別)
幾個概念
-
Continuation 這個概念不熟悉 FP 程式設計的人可能不太熟悉,不過這裡可以簡單的顧名思義,可以理解為讓我們的程式可以暫停,然後下次呼叫繼續(contine)從上次暫停的地方開始的一種機制。相當於程式呼叫多了一種入口。
-
Coroutine 是 Continuation 的一種實現,一般表現為語言層面的元件或者類庫。主要提供 yield,resume 機制。
-
Fiber 和 Coroutine 其實是一體兩面的,主要是從系統層面描述,可以理解成 Coroutine 執行之後的東西就是 Fiber。
Goroutine
Goroutine 其實就是前面 GreenThread 系列解決方案的一種演進和實現。
-
首先,它內建了 Coroutine 機制。因為要使用者態的排程,必須有可以讓程式碼片段可以暫停/繼續的機制。
-
其次,它內建了一個排程器,實現了 Coroutine 的多執行緒並行排程,同時通過對網路等庫的封裝,對使用者遮蔽了排程細節。
-
最後,提供了 Channel 機制,用於 Goroutine 之間通訊,實現 CSP 併發模型(Communicating Sequential Processes)。因為 Go 的 Channel 是通過語法關鍵詞提供的,對使用者遮蔽了許多細節。其實 Go 的 Channel 和 Java 中的 SynchronousQueue 是一樣的機制,如果有 buffer 其實就是 ArrayBlockQueue。
Goroutine 排程器
這個圖一般講 Goroutine 排程器的地方都會引用,想要仔細瞭解的可以看看原部落格(小編:點選閱讀原文獲取)。這裡只說明幾點:
-
M 代表系統執行緒,P 代表處理器(核),G 代表 Goroutine。Go 實現了 M : N 的排程,也就是說執行緒和 Goroutine 之間是多對多的關係。這點在許多GreenThread / Coroutine 的排程器並沒有實現。比如 Java 1.1 版本之前的執行緒其實是 GreenThread(這個詞就來源於 Java),但由於沒實現多對多的排程,也就是沒有真正實現並行,發揮不了多核的優勢,所以後來改成基於系統核心的 Thread 實現了。
-
某個系統執行緒如果被阻塞,排列在該執行緒上的 Goroutine 會被遷移。當然還有其他機制,比如 M 空閒了,如果全域性佇列沒有任務,可能會從其他 M 偷任務執行,相當於一種 rebalance 機制。這裡不再細說,有需要看專門的分析文章。
-
具體的實現策略和我們前面分析的機制類似。系統啟動時,會啟動一個獨立的後臺執行緒(不在 Goroutine 的排程執行緒池裡),啟動 netpoll 的輪詢。當有 Goroutine 發起網路請求時,網路庫會將 fd(檔案描述符)和 pollDesc(用於描述 netpoll 的結構體,包含因為讀 / 寫這個 fd 而阻塞的 Goroutine)關聯起來,然後呼叫 runtime.gopark 方法,掛起當前的 Goroutine。當後臺的 netpoll 輪詢獲取到 epoll(Linux 環境下)的 event,會將 event 中的 pollDesc 取出來,找到關聯的阻塞 Goroutine,並進行恢復。
Goroutine 是銀彈麼?
Goroutine 很大程度上降低了併發的開發成本,是不是我們所有需要併發的地方直接 go func 就搞定了呢?
Go 通過 Goroutine 的排程解決了 CPU 利用率的問題。但遇到其他的瓶頸資源如何處理?比如帶鎖的共享資源,比如資料庫連線等。網際網路線上應用場景下,如果每個請求都扔到一個 Goroutine 裡,當資源出現瓶頸的時候,會導致大量的 Goroutine 阻塞,最後使用者請求超時。這時候就需要用 Goroutine 池來進行控流,同時問題又來了:池子裡設定多少個 Goroutine 合適?
所以這個問題還是沒有從更本上解決。
go沒有嚴格的內建的logical processor數量限制,但是go的runtime預設限制了每個program最多使用10,000個執行緒,可以通過SetMaxThreads修改.下圖展示了Concurrency和Parallelism的區別goroutine使用
go塊
go的用法很簡單,如下. 如果沒有最外面的括號{}(),會顯示go塊必須是一個函式呼叫.沒有()只是一個函式的宣告,有了()是一個呼叫(沒有引數的)
go func() {
for _,n := range nums {
out <- n
}
close(out)
}()
channel
channel預設上是阻塞的,也就是說,如果Channel滿了,就阻塞寫,如果Channel空了,就阻塞讀。於是,我們就可以使用這種特性來同步我們的傳送和接收端。
channel <-
,傳送一個新的值到通道中 <-channel
,從通道中接收一個值,這個更像有兩層含義,一個是會返回一個結果,當做賦值來用:msg := <-channel
;另外一個含義是等待這個channel傳送訊息,所以還有一個等的含義在.所以如果你直接寫fmt.Print(<-channel)
本意只是想輸出下這個chan傳來的值,但是其實他還會阻塞住等著channel來發.
預設傳送和接收操作是阻塞的,直到傳送方和接收方都準備完畢。
funcmain() {
messages := make(chan string)
go func() { messages <- "ping" }()
msg := <-messages
fmt.Println(msg)
}
所以你要是這麼寫:是一輩子都不會執行到print的(會死鎖)
func main() {
messages := make(chan string)
messages <- "ping"
msg := <-messages
fmt.Println(msg)
}
所以在一個go程中,傳送messages <- "msg"
channel的時候,要格外小心,不然一不留神就死鎖了.(解決方法:1. 用帶快取的chan; 2. 使用帶有default的select傳送)
select {
case messages <- "msg":
fmt.Println("sent message")
default:
fmt.Println("no message sent")
}
range
用於channel的range是阻塞的.下面程式會顯示deadloc,去掉註釋就好了.
queue := make(chan string, 2)
//queue <- "one"
//queue <- "two"
//close(queue)
for elem := range queue {
fmt.Println(elem)
}
通道緩衝
加了快取之後,就像你向channel傳送訊息的時候(message <- "ping"
),"ping"就已經發送出去了(到快取).就像一個非同步的佇列?到時候,<-message
直接從快取中取值就好了(非同步...)
但是你要這麼寫,利用通道緩衝,就可以.無緩衝的意味著只有在對應的接收(<-chan
)通道準備好接收時,才允許傳送(chan <-
),可快取通道允許在沒有對應接收方的情況下,快取限定數量的值。
func main() {
message := make(chan string,1)
message <- "ping"
msg := <-message
fmt.Print(msg)
}
要是多發一個messages <- "channel"
,fatal error: all goroutines are asleep - deadlock!
,要是多接受一個fmt.Println(<-messages)
,會打印出buffered channel
,然後報同樣的error
func main() {
messages := make(chan string, 2)
messages <- "buffered"
messages <- "channel"
fmt.Println(<-messages)
fmt.Println(<-messages)
}
通道同步
使用通道同步,如果你把 <- done
這行程式碼從程式中移除,程式甚至會在 worker還沒開始執行時就結束了。
funcworker(done chanbool) {
fmt.Print("working...")
time.Sleep(time.Second) // working
fmt.Println("done")
done <- true
}
funcmain() {
done := make(chan bool, 1)
go worker(done)
<-done //blocking 阻塞在這裡,知道worker執行完畢
}
傳送方向
可以指定這個通道是不是隻用來發送或者接收值。這個特性提升了程式的型別安全性。pong 函式允許通道(pings)來接收資料,另一通道(pongs)來發送資料。
funcping(pings chan<- string, msg string) {
pings <- msg
}
funcpong(pings <-chanstring, pongs chan<- string) {
msg := <-pings
pongs <- msg
}
funcmain() {
pings := make(chan string, 1)
pongs := make(chan string, 1)
ping(pings, "passed message")
pong(pings, pongs)
fmt.Println(<-pongs)
}
select
Go 的select 讓你可以同時等待多個通道操作。(poll/epoll?) 注意select 要麼寫個死迴圈用超時,要不就定好次數.或者加上default讓select變成非阻塞的
go func() {
time.Sleep(time.Second * 1)
c1 <- "one"
}()
go func() {
time.Sleep(time.Second * 2)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
超時處理
其中time.After
返回<-chan Time
,直接向select傳送訊息
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
非阻塞通道操作
default,當監聽的channel都沒有準備好的時候,預設執行的.
select {
case msg := <-messages:
fmt.Println("received message", msg)
default:
fmt.Println("no message received")
}
可以使用 select 語句來檢測 chan 是否已經滿了
ch := make (chan int, 1)
ch <- 1
select {
case ch <- 2:
default:
fmt.Println("channel is full !")
}
通道關閉
一個非空的通道也是可以關閉的,但是通道中剩下的值仍然可以被接收到
queue := make(chan string, 2)
queue <- "one"
queue <- "two"
close(queue)
for elem := range queue {
fmt.Println(elem)
}
定時器
在未來某一刻執行一次時使用的
定時器表示在未來某一時刻的獨立事件。你告訴定時器需要等待的時間,然後它將提供一個用於通知的通道。可以顯示的關閉
timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
<-timer1.C
直到這個定時器的通道 C
明確的傳送了定時器失效的值(2s)之前,將一直阻塞。如果你只是要單純的等待用time.Sleep,定時器是可以在它失效之前把它給取消的stop2 := timer2.Stop()
打點器
當你想要在固定的時間間隔重複執行,定時的執行,直到我們將它停止
funcmain() {
//打點器和定時器的機制有點相似:一個通道用來發送資料。這裡我們在這個通道上使用內建的 range 來迭代值每隔500ms 傳送一次的值。
ticker := time.NewTicker(time.Millisecond * 500)
go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}()
//打點器可以和定時器一樣被停止。一旦一個打點停止了,將不能再從它的通道中接收到值。我們將在執行後 1600ms停止這個打點器。
time.Sleep(time.Millisecond * 1600)
ticker.Stop()
fmt.Println("Ticker stopped")
}
生成器
類似於提供了一個服務,不過只是適用於呼叫不是很頻繁
funcrand_generator_2()chanint {
out := make(chan int)
go func() {
for {
out <- rand.Int()
}
}()
return out
}
funcmain() {
// 生成隨機數作為一個服務
rand_service_handler := rand_generator_2()
fmt.Printf("%dn", <-rand_service_handler)
}
多路複用
Apache使用處理每個連線都需要一個程序,所以其併發效能不是很好。而Nighx使用多路複用的技術,讓一個程序處理多個連線,所以併發效能比較好。
多路複用技術可以用來整合多個通道。提升效能和操作的便捷。
其實就是整合了多個上面的生成器
funcrand_generator_3()chanint {
rand_generator_1 := rand_generator_2()
rand_generator_2 := rand_generator_2()
out := make(chan int)
go func() {
for {
//讀取生成器1中的資料,整合
out <- <-rand_generator_1
}
}()
go func() {
for {
//讀取生成器2中的資料,整合
out <- <-rand_generator_2
}
}()
return out
}
Furture技術
可以在不準備好引數的情況下呼叫函式。函式呼叫和函式引數準備這兩個過程可以完全解耦。可以在呼叫的時候不關心資料是否準備好,返回值是否計算好的問題。讓程式中的元件在準備好資料的時候自動跑起來。 這個最後取得<-q.result
也是可以放到execQuery
上面的把
Furture技術可以和各個其他技術組合起來用。可以通過多路複用技術,監聽多個結果Channel,當有結果後,自動返回。也可以和生成器組合使用,生成器不斷生產資料,Furture技術逐個處理資料。Furture技術自身還可以首尾相連,形成一個併發的pipe filter。這個pipe filter可以用於讀寫資料流,操作資料流。
type query struct {
sql chan string
result chan string
}
funcexecQuery(q query) {
go func() {
sql := <-q.sql
q.result <- "get " + sql
}()
}
funcmain() {
q := query{make(chan string, 1), make(chan string, 1)}
execQuery(q)
//準備引數
q.sql <- "select * from table"
fmt.Println(<-q.result)
}
Chain Filter技術
程式建立了10個Filter,每個分別過濾一個素數,所以可以輸出前10個素數。
funcGenerate(ch chan<- int) {
for i := 2; ; i++ {
ch <- i
}
}
funcFilter(in <-chanint, out chan<- int, prime int) {
for {
i := <-in // Receive value from 'in'.
if i%prime != 0 {
out <- i // Send 'i' to 'out'.
}
}
}
// The prime sieve: Daisy-chain Filter processes.
funcmain() {
ch := make(chan int) // Create a new channel.
go Generate(ch) // Launch Generate goroutine.
for i := 0; i < 10; i++ {
prime := <-ch
print(prime, "n")
ch1 := make(chan int)
go Filter(ch, ch1, prime)
ch = ch1
}
}
共享變數
有些時候使用共享變數可以讓程式碼更加簡潔
type sharded_var struct {
reader chan int
writer chan int
}
funcsharded_var_whachdog(v sharded_var) {//共享變數維護協程
go func() {
var value int = 0
for { //監聽讀寫通道,完成服務
select {
case value = <-v.writer:
case v.reader <- value:
}
}
}()
}
funcmain() {
v := sharded_var{make(chan int), make(chan int)} //初始化,並開始維護協程
sharded_var_whachdog(v)
fmt.Println(<-v.reader)
v.writer <- 1
fmt.Println(<-v.reader)
}
Concurrency patterns
下面介紹了一些常用的併發模式.
Runner
當你的程式會執行在後臺,可以是cron job或者是Iron.io這樣的worker-based雲環境.這個程式就可以監控和中斷你的程式,如果你的程式執行的太久了.
定義了三個channel來通知任務狀態.
- interrupt:接收系統的終止訊號(比如ctrl-c),接收到之後系統就優雅的退出
- complete:指示任務完成狀態或者返回錯誤
- timeout:當超時了之後,系統就優雅的退出
tasks是一個函式型別的slice,你可以往裡面存放簽名為func funcName(id int){}的函式,作為你的任務.task(id)
就是在執行任務了(當然只是用來模擬任務,可以定義一個任務介面來存放任務,此處是為了簡便). 注意tasks裡面的任務是序列執行的,這些任務的執行發生在一個單獨的goroutine中.
New方法裡的interrupt channel buffer設定為1,也就是說當用戶重複ctrl+c的時候,程式也只會收到一個訊號,其他的訊號會被丟棄.
在run()方法中,在開始執行任務前(task(id)
),會前檢查執行流程有沒有被中斷(if r.gotInterrupt() {}
),這裡用了一個帶default語句的select.一旦收到中斷的事件,程式就不再接受任何其他事件了(signal.Stop(r.interrupt)
).
在Start()方法中,在go塊中執行run()方法,任何當前的goroutine會阻塞在select這邊,直到收到run()返回的complete channel或者超時返回.
// Runner runs a seto