1. 程式人生 > >Go語言8-goroutine和channel

Go語言8-goroutine和channel

Goroutine

Go語言從語言層面上就支援了併發,這與其他語言大不一樣。Go語言中有個概念叫做goroutine,這類似我們熟知的執行緒,但是更輕。

程序、執行緒、協程

程序和執行緒
程序是程式在作業系統中的一次執行過程,系統進行資源分配和排程的一個獨立單位。
執行緒是程序的一個執行實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位。
一個程序可以建立和撤銷多個執行緒,同一個程序中的多個執行緒之間可以併發執行。
所以程式的型別可以分為以下幾種:

  • 一個程序,它只有一個執行緒,就是單執行緒程式
  • 一個程序,它又多個執行緒,就是多執行緒程式
  • 一個程序,它可能還會fork多個子程序,就是多程序程式

併發和並行的區別

  • 多執行緒程式在單核的cou上執行,這是併發(concurrency)。
  • 多執行緒程式在多個核的cpu上執行(真正的同時執行),這才是並行(parallelism)。

併發,在微觀上,任意時刻只有一個程式在執行。因為執行緒已經是CPU排程的最小單元,一個CPU一次只能處理一個執行緒。但是巨集觀上這些程式時同時在那裡執行的,所以這個只是併發。
所以在python裡,貌似講的都是高併發,似乎沒聽過並行的概念。

協程和執行緒
協程,獨一的棧空間,共享堆空間,排程由使用者自己控制。本質上類似於使用者級執行緒,這些使用者級執行緒的排程也是自己實現的。
執行緒,一個執行緒上可以跑多個協程,協程是輕量級的執行緒。

goroutine 排程模型

Go的排程器模型:G-P-M模型。

  • G代表goroutine,它通過go關鍵字呼叫runtime.newProc建立。
  • P代表processer,可以理解為上下文。
  • M表示machine,可以理解為作業系統的執行緒。

設定Golang執行的cpu核數
設定當前的程式執行在多少核上,下面的例子是獲取CPU的核數,然後執行在所有核上:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    num := runtime.NumCPU()
    runtim.GOMAXPROCS(num)
    fmt.Println(num)
}

上面P的數目就是這裡GOMAXPROCS設定的數目,通常設定為CPU核數。
1.8版本以上的Golang,是不需要做上面的設定的,預設就是執行在所有的核上。當然還是可以設定一下,比如限制只能使用多少核。

goroutine的示例:

package main

import (
    "fmt"
    "time"
)

func example() {
    var i int
    for {
        fmt.Println(i)
        i++
        time.Sleep(time.Millisecond * 30)
    }
}

func main() {
    go example()  // 起一個goroutine
    var j int
    for j > -100 {
        fmt.Println(j)
        j--
        time.Sleep(time.Millisecond * 100)
    }
    fmt.Println("執行結束")
}

Channel

不同goroutine之間要進行通訊,有下面2種方法:

  • 全域性變數和鎖同步
  • Channel

先講管道(channel),然後講 goroutine 和 channel 結合的一些用法。
這篇的channel可以參考下:
https://www.jianshu.com/p/24ede9e90490

全域性變數的實現示例

在下面的例子裡定義了變數 m 來實現goroutine之間的通訊:

package main

import (
    "fmt"
    "time"
    "sync"
)

var (
    m = make(map[int]uint64)
    lock sync.Mutex
)

type task struct {
    n int
}

func calc(t *task) {
    var res uint64
    res = 1
    for i := 1; i <= t.n; i++ {
        res *= uint64(i)
    }
    lock.Lock()
    m[t.n] = res  // 變數m用來存放結果,這樣主執行緒裡就能拿到m的值,操作要加鎖
    lock.Unlock()
}

func main() {
    for i := 0; i < 100; i++ {
        t := &task{i}
        go calc(t)
    }
    for j := 0; j < 10; j++ {
        fmt.Printf("\r已執行%d秒", j)
        time.Sleep(time.Second)
    }
    fmt.Println("\r執行完畢,輸出結果:")
    lock.Lock()
    for k, v := range m {
        if v != 0 {
            fmt.Printf("%d! = %v\n", k, v)
        }
    }
    lock.Unlock()
}

channel 概念

channel的概念如下:

  • 型別Unix中的管道(Pipe)
  • 先進先出
  • 執行緒安全,多個goroutine同時訪問,不需要加鎖
  • channel是有型別的,一個整數的channel只能存放整數

channel 宣告
var 變數名 chan 型別

var test1 chan int
var test2 chan string
var tesr3 chan map[string]string
var test4 chan stu
var test5 chan *stu

只是宣告還不夠,使用前還要make,分配記憶體空間:

package main

import "fmt"

func main() {
    var intChan chan int  // 宣告
    intChan = make(chan int, 10)  // 初始化,長度是10
    intChan <- 10  // 存入管道
    n := <- intChan  // 取出
    fmt.Println(n)
}

定義訊號(空結構體)
有一些場景中,一些 goroutine 需要一直迴圈處理資訊,直到收到 quit 訊號。作為訊號,只需要隨便傳點什麼,並不關注具體的值。那麼可以選擇使用空結構體,像下面這樣定義了2個訊號:

msgCh := make(chan struct{})
quitCh := make(chan struct{})
// 傳訊號的方法
msgCh <- struct{}{}  // 前面的 struct{} 是變數的型別,後面的 {} 則是做初始化傳入空值生成例項
quitCh <- struct{}{}

通過channel實現通訊

起一個goroutine往管道里存,再起一個goroutine從管道里把資料取出:

package main

import (
    "fmt"
    "time"
)

func write(ch chan int) {
    var i int
    for {
        ch <- i
        i ++
        time.Sleep(time.Millisecond)
    }
}

func read(ch chan int) {
    for {
        b := <- ch
        fmt.Println(b)
    }
}

func main() {
    intChan := make(chan int, 10)
    go write(intChan)
    go read(intChan)
    time.Sleep(time.Second * 5)
}

channel 的型別和阻塞

channel 分為不帶快取的 channel 和帶快取的 channel。
channel 一定要初始化後才能進行讀寫操作,否則會永久阻塞。這個不是這裡要講的重點,順便帶一下。

無快取的channle
初始化make的時候不傳入第二個引數設定容量就是:

ch := make(chan int)

從無快取的 channel 中讀取訊息會阻塞,直到有 goroutine 向該 channel 中傳送訊息;同理,向無快取的 channel 中傳送訊息也會阻塞,直到有 goroutine 從 channel 中讀取訊息。

有快取的 channel
有快取的 channel 的宣告方式為指定 make 函式的第二個引數,該引數為 channel 快取的容量:

ch := make(chan int, 10)

有快取的 channel 類似一個阻塞佇列(採用環形陣列實現)。當快取未滿時,向 channel 中傳送訊息時不會阻塞,當快取滿時,傳送操作將被阻塞,直到有其他 goroutine 從中讀取訊息;
相應的,當 channel 中訊息不為空時,讀取訊息不會出現阻塞,當 channel 為空時,讀取操作會造成阻塞,直到有 goroutine 向 channel 中寫入訊息。

緩衝區的大小
通過 len 函式可以獲得 chan 中的元素個數,通過 cap 函式可以得到 channel 的緩衝區長度。

無快取和緩衝區是1的差別
無快取的 channel 的 len和cap 始終都是0。

通過無快取的 channel 進行通訊時,接收者收到資料 happens before 傳送者 goroutine 喚醒

上面這句不好理解,不過可以先看下現象。
下面的這2行函式會報錯,說是死鎖。但是如果設定了 channel 的容量哪怕是1,就不會報錯的:

func main() {
    ch := make(chan int)
    ch <- 1
}

雖然容量1的channel也只能存1個數,但是無緩衝區的channel似乎1個數都存不了,除非馬上能取走:

func main() {
    ch := make(chan int, 1)
    // 要起一個goroutine可以馬上接收channel裡的資料
    go func () {
        fmt.Println(<- ch)
    }()
    ch <- 1
    time.Sleep(time.Second)  // 要給goroutine執行完成的時間
}

小結:無快取的channel需要有一個goroutine可以把channel裡的資料馬上取走。

channel之間的同步

在學習關閉channel之前,先看下下面的例子。由於沒有關閉channel,是會有問題的,不過例子裡都解決了。先看下不用關閉channel可以怎麼搞,然後再接著看關閉channel的用法:

package main

import (
    "time"
    "fmt"
)

func calc(taskChan chan int, resChan chan int) {
    for v := range taskChan {
        // 判斷是不是素數
        flag := true
        for i := 2; i < v; i++ {
            if v % i == 0 {
                flag = false
                break
            }
        }
        if flag {
            resChan <- v
        }
    }
}

func main() {
    intChan := make(chan int, 1000)
    // 這個也是個goroutine
    go func(){
        for i := 2; i < 100000; i++ {
            intChan <- i
        }
    }()  // 管道滿了之後,這個匿名函式會阻塞,但是不影響程式繼續往下走

    resultChan := make(chan int, 1000)
    // 同時起8個goroutine
    for i := 0; i < 8; i++ {
        go calc(intChan, resultChan)
    }

    // 再起一個取結果的goroutine,不阻塞主執行緒
    go func(){
        for v := range resultChan{
            fmt.Println("素數:", v)
        }
    }()
    // 給上面的匿名函式幾秒鐘來輸出結果
    time.Sleep(time.Second * 5)
}

上面的例子裡用了2個匿名函式,也都是起的goroutine。如果是在主執行緒裡直接for迴圈的話,那個for迴圈就會變成死鎖,程式不會自己往下走。所以執行在goroutine裡的死迴圈,在主執行緒退出後也就結束了,不會有問題。後一個匿名函式是對channel的進行遍歷,channel取空後,會進入阻塞,如果是執行在主執行緒裡的話也會形成死鎖。
range 遍歷
channel 也可以使用 range 取值,並且會一直從 channel 中讀取資料,直到有 goroutine 對改 channel 執行 close 操作,迴圈才會結束。

關閉 channel

golang 提供了內建的 close 函式對 channel 進行關閉操作:

ch := make(chan int)
close(ch)

關於 channel 的關閉,有以下的特點:

  • 關閉一個未初始化(nil) 的 channel 會產生 panic
  • 重複關閉同一個 channel 會產生 panic
  • 向一個已關閉的 channel 中傳送訊息會產生 panic
  • 可以從已關閉的 channel 裡繼續讀取訊息,若訊息均已讀出,則會讀到型別的零值。從一個已關閉的 channel 中讀取訊息不會阻塞,並且會返回一個為 false 的 ok-idiom,可以用它來判斷 channel 是否關閉
  • 關閉 channel 會產生一個廣播機制,所有向 channel 讀取訊息的 goroutine 都會收到訊息

有2種方式可以把管道里的資料都取出來,但是都需要把管道關閉:

  • 判斷管道已關閉並且取完了
  • 遍歷管道

關閉channel然後讀取的示例:

package main

import "fmt"

func main() {
    var ch chan int
    ch = make(chan int, 5)
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
    for {
        var b int
        b, ok := <- ch
        fmt.Println(b, ok)
        if ok == false {
            break
        }
    }
}

/* 執行結果
PS H:\Go\src\go_dev\day8\channel\close_chan> go run main.go
0 true
1 true
2 true
3 true
4 true
0 false
PS H:\Go\src\go_dev\day8\channel\close_chan>
*/

上面輸出的最後一條,就是channel已經空了,讀出來的就是型別的0值,並且ok變false了。

遍歷channel的示例:

package main

import "fmt"

func main() {
    var ch chan int
    ch = make(chan int)  // 這個管道沒有無快取
    // 這個goroutine一次存一個,再存會阻塞,直到主執行緒後面的for迴圈遍歷的時候取走資料
    // 存完100個數後,這裡的for迴圈結束,會關閉管道。主執行緒後面的for迴圈的遍歷就能正常結束了
    go func () {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        close(ch)
    }()
    for v := range ch {
        fmt.Println(v)
    }
}

判斷子執行緒結束
學到這裡,再也不需要用Sleep等待子執行緒結束了,可以通過管道實現。可以單獨定義一個專門用來判斷子執行緒結束的管道。子執行緒完成任務後,就傳個值給管道,主執行緒就阻塞的讀管道里的資訊,一旦讀到資訊,就說明子執行緒完成了,可以繼續執行或者退出了。如果起了多個子執行緒,則主執行緒就是用for迴圈多讀幾次,就能判斷出有多少子執行緒已經結束了。

channel 只讀、只寫

宣告只讀的channel:

var ch <-chan int

宣告只寫的channel:

var ch chan<- int

應用場景,管道需要能夠可讀可寫。但是可以限制它在某個函式裡的功能,也就是在定義函式的引數的時候,把管道的型別設定為只讀或只寫。或者把管道傳給結構體,結構體裡限制管道的讀寫限制?
下面是之前的一個例子,僅僅只是把2個函式在設定引數型別的時候把管道的讀寫限制加上了:

package main

import (
    "fmt"
    "time"
)

func write(ch chan<- int) {
    var i int
    for {
        ch <- i
        i ++
        time.Sleep(time.Millisecond)
    }
}

func read(ch <-chan int) {
    for {
        b := <- ch
        fmt.Println(b)
    }
}

func main() {
    intChan := make(chan int, 10)
    go write(intChan)
    go read(intChan)
    time.Sleep(time.Second * 5)
}

配合 select 使用

select 用法類似IO多路複用,可以同時監聽多個 channel 的訊息狀態,用法如下:

select {
    case <- ch1:
    ...
    case <- ch2:
    ...
    case ch3 <- 10;
    ...
    default:
    ...
}

select 可以同時監聽多個 channel 的寫入或讀取:

  • 若只有一個 case 通過(不阻塞),則執行這個 case 塊
  • 若有多個 case 通過,則隨機挑選一個 case 執行
  • 若所有 case 均阻塞,則執行 default 塊。若未定義 default 塊,則 select 語句阻塞,直到有 case 被喚醒
  • 使用 break 會跳出 select 塊
  • select 不會迴圈,就只會執行一個塊然後就繼續往後執行了

select只會執行一次
這個例子只會輸出一次,隨機是1或者是2,然後接結束了:

package main

import "fmt"

func main() {
    ch1 := make(chan int, 1)
    ch1 <- 1
    ch2 := make(chan int, 1)
    ch2 <- 2
    select {
    case v := <- ch1:
        fmt.Println(v)
    case v := <- ch2:
        fmt.Println(v)
    default:
        fmt.Println(0)
    }
}

所以如果要把管道里的數取完,或者取多次,就要再套一層for迴圈。

for迴圈和break的效果
在select外面用for套了一層死迴圈,這樣就是反覆的執行select。不過break在這裡就沒效果了:

package main

import (
    "fmt"
     "time"
)

func main() {
    var ch1, ch2 chan int
    ch1 = make(chan int, 10)
    ch2 = make(chan int, 10)
    for i := 0; i < cap(ch1); i++ {
        ch1 <- i
        ch2 <- i * i
    }

    // LABEL1:
    for {
        select {
        case v := <- ch1:
            fmt.Println("ch1", v)
        case v := <- ch2:
            fmt.Println("ch2", v)
        default:
            fmt.Println("所有元素都已經取完")
            break  // 這個break沒有意義,因為值是跳出select,而不是for迴圈
            // break LABEL1  // 這個break可以直接跳出for迴圈
        }
        time.Sleep(time.Second)
    }
}

如果要跳出for迴圈,可以配合標籤。上面的程式碼裡已經寫好了只是註釋掉了。

定時器

定時器是在 time 包裡的,

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.NewTicker(time.Second)
    for v := range t.C {
        fmt.Println(v)
    }
}

上面呼叫的NewTicker()方法返回的是個結構體,如下:

type Ticker struct {
    C <-chan Time // The channel on which the ticks are delivered.
    // contains filtered or unexported fields
}

上面的例子裡遍歷了 t.C 就是一個channel。time包內部應該是會產生一個goroutine,每隔一段時間就傳一個數據進去。

設定超時時間
還有一個After()方法,和上面的方法是一樣的。不過這個方法直接返回管道,即 NewTimer(d).C 。而NewTimer()方法的管道在返回的結構體的屬性C裡。這個After()方法用起來更方便。結合select正好可以做成一個設定任務超時時間的功能:

package main

import (
    "fmt"
    "time"
)

func task(ch chan struct{}) {
    time.Sleep(time.Second * 3)
    ch <- struct{}{}
}

func main() {
    ch := make(chan struct{})  // 定義好訊號的管道,傳遞空結構體
    go task(ch)  // 啟動一個任務
    select {
    case <- ch:
        fmt.Println("任務執行結束")
    case <- time.After(time.Second * 2):  // 2秒後超時
        fmt.Println("任務超時")
    }
}

goroutine 中使用 recover

程式裡起的gorountine中如果panic了,並且這個goroutine裡面沒有捕獲錯誤的話,整個程式就會掛掉。
下面的程式會報錯(Panic),是gorountine裡的產生的錯誤:

package main

func divideZero(ch chan int) {
    zero := 0
    ch <- 1 / zero
}

func main() {
    ch := make(chan int)
    go divideZero(ch)
    <- ch
}

在gorountine中執行錯誤了,是可以不影響其他執行緒和主執行緒的繼續執行的。所以,好的習慣是每當產生一個goroutine,就在開頭用defer插入recover, 這樣在出現panic的時候,就只是自己退出而不影響整個程式。下面是優化後的程式碼,加入了recover來捕獲錯誤:

package main

import "fmt"

func divideZero(ch chan int) {
    defer func () {
        if err := recover(); err != nil {
            fmt.Println(err)
            // 要給管道傳值,否則主執行緒從空管道里取值會阻塞,形成死鎖
            ch <- 0
        }
    }()
    zero := 0
    ch <- 1 / zero
}

func main() {
    ch := make(chan int)
    go divideZero(ch)
    <- ch
}

單元測試

測試用例的檔名必須以_test.go結尾,測試的函式也必須以Test開頭。符合命名規則,使用 go test 命令的時候就能自動執行測試用例。
這篇的單元測試比較粗糙,不過基本怎麼用,以及用法示例都簡單記下來了。

被測試的函式

先準備一個需要被測試的函式:

package main

import "fmt"

func get_fullname(first, last string) (fullname string) {
    fullname = first + " " + last
    return
} 

func main() {
    fullname := get_fullname("Barry", "Allen")
    fmt.Println(fullname)
}

上面的 get_fullname() 函式就是接下來要進行單元測試的函式。

測試用例

package main

import "testing"

func TestName(t *testing.T) {
    r := get_fullname("Sara", "Lance")
    expect := "Sara Lance"
    if r != expect {
        t.Fatalf("ERROR: get_fullname expect: %s actual: %s", expect, r)
    }
    t.Log("測試成功")
}

執行測試

寫完測試用例,就可以執行測試了,使用命令 go test。輸出如下:

PS H:\Go\src\go_dev\day8\unit_test\name> go test
PASS
ok      go_dev/day8/unit_test/name      0.058s
PS H:\Go\src\go_dev\day8\unit_test\name>

看到PASS了,但是t.Log()並沒有輸出,要看到更多資訊,要用帶上-v引數。使用命令 go test -v ,輸出如下:

PS H:\Go\src\go_dev\day8\unit_test\name> go test -v
=== RUN   TestName
--- PASS: TestName (0.00s)
        name_test.go:11: 測試成功
PASS
ok      go_dev/day8/unit_test/name      0.053s
PS H:\Go\src\go_dev\day8\unit_test\name>

直接用go test命令,只顯示測試的結果。如果有多個測試用例,也只有一個結果。可以用-v引數看到詳細的資訊,每個測試用例的的結果都會打印出來。
如果某個測試失敗了,就會直接退出,不會繼續測試下去。