Go語言8-goroutine和channel
Goroutine
Go語言從語言層面上就支援了併發,這與其他語言大不一樣。Go語言中有個概念叫做goroutine,這類似我們熟知的執行緒,但是更輕。
程序、執行緒、協程
程序和執行緒
程序是程式在作業系統中的一次執行過程,系統進行資源分配和排程的一個獨立單位。
執行緒是程序的一個執行實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位。
一個程序可以建立和撤銷多個執行緒,同一個程序中的多個執行緒之間可以併發執行。
所以程式的型別可以分為以下幾種:
- 一個程序,它只有一個執行緒,就是單執行緒程式
- 一個程序,它又多個執行緒,就是多執行緒程式
- 一個程序,它可能還會fork多個子程序,就是多程序程式
併發和並行的區別
- 多執行緒程式在單核的cou上執行,這是併發(concurrency)。
- 多執行緒程式在多個核的cpu上執行(真正的同時執行),這才是並行(parallelism)。
併發,在微觀上,任意時刻只有一個程式在執行。因為執行緒已經是CPU排程的最小單元,一個CPU一次只能處理一個執行緒。但是巨集觀上這些程式時同時在那裡執行的,所以這個只是併發。
所以在python裡,貌似講的都是高併發,似乎沒聽過並行的概念。
協程和執行緒
協程,獨一的棧空間,共享堆空間,排程由使用者自己控制。本質上類似於使用者級執行緒,這些使用者級執行緒的排程也是自己實現的。
執行緒,一個執行緒上可以跑多個協程,協程是輕量級的執行緒。
goroutine 排程模型
Go的排程器模型:G-P-M模型。
- G代表goroutine,它通過go關鍵字呼叫runtime.newProc建立。
- P代表processer,可以理解為上下文。
- M表示machine,可以理解為作業系統的執行緒。
設定Golang執行的cpu核數
設定當前的程式執行在多少核上,下面的例子是獲取CPU的核數,然後執行在所有核上:
package main import ( "fmt" "runtime" ) func main() { num := runtime.NumCPU() runtim.GOMAXPROCS(num) fmt.Println(num) }
上面P的數目就是這裡GOMAXPROCS設定的數目,通常設定為CPU核數。
1.8版本以上的Golang,是不需要做上面的設定的,預設就是執行在所有的核上。當然還是可以設定一下,比如限制只能使用多少核。
goroutine的示例:
package main
import (
"fmt"
"time"
)
func example() {
var i int
for {
fmt.Println(i)
i++
time.Sleep(time.Millisecond * 30)
}
}
func main() {
go example() // 起一個goroutine
var j int
for j > -100 {
fmt.Println(j)
j--
time.Sleep(time.Millisecond * 100)
}
fmt.Println("執行結束")
}
Channel
不同goroutine之間要進行通訊,有下面2種方法:
- 全域性變數和鎖同步
- Channel
先講管道(channel),然後講 goroutine 和 channel 結合的一些用法。
這篇的channel可以參考下:
https://www.jianshu.com/p/24ede9e90490
全域性變數的實現示例
在下面的例子裡定義了變數 m 來實現goroutine之間的通訊:
package main
import (
"fmt"
"time"
"sync"
)
var (
m = make(map[int]uint64)
lock sync.Mutex
)
type task struct {
n int
}
func calc(t *task) {
var res uint64
res = 1
for i := 1; i <= t.n; i++ {
res *= uint64(i)
}
lock.Lock()
m[t.n] = res // 變數m用來存放結果,這樣主執行緒裡就能拿到m的值,操作要加鎖
lock.Unlock()
}
func main() {
for i := 0; i < 100; i++ {
t := &task{i}
go calc(t)
}
for j := 0; j < 10; j++ {
fmt.Printf("\r已執行%d秒", j)
time.Sleep(time.Second)
}
fmt.Println("\r執行完畢,輸出結果:")
lock.Lock()
for k, v := range m {
if v != 0 {
fmt.Printf("%d! = %v\n", k, v)
}
}
lock.Unlock()
}
channel 概念
channel的概念如下:
- 型別Unix中的管道(Pipe)
- 先進先出
- 執行緒安全,多個goroutine同時訪問,不需要加鎖
- channel是有型別的,一個整數的channel只能存放整數
channel 宣告
var 變數名 chan 型別
var test1 chan int
var test2 chan string
var tesr3 chan map[string]string
var test4 chan stu
var test5 chan *stu
只是宣告還不夠,使用前還要make,分配記憶體空間:
package main
import "fmt"
func main() {
var intChan chan int // 宣告
intChan = make(chan int, 10) // 初始化,長度是10
intChan <- 10 // 存入管道
n := <- intChan // 取出
fmt.Println(n)
}
定義訊號(空結構體)
有一些場景中,一些 goroutine 需要一直迴圈處理資訊,直到收到 quit 訊號。作為訊號,只需要隨便傳點什麼,並不關注具體的值。那麼可以選擇使用空結構體,像下面這樣定義了2個訊號:
msgCh := make(chan struct{})
quitCh := make(chan struct{})
// 傳訊號的方法
msgCh <- struct{}{} // 前面的 struct{} 是變數的型別,後面的 {} 則是做初始化傳入空值生成例項
quitCh <- struct{}{}
通過channel實現通訊
起一個goroutine往管道里存,再起一個goroutine從管道里把資料取出:
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
var i int
for {
ch <- i
i ++
time.Sleep(time.Millisecond)
}
}
func read(ch chan int) {
for {
b := <- ch
fmt.Println(b)
}
}
func main() {
intChan := make(chan int, 10)
go write(intChan)
go read(intChan)
time.Sleep(time.Second * 5)
}
channel 的型別和阻塞
channel 分為不帶快取的 channel 和帶快取的 channel。
channel 一定要初始化後才能進行讀寫操作,否則會永久阻塞。這個不是這裡要講的重點,順便帶一下。
無快取的channle
初始化make的時候不傳入第二個引數設定容量就是:
ch := make(chan int)
從無快取的 channel 中讀取訊息會阻塞,直到有 goroutine 向該 channel 中傳送訊息;同理,向無快取的 channel 中傳送訊息也會阻塞,直到有 goroutine 從 channel 中讀取訊息。
有快取的 channel
有快取的 channel 的宣告方式為指定 make 函式的第二個引數,該引數為 channel 快取的容量:
ch := make(chan int, 10)
有快取的 channel 類似一個阻塞佇列(採用環形陣列實現)。當快取未滿時,向 channel 中傳送訊息時不會阻塞,當快取滿時,傳送操作將被阻塞,直到有其他 goroutine 從中讀取訊息;
相應的,當 channel 中訊息不為空時,讀取訊息不會出現阻塞,當 channel 為空時,讀取操作會造成阻塞,直到有 goroutine 向 channel 中寫入訊息。
緩衝區的大小
通過 len 函式可以獲得 chan 中的元素個數,通過 cap 函式可以得到 channel 的緩衝區長度。
無快取和緩衝區是1的差別
無快取的 channel 的 len和cap 始終都是0。
通過無快取的 channel 進行通訊時,接收者收到資料 happens before 傳送者 goroutine 喚醒
上面這句不好理解,不過可以先看下現象。
下面的這2行函式會報錯,說是死鎖。但是如果設定了 channel 的容量哪怕是1,就不會報錯的:
func main() {
ch := make(chan int)
ch <- 1
}
雖然容量1的channel也只能存1個數,但是無緩衝區的channel似乎1個數都存不了,除非馬上能取走:
func main() {
ch := make(chan int, 1)
// 要起一個goroutine可以馬上接收channel裡的資料
go func () {
fmt.Println(<- ch)
}()
ch <- 1
time.Sleep(time.Second) // 要給goroutine執行完成的時間
}
小結:無快取的channel需要有一個goroutine可以把channel裡的資料馬上取走。
channel之間的同步
在學習關閉channel之前,先看下下面的例子。由於沒有關閉channel,是會有問題的,不過例子裡都解決了。先看下不用關閉channel可以怎麼搞,然後再接著看關閉channel的用法:
package main
import (
"time"
"fmt"
)
func calc(taskChan chan int, resChan chan int) {
for v := range taskChan {
// 判斷是不是素數
flag := true
for i := 2; i < v; i++ {
if v % i == 0 {
flag = false
break
}
}
if flag {
resChan <- v
}
}
}
func main() {
intChan := make(chan int, 1000)
// 這個也是個goroutine
go func(){
for i := 2; i < 100000; i++ {
intChan <- i
}
}() // 管道滿了之後,這個匿名函式會阻塞,但是不影響程式繼續往下走
resultChan := make(chan int, 1000)
// 同時起8個goroutine
for i := 0; i < 8; i++ {
go calc(intChan, resultChan)
}
// 再起一個取結果的goroutine,不阻塞主執行緒
go func(){
for v := range resultChan{
fmt.Println("素數:", v)
}
}()
// 給上面的匿名函式幾秒鐘來輸出結果
time.Sleep(time.Second * 5)
}
上面的例子裡用了2個匿名函式,也都是起的goroutine。如果是在主執行緒裡直接for迴圈的話,那個for迴圈就會變成死鎖,程式不會自己往下走。所以執行在goroutine裡的死迴圈,在主執行緒退出後也就結束了,不會有問題。後一個匿名函式是對channel的進行遍歷,channel取空後,會進入阻塞,如果是執行在主執行緒裡的話也會形成死鎖。
range 遍歷
channel 也可以使用 range 取值,並且會一直從 channel 中讀取資料,直到有 goroutine 對改 channel 執行 close 操作,迴圈才會結束。
關閉 channel
golang 提供了內建的 close 函式對 channel 進行關閉操作:
ch := make(chan int)
close(ch)
關於 channel 的關閉,有以下的特點:
- 關閉一個未初始化(nil) 的 channel 會產生 panic
- 重複關閉同一個 channel 會產生 panic
- 向一個已關閉的 channel 中傳送訊息會產生 panic
- 可以從已關閉的 channel 裡繼續讀取訊息,若訊息均已讀出,則會讀到型別的零值。從一個已關閉的 channel 中讀取訊息不會阻塞,並且會返回一個為 false 的 ok-idiom,可以用它來判斷 channel 是否關閉
- 關閉 channel 會產生一個廣播機制,所有向 channel 讀取訊息的 goroutine 都會收到訊息
有2種方式可以把管道里的資料都取出來,但是都需要把管道關閉:
- 判斷管道已關閉並且取完了
- 遍歷管道
關閉channel然後讀取的示例:
package main
import "fmt"
func main() {
var ch chan int
ch = make(chan int, 5)
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
for {
var b int
b, ok := <- ch
fmt.Println(b, ok)
if ok == false {
break
}
}
}
/* 執行結果
PS H:\Go\src\go_dev\day8\channel\close_chan> go run main.go
0 true
1 true
2 true
3 true
4 true
0 false
PS H:\Go\src\go_dev\day8\channel\close_chan>
*/
上面輸出的最後一條,就是channel已經空了,讀出來的就是型別的0值,並且ok變false了。
遍歷channel的示例:
package main
import "fmt"
func main() {
var ch chan int
ch = make(chan int) // 這個管道沒有無快取
// 這個goroutine一次存一個,再存會阻塞,直到主執行緒後面的for迴圈遍歷的時候取走資料
// 存完100個數後,這裡的for迴圈結束,會關閉管道。主執行緒後面的for迴圈的遍歷就能正常結束了
go func () {
for i := 0; i < 100; i++ {
ch <- i
}
close(ch)
}()
for v := range ch {
fmt.Println(v)
}
}
判斷子執行緒結束
學到這裡,再也不需要用Sleep等待子執行緒結束了,可以通過管道實現。可以單獨定義一個專門用來判斷子執行緒結束的管道。子執行緒完成任務後,就傳個值給管道,主執行緒就阻塞的讀管道里的資訊,一旦讀到資訊,就說明子執行緒完成了,可以繼續執行或者退出了。如果起了多個子執行緒,則主執行緒就是用for迴圈多讀幾次,就能判斷出有多少子執行緒已經結束了。
channel 只讀、只寫
宣告只讀的channel:
var ch <-chan int
宣告只寫的channel:
var ch chan<- int
應用場景,管道需要能夠可讀可寫。但是可以限制它在某個函式裡的功能,也就是在定義函式的引數的時候,把管道的型別設定為只讀或只寫。或者把管道傳給結構體,結構體裡限制管道的讀寫限制?
下面是之前的一個例子,僅僅只是把2個函式在設定引數型別的時候把管道的讀寫限制加上了:
package main
import (
"fmt"
"time"
)
func write(ch chan<- int) {
var i int
for {
ch <- i
i ++
time.Sleep(time.Millisecond)
}
}
func read(ch <-chan int) {
for {
b := <- ch
fmt.Println(b)
}
}
func main() {
intChan := make(chan int, 10)
go write(intChan)
go read(intChan)
time.Sleep(time.Second * 5)
}
配合 select 使用
select 用法類似IO多路複用,可以同時監聽多個 channel 的訊息狀態,用法如下:
select {
case <- ch1:
...
case <- ch2:
...
case ch3 <- 10;
...
default:
...
}
select 可以同時監聽多個 channel 的寫入或讀取:
- 若只有一個 case 通過(不阻塞),則執行這個 case 塊
- 若有多個 case 通過,則隨機挑選一個 case 執行
- 若所有 case 均阻塞,則執行 default 塊。若未定義 default 塊,則 select 語句阻塞,直到有 case 被喚醒
- 使用 break 會跳出 select 塊
- select 不會迴圈,就只會執行一個塊然後就繼續往後執行了
select只會執行一次
這個例子只會輸出一次,隨機是1或者是2,然後接結束了:
package main
import "fmt"
func main() {
ch1 := make(chan int, 1)
ch1 <- 1
ch2 := make(chan int, 1)
ch2 <- 2
select {
case v := <- ch1:
fmt.Println(v)
case v := <- ch2:
fmt.Println(v)
default:
fmt.Println(0)
}
}
所以如果要把管道里的數取完,或者取多次,就要再套一層for迴圈。
for迴圈和break的效果
在select外面用for套了一層死迴圈,這樣就是反覆的執行select。不過break在這裡就沒效果了:
package main
import (
"fmt"
"time"
)
func main() {
var ch1, ch2 chan int
ch1 = make(chan int, 10)
ch2 = make(chan int, 10)
for i := 0; i < cap(ch1); i++ {
ch1 <- i
ch2 <- i * i
}
// LABEL1:
for {
select {
case v := <- ch1:
fmt.Println("ch1", v)
case v := <- ch2:
fmt.Println("ch2", v)
default:
fmt.Println("所有元素都已經取完")
break // 這個break沒有意義,因為值是跳出select,而不是for迴圈
// break LABEL1 // 這個break可以直接跳出for迴圈
}
time.Sleep(time.Second)
}
}
如果要跳出for迴圈,可以配合標籤。上面的程式碼裡已經寫好了只是註釋掉了。
定時器
定時器是在 time 包裡的,
package main
import (
"fmt"
"time"
)
func main() {
t := time.NewTicker(time.Second)
for v := range t.C {
fmt.Println(v)
}
}
上面呼叫的NewTicker()方法返回的是個結構體,如下:
type Ticker struct {
C <-chan Time // The channel on which the ticks are delivered.
// contains filtered or unexported fields
}
上面的例子裡遍歷了 t.C 就是一個channel。time包內部應該是會產生一個goroutine,每隔一段時間就傳一個數據進去。
設定超時時間
還有一個After()方法,和上面的方法是一樣的。不過這個方法直接返回管道,即 NewTimer(d).C 。而NewTimer()方法的管道在返回的結構體的屬性C裡。這個After()方法用起來更方便。結合select正好可以做成一個設定任務超時時間的功能:
package main
import (
"fmt"
"time"
)
func task(ch chan struct{}) {
time.Sleep(time.Second * 3)
ch <- struct{}{}
}
func main() {
ch := make(chan struct{}) // 定義好訊號的管道,傳遞空結構體
go task(ch) // 啟動一個任務
select {
case <- ch:
fmt.Println("任務執行結束")
case <- time.After(time.Second * 2): // 2秒後超時
fmt.Println("任務超時")
}
}
goroutine 中使用 recover
程式裡起的gorountine中如果panic了,並且這個goroutine裡面沒有捕獲錯誤的話,整個程式就會掛掉。
下面的程式會報錯(Panic),是gorountine裡的產生的錯誤:
package main
func divideZero(ch chan int) {
zero := 0
ch <- 1 / zero
}
func main() {
ch := make(chan int)
go divideZero(ch)
<- ch
}
在gorountine中執行錯誤了,是可以不影響其他執行緒和主執行緒的繼續執行的。所以,好的習慣是每當產生一個goroutine,就在開頭用defer插入recover, 這樣在出現panic的時候,就只是自己退出而不影響整個程式。下面是優化後的程式碼,加入了recover來捕獲錯誤:
package main
import "fmt"
func divideZero(ch chan int) {
defer func () {
if err := recover(); err != nil {
fmt.Println(err)
// 要給管道傳值,否則主執行緒從空管道里取值會阻塞,形成死鎖
ch <- 0
}
}()
zero := 0
ch <- 1 / zero
}
func main() {
ch := make(chan int)
go divideZero(ch)
<- ch
}
單元測試
測試用例的檔名必須以_test.go結尾,測試的函式也必須以Test開頭。符合命名規則,使用 go test 命令的時候就能自動執行測試用例。
這篇的單元測試比較粗糙,不過基本怎麼用,以及用法示例都簡單記下來了。
被測試的函式
先準備一個需要被測試的函式:
package main
import "fmt"
func get_fullname(first, last string) (fullname string) {
fullname = first + " " + last
return
}
func main() {
fullname := get_fullname("Barry", "Allen")
fmt.Println(fullname)
}
上面的 get_fullname() 函式就是接下來要進行單元測試的函式。
測試用例
package main
import "testing"
func TestName(t *testing.T) {
r := get_fullname("Sara", "Lance")
expect := "Sara Lance"
if r != expect {
t.Fatalf("ERROR: get_fullname expect: %s actual: %s", expect, r)
}
t.Log("測試成功")
}
執行測試
寫完測試用例,就可以執行測試了,使用命令 go test。輸出如下:
PS H:\Go\src\go_dev\day8\unit_test\name> go test
PASS
ok go_dev/day8/unit_test/name 0.058s
PS H:\Go\src\go_dev\day8\unit_test\name>
看到PASS了,但是t.Log()並沒有輸出,要看到更多資訊,要用帶上-v引數。使用命令 go test -v ,輸出如下:
PS H:\Go\src\go_dev\day8\unit_test\name> go test -v
=== RUN TestName
--- PASS: TestName (0.00s)
name_test.go:11: 測試成功
PASS
ok go_dev/day8/unit_test/name 0.053s
PS H:\Go\src\go_dev\day8\unit_test\name>
直接用go test命令,只顯示測試的結果。如果有多個測試用例,也只有一個結果。可以用-v引數看到詳細的資訊,每個測試用例的的結果都會打印出來。
如果某個測試失敗了,就會直接退出,不會繼續測試下去。