1. 程式人生 > 其它 >Go併發程式設計

Go併發程式設計

Go併發程式設計

目錄

本文是作者學習Go併發程式設計的筆記總結,主要內容是Go併發程式設計的示例程式碼,下面是與本文相關的連結:

Go基礎部分

本文中包含的程式碼段的完整程式碼可以去作者的

Github下載

程序、執行緒、協程、Go程的解釋

說明:

在Go併發程式設計的學習過程中,除了具體如何使用Go實現併發程式設計外,還包括程序、執行緒、協程、生產者消費者模型、互斥量、鎖、條件變數等,下文並不會詳細說明這些概念,如果有想要詳細瞭解這些內容,可以去看Unix系統程式設計和Unix網路程式設計這兩本書。

Go程

Go在語言級別支援協程,叫goroutine。Go語言標準庫提供的所有系統呼叫操作(包括所有同步IO操作),都會出讓CPU給其他goroutine。這讓輕量級執行緒的切換管理不依賴於系統的執行緒和程序,也不需要依賴於CPU的核心數量。

Go程的建立與使用

建立時只需要使用關鍵字 go

func sing() {
    for i := 0; i < 5; i++ {
        fmt.Println("Sing something...")
        time.Sleep(100 * time.Millisecond)
    }
}


func dance() {
    for i := 0; i < 5; i++ {
        fmt.Println("Someone dancing...")
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    fmt.Println("順序執行")
    sing()
    dance()
    // 併發執行
    fmt.Println("併發執行")
    go sing()
    go dance()

    for {
        ;
    }
}

Go程使用的相關函式說明:

Gosched()、GOMAXPROCS()

// runtime.Gosched() 出讓當前go程所佔用的cpu時間片
// runtime.Goexit() 結束呼叫該函式的當前go程

func main() {
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("this is a goroutine test")
            //time.Sleep(100 * time.Microsecond)
        }
    }()

    for {
        runtime.Gosched()
        fmt.Println("this is a main test")
        //time.Sleep(100 * time.Microsecond)
    }
}
func main() {
    // 建立一個子go程
    go func () {
        for i := 0; i < 5; i++ {
            fmt.Println("-----I am Goroutine-----")
            time.Sleep(time.Second)
        }
    }()

    // 主Goroutine
    for i := 0; i < 5; i++ {
        fmt.Println("-----I am main-----")
        time.Sleep(time.Second)
        if i == 2 {
             break
        }
    }

     
    // 主Goroutine退出 子go程也會退出
    /*
    -----I am main-----
    -----I am Goroutine-----
    -----I am Goroutine-----
    -----I am main-----
    -----I am main-----
    -----I am Goroutine-----
    -----I am Goroutine-----
    */

    // runtime.Gosched() 出讓當前go程所佔用的cpu時間片
}
// runtime.GOMAXPROCS(n) 設定當前程序使用的最大cpu核心數 返回上一次呼叫成功的設定值 首次呼叫返回預設值

func main() {
    n := runtime.GOMAXPROCS(2)// 將CPU設為雙核
    fmt.Println(n)

    for {
        go fmt.Print(0)// 子go程
        fmt.Print(1)// 主go程
    }
}

Channel

channel是一種資料型別(管道),主要用於解決go程同步問題以及協程之間資料共享的問題。

特點:一端寫一端讀

Channel的定義與使用

/*
make(chan 在channel中傳遞的資料型別, 容量)
容量為0表示無緩衝
容量大於0表示有緩衝
*/
// 全域性定義channel 完成資料同步
var channel = make(chan int)

func printer(s string) {
    for _, ch := range s {
        fmt.Printf("%c", ch)
        time.Sleep(300 * time.Millisecond)
    }
}

// 先執行
func person1() {
    printer("Hello")
    channel <- 1// 向channel寫資料 如果寫的資料沒有被讀走 channel阻塞
}

// 後執行
func person2() {
    <- channel// 從channel讀 
    printer("World")
}

func main() {
    go person1()
    go person2()
    // 輸出WHeorllldo person1 person2 交替使用標準輸出 導致輸出結果亂序

    for {
        ;
    }
}

Channel同步傳遞資料

func main() {
    ch := make(chan string)
    // len() 得到channel中剩餘未讀取資料個數
    // cap() 得到通道容量
    //fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))

    go func () {
        for i := 0; i < 5; i++ {
            fmt.Println("i = ", i)
        }
        // 通知主go列印完畢
        ch <- "Completed..."
        fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
    }()
    
    //fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
    str := <- ch
    //fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
    fmt.Println("主go", str)
}

無緩衝Channel和有緩衝Channel

func main() {
    ch := make(chan int)

    go func() {
        for i := 0; i < 5; i++ {
            fmt.Println("子go程寫, i =", i)
            ch <- i
        }
    }()

    //time.Sleep(time.Second * 2)

    for i := 0; i < 5; i++ {
        num := <- ch
        fmt.Println("主go程讀,i = ", num)
    }
}
func main() {
    ch := make(chan int, 3)// 存滿3個元素之前不會阻塞
    fmt.Println("len =", len(ch), "cap =", cap(ch))

    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
            fmt.Println("子go程:", i)
            fmt.Println("len =", len(ch), "cap =", cap(ch))
        }
    }()
    time.Sleep(time.Second * 3)
    for i := 0; i < 5; i++ {
        num := <- ch
        fmt.Println("主go程:", num)
        fmt.Println("len =", len(ch), "cap =", cap(ch))
    }
}

Channel的關閉

func main() {
    ch := make(chan int)

    go func() {
        for i := 0; i < 8; i++ {
            ch <- i
        }
        close(ch)// 寫端寫完資料 主動關閉channel
    }()

    for {
        // 檢測對端是否關閉
        if num, ok := <- ch; ok == true {// ok == true, 讀到資料
            fmt.Println("Read num =", num)
        } else {// channel已經關閉
            break
        }
    }

    // 或者換種寫法
    // for num := range ch {}
}

/*
資料未傳送完不應該關閉channel
無緩衝channel 無法向已經關閉的channel中寫資料 但是還可以讀
*/

單向Channel的使用

有時在函式中,我們只需要從channel中讀取資料或者寫入資料,這時我們可以使用單向channel

/*
func main() {
    // 雙向channel 預設
    ch := make(chan int)

    var sendCh chan <- int// 單向寫channel
    // 可以將雙向channel轉換為單向channel 但是反之不行
    sendCh = ch
    sendCh <- 754

    // 出錯 單向寫channel不能讀
    //num := <- sendCh

    var recvCh <- chan int = ch// 單向讀channel
    num := <- recvCh
    fmt.Println(num)

    // 反向賦值 出錯
    //var ch2 chan int = sendCh
}
*/
func send(out chan <- int) {
    out <- 88
    close(out)
}

func recv(in <- chan int) {
    n := <- in
    fmt.Println("Recv num =", n)
}

func main() {
    ch := make(chan int)// 雙向channel
    
    go func(){
        send(ch)// 雙向channel轉為寫channel
    }()

    recv(ch)
}

使用Channel實現生產者消費者模型

生產者:傳送端
消費者:接收端

緩衝區作用:
解耦(降低生產者與消費者之間的耦合度)
併發(生產者與消費者數量不對等時 能保持正常通訊)
快取(生產者與消費者資料處理速度不一致時 暫存資料)

func producer(out chan <- int) {
    for i := 0; i < 10; i++ {
        fmt.Println("producer send", i * i)
        out <- i * i
    }
    close(out)
}

func consumer(in <- chan int) {
    for num := range in {
        fmt.Println("consumer recv", num)
    }
}

func main() {
    // 無緩衝channel實現生產者消費者
    //ch := make(chan int)
    // 有緩衝
    ch := make(chan int, 5)

    go producer(ch)// 子go程作為生產者
    consumer(ch)
}

定時器

建立定時器

func main() {
    fmt.Println("Now time:", time.Now())
    // 建立定時器
    myTimer := time.NewTimer(time.Second * 2)
    nowTime := <- myTimer.C// 當系統向定時器中的channel寫完後 再從中讀
    fmt.Println("Now time:", nowTime)
}
/*
time.Timer
定時器,由channel實現,當設定的時間到達時,系統會向定時器中的channel寫
type Timer struct {
    C <- chan Time
    r runtimeTimer
}
*/

定時的3種方式

// 定時器的停止和重置
func main() {
    myTimer := time.NewTimer(time.Second * 3)
    myTimer.Reset(1 * time.Second)// 重置定時器
    go func() {
        <- myTimer.C
        fmt.Println("子go程讀取定時完畢")
    }()

    //myTimer.Stop()// 設定定時器停止 子go程無法從定時器讀到任何資料
    for {
        ;
    }
}

週期定時

func main() {
    // 建立一個是否終止的channel
    quit := make(chan bool)
    fmt.Println("now:", time.Now())
    // 週期定時 每隔一秒 系統會向Ticker.C寫一次
    myTicker := time.NewTicker(time.Second)
    i := 0
    go func() {
        for {
            nowTime := <- myTicker.C
            i++
            fmt.Println("nowTime:", nowTime)

            // 定時器迴圈了五次後 向quit寫資料
            // 主go程從quit讀到資料後 程式退出
            if i == 5 {
                quit <- true
            }
        }
    }()

    <- quit
}

select

使用select監聽每個channel

select的使用

func main() {
    ch := make(chan int)// 用於資料通訊的channel
    quit := make(chan bool)// 用於判斷是否退出的channel

    // 子go寫資料
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
            time.Sleep(time.Second)
        }
        close(ch)// ch 雖然關閉 但是還可以讀到0
        quit <- true
    }()

    // 主go讀資料
    for {
        // select下的case中 若果某個case可讀 則執行 
        // 如果所有case都不可讀 則阻塞在select
        // case中有多個滿足監聽條件 任選一個執行
        // 可以使用default來處理所有case都不滿足監聽條件的狀況 通常不會這麼使用  會產生忙等待
        // select自身不帶有迴圈機制 需要藉助外層for迴圈來監聽
        // break只能跳出select
        select {
        case num := <- ch:
            fmt.Println("Read:", num)
        case <- quit:// quit 可讀 退出for
            fmt.Println("quit")
            // break跳出的是select
            //break
            return
        }
        // select執行後執行
        fmt.Println("-----------------")
    }
}

select超時處理

func main() {
    ch := make(chan int)
    quit := make(chan bool)

    go func() {
        for {
            select {
            case num := <- ch:
                fmt.Println("Read:", num)
            case <- time.After(3 * time.Second):// 超過3秒還沒讀到資料
                quit <- true
            }
        }
    }()

    for i := 0; i < 5; i++ {
        ch <- i
        time.Sleep(time.Second)
    }

    <- quit
    fmt.Println("quit")
}

同步相關

使用channel產生死鎖

// 單go程自己死鎖
// channel應該在至少兩個以上go程中進行通訊 否則死鎖
func main1() {
    ch := make(chan int)

    // fatal error: all goroutines are asleep - deadlock
    ch <- 748// 程式死鎖 卡在這一步 等待ch被讀取 而不會執行下面讀取ch的那一步

    num := <- ch
    fmt.Println("Read:", num)

}

// go程間channel訪問順序導致死鎖
// 使用channel時 讀寫兩端要有同時有機會執行
func main2() {
    ch := make(chan int)
    num := <- ch// 死鎖 等待讀 導致子go程不會執行 即寫操作不會執行
    fmt.Println("Read:", num)

    go func() {
        ch <- 789
    }()
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    // 
    go func() {
        for {
            select {
            case num := <- ch1:
                ch2 <- num
            }
        }
    }()

    for {
        select {
        case num := <- ch2:
            ch1 <- num
        }
    }
}

互斥鎖

// 藉助channel完成資料同步
//var ch := make(chan int)

// 通過鎖完成資料同步
var mutex sync.Mutex// 建立互斥鎖 新建互斥鎖狀態為未加鎖0 

func printer(str string) {
    mutex.Lock()// 訪問共享資料之前加鎖
    for _, ch := range str {
        fmt.Printf("%c", ch)
        time.Sleep(time.Millisecond * 300)
    }
    mutex.Unlock()// 共享資料訪問結束 解鎖
}

func person1() {
    printer("Hello")
    //ch <- 111
}

func person2() {
    //<- ch
    printer("World")
}

func main() {
    go person1()
    go person2()

    for {
        ;
    }
}

讀寫鎖

讀寫鎖的使用

// 讀寫鎖
//var rwMutex sync.RWMutex

// 在go中儘量不要將互斥鎖 讀寫鎖 與 channel混用 可能造成隱形死鎖
// 下面程式會死鎖
// 不使用channel 而是用全域性變數

func readGo(in <- chan int, idx int) {
    for {
        rwMutex.RLock()// 讀 加鎖
        num := <- in
        fmt.Println("Id", idx, "Read", num)
        rwMutex.RUnlock()// 讀 解鎖
    }
}

func writeGo(out chan <- int, idx int) {
    for {
        // 生成隨機數
        num := rand.Intn(1000)
        rwMutex.Lock()// 寫 加鎖
        out <- num
        fmt.Println("Id", idx, "Write", num)
        //time.Sleep(time.Millisecond * 300)
        rwMutex.Unlock()
        time.Sleep(time.Millisecond * 300)
    }
}

func main() {
    // 隨機數種子
    rand.Seed(time.Now().UnixNano())

    ch := make(chan int)
    //quit := make(chan bool)

    // 5個讀go程 5個寫go程
    for i := 0; i < 5; i++ {
        go readGo(ch, i)
    }

    for i := 0; i < 5; i++ {
        go writeGo(ch, i)
    }

    //<- quit
    for {
        ;
    }
}
// 使用全域性變數
var value int// 定義全域性變數 模擬共享資料

func readGo(idx int) {
    for {
        rwMutex.RLock()// 讀 加鎖
        num := value
        fmt.Println("Id", idx, "Read", num)
        time.Sleep(time.Millisecond * 300)
        rwMutex.RUnlock()// 讀 解鎖
    }
}

func writeGo(idx int) {
    for {
        // 生成隨機數
        num := rand.Intn(1000)
        rwMutex.Lock()// 寫 加鎖
        value = num
        fmt.Println("Id", idx, "Write", num)
        time.Sleep(time.Millisecond * 300)
        rwMutex.Unlock()
    }
}

func main() {
    // 隨機數種子
    rand.Seed(time.Now().UnixNano())

    //ch := make(chan int)
    //quit := make(chan bool)

    // 5個讀go程 5個寫go程
    for i := 0; i < 5; i++ {
        go readGo(i)
    }

    for i := 0; i < 5; i++ {
        go writeGo(i)
    }

    //<- quit
    for {
        ;
    }
}

使用channel模擬讀寫鎖

var value int

func readGo(in <- chan int, idx int) {
    for {
        num := <- in
        fmt.Println("Id", idx, "Read", num)
        time.Sleep(time.Millisecond * 300)
    }
}

func writeGo(out chan <- int, idx int) {
    for {
        // 生成隨機數
        num := rand.Intn(1000)
        out <- num
        fmt.Println("Id", idx, "Write", num)
        time.Sleep(time.Millisecond * 300)
    }
}

func main() {
    // 隨機數種子
    rand.Seed(time.Now().UnixNano())

    ch := make(chan int)
    //quit := make(chan bool)

    // 5個讀go程 5個寫go程
    for i := 0; i < 5; i++ {
        go readGo(ch, i)
    }

    for i := 0; i < 5; i++ {
        go writeGo(ch, i)
    }

    //<- quit
    for {
        ;
    }
}

條件變數

var cond sync.Cond// 全域性條件變數

func producer(out chan <- int, idx int) {
    for {
        // 加鎖
        cond.L.Lock()
        // 判斷緩衝區是否滿
        for len(out) == 5 {
            cond.Wait()// 等待緩衝區有位置可寫
        }
        num := rand.Intn(800)
        out <- num
        fmt.Println("Idx", idx, "Write", num)
        // 解鎖
        cond.L.Unlock()
        // 喚醒對端 即消費者
        cond.Signal()
        time.Sleep(time.Millisecond * 200)
    }
}

func consumer(in <- chan int, idx int) {
    for { 
        cond.L.Lock()
        for len(in) == 0 {
            cond.Wait()
        }
        num := <- in
        fmt.Println("idx", idx, "Read", num)
        cond.L.Unlock()
        cond.Signal()
        time.Sleep(time.Millisecond * 200)
    }
}

func main() {
    ch := make(chan int, 5)
    //quit := make(chan int)
    rand.Seed(time.Now().UnixNano())

    // 指定條件變數使用的鎖
    cond.L = new(sync.Mutex)

    for i := 0; i < 5; i++ {
        go producer(ch, i)
    }

    for i := 0; i < 5; i++ {
        go consumer(ch, i)
    }
    
    //<- quit
    for {
        ;
    }
}
轉載請註明出處