go——併發
併發與並行的區別:
併發:邏輯上具備同時處理多個任務的能力。
並行:物理上在同一時刻執行多個併發任務。
通常都會說程式是併發設計的,也就是說它允許多個任務同時執行,但實際上並不一定真在同一時刻發生。
在單核處理器上,它們能以間隔方式切換執行。
而並行則依賴多核處理器等物理裝置,讓多個任務真正在同一時刻執行,它代表了當前程式執行狀態。
簡單點說,並行是併發設計的理想執行模式。
多執行緒或多程序是並行的基本條件,但單執行緒也可用協程(coroutine)做到併發。
儘管協程在單個執行緒上通過主動切換來實現多工併發,但它也有自己的優勢。
除了將因阻塞而浪費的時間找回來外,還免去了執行緒切換開銷,有著不錯的執行效率。
協程上執行的多個任務本質上依舊序列,加上可控自主排程,所以並不需要做同步處理。
即使採用多執行緒也未必能並行。Python就因GIL限制,預設只能併發而不能並行,所以很多時候轉而使用“多程序”+“協程”架構。
通常情況下,用多程序來實現分散式和負載均衡,減輕單程序垃圾回收壓力;
用多執行緒(LWP)搶奪更多的處理器資源;用協程來提高處理器時間片利用率。
簡單將goroutine歸納為協程並不合適。執行時會建立多個執行緒來執行併發任務,且任務單元可被排程到其它執行緒並行執行。
這更像是多執行緒和協程的綜合體,能最大限度提升執行效率,發揮多核處理能力。
只須在函式呼叫前新增go關鍵字即可建立併發任務。
go fmt.PrintLn("hello")
關鍵字go並非執行併發操作,而是建立一個併發任務單元。
新建任務被放置在系統佇列中,等待排程器安排合適系統執行緒去獲取執行權。
當前流程不會阻塞,不會等待該任務啟動,且執行時也不保證併發任務的執行次序。
每個任務單元除儲存函式指標、呼叫引數外,還會分配執行所需的棧記憶體空間。
相比系統預設MB級別的執行緒棧,goroutine自定義棧初始僅須2KB,所以才能建立成千上萬的併發任務。
自定義棧採取按需分配策略,在需要時進行擴容,最大能到GB規模。
與defer一樣,goroutine也會因“延遲執行”而立即計算並複製執行引數。
package main import ( "fmt" "time" ) var c int //初始為0 func counter() int { c++ return c } func main() { a := 100 go func(x, y int) { //匿名函式直接執行 //hour min second 時分秒 time.Sleep(time.Second) //休息一秒鐘 fmt.Println("go", x, y) //執行counter 100 1 //goroutine在main之後執行 }(a, counter()) //立即計算並複製引數 a += 100 fmt.Println("main:", a, counter()) //200 2 time.Sleep(time.Second * 3) //等待goroute結束 } /* main: 200 2 go 100 1 為什麼先列印main? go建立一個併發單元,但是不會馬上執行,而是會放置在佇列中 */
程序退出時不會等待併發任務結束,可用通道(channel)阻塞,然後發出退出訊號。
package main import ( "fmt" "time" ) func main() { // exit := make(chan struct{}) //建立通道。因為僅僅是通知,資料並沒有實際意義。 go func() { time.Sleep(time.Second) fmt.Println("goroutine done.") // close(exit) //關閉通道發出訊號 }() fmt.Println("main...") // <-exit //如通道關閉,立即解除阻塞。 fmt.Println("main exit...") }
<-exit接收操作符,如果exit代表了元素型別為byte的通道型別值,則此表示式就表示從exit中接收一個byte型別值的操作。
如果不建立通道,main直接結束之後程序就結束了,而不會等待併發任務的結束,這樣就不會執行併發任務。
所以,我們可以建立一個通道,當通道關閉後才會解除阻塞,整個程式才會結束
除關閉通道外,寫入資料也可解除阻塞。
如果等待多個任務結束,推薦使用sync.WaitGroup。
通過設定計數器,讓每個goroutine在退出前遞減,直至歸零時解除阻塞。
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) //累加計數,每迴圈以此加一 go func(id int) { //建立多個任務單元 defer wg.Done() //遞減計數,這個任務計數之前減一 time.Sleep(time.Second) fmt.Println("goroutine", id, "done") }(i) } fmt.Println("main...") wg.Wait() //阻塞,直至wg歸零 fmt.Println("main exit") } /* main... goroutine 6 done goroutine 1 done goroutine 2 done goroutine 4 done goroutine 9 done goroutine 8 done goroutine 0 done goroutine 7 done goroutine 5 done goroutine 3 done main exit */
儘管WaitGroup.Add實現了原子操作,但建議在goroutine外累加計數器,以免Add尚未執行,wait已經退出。
go func(id int) { wg.Add(1) defer wg.Done() }
可在多處使用wait阻塞,它們都可以接收到通知。
package main import ( "fmt" "sync" "time" ) func main() { var wg sync.WaitGroup wg.Add(1) go func() { wg.Wait() fmt.Println("wait exit") }() go func() { time.Sleep(time.Second * 3) fmt.Println("done.") wg.Done() }() wg.Wait() fmt.Println("main exit.") } /* done. wait exit main exit. */
GOMAXPROCS 執行緒數量
執行時可能會建立很多執行緒,但任何時候僅有限的幾個執行緒參與併發任務執行。
該數量預設與處理器核數相等,可用runtime.GOMAXPROCS函式(或環境變數)修改。
package main import ( "fmt" "math" "runtime" "sync" "time" ) //測試目標函式 func count() { x := 0 for i := 0; i < math.MaxUint32; i++ { x += 1 } fmt.Println(x) } //迴圈執行 func test(n int) { for i := 0; i < n; i++ { count() } } //併發執行 func test2(n int) { var wg sync.WaitGroup wg.Add(n) //計數為4 for i := 0; i < n; i++ { go func() { count() wg.Done() }() } wg.Wait() } func main() { n := runtime.GOMAXPROCS(0) //4 fmt.Println(time.Now()) // test(n) //6s test2(n) //3s fmt.Println(time.Now()) }
Local Storage 區域性儲存器
與執行緒不同,goroutine任務無法設定優先順序,無法獲取編號,沒有區域性儲存(TLS),甚至連返回值都會被拋棄。
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup var gs [5]struct { //使用陣列來進行區域性儲存 id int //編號 result int //返回值 } fmt.Println(gs) for i := 0; i < len(gs); i++ { wg.Add(1) go func(id int) { defer wg.Done() gs[id].id = id gs[id].result = (id + 1) * 100 }(i) } wg.Wait() fmt.Printf("%+v\n", gs) } /* [{0 0} {0 0} {0 0} {0 0} {0 0}] [{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}] */
如果使用map作為區域性儲存容器,建議做同步處理,因為執行時會對其做併發讀寫檢查。
Gosched
暫停,釋放執行緒去執行其它任務。當前任務被放回佇列,等待下次排程時恢復執行
package main import ( "fmt" "runtime" ) func main() { runtime.GOMAXPROCS(1) exit := make(chan struct{}) go func() { /任務a defer close(exit) go func() { //任務b fmt.Println("b") }() for i := 0; i < 4; i++ { fmt.Println("a:", i) if i == 1 { runtime.Gosched() //讓出當前執行緒,排程執行b } } }() <-exit } /* a: 0 a: 1 b //這個b有可能打印不出來 a: 2 a: 3 */
該函式很少被使用,因為執行時會主動向長時間執行的任務發出搶佔排程。
Goexit
立即終止當前任務,執行時確保所有已註冊延遲呼叫被執行。
該函式不會影響其它併發任務,不會引發panic,自然也就無法捕獲。
package main import ( "fmt" "runtime" ) func main() { exit := make(chan struct{}) go func() { defer close(exit) defer fmt.Println("a") //執行3 func() { defer func() { fmt.Println("b", recover() == nil) //defer總會執行 2 }() func() { fmt.Println("c") //執行1 runtime.Goexit() //立即終止當前任務 fmt.Println("c done.") //不會執行 }() fmt.Println("b done.") //不會執行 }() fmt.Println("a done.") //不會執行 }() <-exit fmt.Println("main exit") //主程式 } /* c b true a main exit */
如果在main裡呼叫Goexit,它會等其它任務結束,然後讓程序直接崩潰。
package main import ( "fmt" "runtime" "time" ) func main() { for i := 0; i < 2; i++ { go func(x int) { for n := 0; n < 2; n++ { fmt.Printf("%c: %d\n", 'a'+x, n) time.Sleep(time.Millisecond) } }(i) } runtime.Goexit() //等待所有任務結束 fmt.Println("main exit.") } /* b: 0 a: 0 a: 1 b: 1 fatal error: no goroutines (main called runtime.Goexit) - deadlock! */
無論身處那一層,Goexit都能立即終止整個呼叫堆疊,這與return僅退出當前函式不同。
標準庫函式os.Exit可終止程序,但不會執行延遲呼叫。