1. 程式人生 > >golang channel 使用總結

golang channel 使用總結

原文地址

不同於傳統的多執行緒併發模型使用共享記憶體來實現執行緒間通訊的方式,golang 的哲學是通過 channel 進行協程(goroutine)之間的通訊來實現資料共享:

Do not communicate by sharing memory; instead, share memory by communicating.

這種方式的優點是通過提供原子的通訊原語,避免了競態情形(race condition)下複雜的鎖機制。 channel 可以看成一個 FIFO 佇列,對 FIFO 佇列的讀寫都是原子的操作,不需要加鎖。對 channel 的操作行為結果總結如下:

操作 nil channel closed channel not-closed non-nil channel
close panic panic 成功 close
ch <- 一直阻塞 panic 阻塞或成功寫入資料
<- ch 一直阻塞 讀取對應型別零值 阻塞或成功讀取資料

讀取一個已關閉的 channel 時,總是能讀取到對應型別的零值,為了和讀取非空未關閉 channel 的行為區別,可以使用兩個接收值:

// ok is false when ch is closed
v, ok := <-ch

golang 中大部分型別都是值型別(只有 slice / channel / map 是引用型別),讀/寫型別是值型別的 channel 時,如果元素 size 比較大時,應該使用指標代替,避免頻繁的記憶體拷貝開銷。

內部實現

如圖所示,在 channel 的內部實現中(具體定義在 $GOROOT/src/runtime/chan.go 裡),維護了 3 個佇列:

  • 讀等待協程佇列 recvq,維護了阻塞在讀此 channel 的協程列表
  • 寫等待協程佇列 sendq,維護了阻塞在寫此 channel 的協程列表
  • 緩衝資料佇列 buf,用環形佇列實現,不帶緩衝的 channel 此佇列 size 則為 0

當協程嘗試從未關閉的 channel 中讀取資料時,內部的操作如下:

  1. 當 buf 非空時,此時 recvq 必為空,buf 彈出一個元素給讀協程,讀協程獲得資料後繼續執行,此時若 sendq 非空,則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫資料入佇列 buf ,此時讀取操作 <- ch
    未阻塞;
  2. 當 buf 為空但 sendq 非空時(不帶緩衝的 channel),則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫資料直接傳遞給讀協程,讀協程繼續執行,此時讀取操作 <- ch 未阻塞;
  3. 當 buf 為空並且 sendq 也為空時,讀協程入佇列 recvq 並轉入 blocking 狀態,當後續有其他協程往 channel 寫資料時,讀協程才會重新轉入 running 狀態,此時讀取操作 <- ch 阻塞。

類似的,當協程嘗試往未關閉的 channel 中寫入資料時,內部的操作如下:

  1. 當佇列 recvq 非空時,此時佇列 buf 必為空,從 recvq 彈出一個讀協程接收待寫資料,此讀協程此時結束阻塞並轉入 running 狀態,寫協程繼續執行,此時寫入操作 ch <- 未阻塞;
  2. 當佇列 recvq 為空但 buf 未滿時,此時 sendq 必為空,寫協程的待寫資料入 buf 然後繼續執行,此時寫入操作 ch <- 未阻塞;
  3. 當佇列 recvq 為空並且 buf 為滿時,此時寫協程入佇列 sendq 並轉入 blokcing 狀態,當後續有其他協程從 channel 中讀資料時,寫協程才會重新轉入 running 狀態,此時寫入操作 ch <- 阻塞。

當關閉 non-nil channel 時,內部的操作如下:

  1. 當佇列 recvq 非空時,此時 buf 必為空,recvq 中的所有協程都將收到對應型別的零值然後結束阻塞狀態;
  2. 當佇列 sendq 非空時,此時 buf 必為滿,sendq 中的所有協程都會產生 panic ,在 buf 中資料仍然會保留直到被其他協程讀取。

使用場景

除了常規的用來在協程之間傳遞資料外,本節列出了一些特殊的使用 channel 的場景。

futures / promises

golang 雖然沒有直接提供 futrue / promise 模型的操作原語,但通過 goroutine 和 channel 可以實現類似的功能:

package main

import (
	"io/ioutil"
	"log"
	"net/http"
)

// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {
    c := make(chan []byte, 1)
    go func() {
        var body []byte
        defer func() {
            c <- body
        }()

        res, err := http.Get(url)
        if err != nil {
            return
        }
        defer res.Body.Close()

        body, _ = ioutil.ReadAll(res.Body)
    }()

    return c
}

func main() {
    future := RequestFuture("https://api.github.com/users/octocat/orgs")
    body := <-future
    log.Printf("reponse length: %d", len(body))
}

條件變數(condition variable)

型別於 POSIX 介面中執行緒通知其他執行緒某個事件發生的條件變數,channel 的特性也可以用來當成協程之間同步的條件變數。因為 channel 只是用來通知,所以 channel 中具體的資料型別和值並不重要,這種場景一般用 strct {} 作為 channel 的型別。

一對一通知

類似 pthread_cond_signal() 的功能,用來在一個協程中通知另個某一個協程事件發生:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	nums := make([]int, 100)

	go func() {
		time.Sleep(time.Second)
		for i := 0; i < len(nums); i++ {
			nums[i] = i
		}
		// send a finish signal
		ch <- struct{}{}
	}()

	// wait for finish signal
	<-ch
	fmt.Println(nums)
}
廣播通知

類似 pthread_cond_broadcast() 的功能。利用從已關閉的 channel 讀取資料時總是非阻塞的特性,可以實現在一個協程中向其他多個協程廣播某個事件發生的通知:

package main

import (
	"fmt"
	"time"
)

func main() {
	N := 10
	exit := make(chan struct{})
	done := make(chan struct{}, N)

	// start N worker goroutines
	for i := 0; i < N; i++ {
		go func(n int) {
			for {
				select {
				// wait for exit signal
				case <-exit:
					fmt.Printf("worker goroutine #%d exit\n", n)
					done <- struct{}{}
					return
				case <-time.After(time.Second):
					fmt.Printf("worker goroutine #%d is working...\n", n)
				}
			}
		}(i)
	}

	time.Sleep(3 * time.Second)
	// broadcast exit signal
	close(exit)
	// wait for all worker goroutines exit
	for i := 0; i < N; i++ {
		<-done
	}
	fmt.Println("main goroutine exit")
}

訊號量

channel 的讀/寫相當於訊號量的 P / V 操作,下面的示例程式中 channel 相當於訊號量:

package main

import (
	"log"
	"math/rand"
	"time"
)

type Seat int
type Bar chan Seat

func (bar Bar) ServeConsumer(customerId int) {
	log.Print("-> consumer#", customerId, " enters the bar")
	seat := <-bar // need a seat to drink
	log.Print("consumer#", customerId, " drinks at seat#", seat)
	time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
	log.Print("<- consumer#", customerId, " frees seat#", seat)
	bar <- seat // free the seat and leave the bar
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10) // the bar has 10 seats
	// Place seats in an bar.
	for seatId := 0; seatId < cap(bar24x7); seatId++ {
		bar24x7 <- Seat(seatId) // none of the sends will block
	}

	// a new consumer try to enter the bar for each second
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		go bar24x7.ServeConsumer(customerId)
	}
}

互斥量

互斥量相當於二元訊號裡,所以 cap 為 1 的 channel 可以當成互斥量使用:

package main

import "fmt"

func main() {
	mutex := make(chan struct{}, 1) // the capacity must be one

	counter := 0
	increase := func() {
		mutex <- struct{}{} // lock
		counter++
		<-mutex // unlock
	}

	increase1000 := func(done chan<- struct{}) {
		for i := 0; i < 1000; i++ {
			increase()
		}
		done <- struct{}{}
	}

	done := make(chan struct{})
	go increase1000(done)
	go increase1000(done)
	<-done; <-done
	fmt.Println(counter) // 2000
}

關閉 channel

關閉不再需要使用的 channel 並不是必須的。跟其他資源比如開啟的檔案、socket 連線不一樣,這類資源使用完後不關閉後會造成控制代碼洩露,channel 使用完後不關閉也沒有關係,channel 沒有被任何協程用到後最終會被 GC 回收。關閉 channel 一般是用來通知其他協程某個任務已經完成了。golang 也沒有直接提供判斷 channel 是否已經關閉的介面,雖然可以用其他不太優雅的方式自己實現一個:

func isClosed(ch chan int) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

不過實現一個這樣的介面也沒什麼必要。因為就算通過 isClosed() 得到當前 channel 當前還未關閉,如果試圖往 channel 裡寫資料,仍然可能會發生 panic ,因為在呼叫 isClosed() 後,其他協程可能已經把 channel 關閉了。 關閉 channel 時應該注意以下準則:

  • 不要在讀取端關閉 channel ,因為寫入端無法知道 channel 是否已經關閉,往已關閉的 channel 寫資料會 panic ;
  • 有多個寫入端時,不要再寫入端關閉 channle ,因為其他寫入端無法知道 channel 是否已經關閉,關閉已經關閉的 channel 會發生 panic ;
  • 如果只有一個寫入端,可以在這個寫入端放心關閉 channel 。

關閉 channel 粗暴一點的做法是隨意關閉,如果產生了 panic 就用 recover 避免程序掛掉。稍好一點的方案是使用標準庫的 sync 包來做關閉 channel 時的協程同步,不過使用起來也稍微複雜些。下面介紹一種優雅些的做法。

一寫多讀

這種場景下這個唯一的寫入端可以關閉 channel 用來通知讀取端所有資料都已經寫入完成了。讀取端只需要用 for range 把 channel 中資料遍歷完就可以了,當 channel 關閉時,for range 仍然會將 channel 緩衝中的資料全部遍歷完然後再退出迴圈:

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := &sync.WaitGroup{}
	ch := make(chan int, 100)

	send := func() {
		for i := 0; i < 100; i++ {
			ch <- i
		}
		// signal sending finish
		close(ch)
	}

	recv := func(id int) {
		defer wg.Done()
		for i := range ch {
			fmt.Printf("receiver #%d get %d\n", id, i)
		}
		fmt.Printf("receiver #%d exit\n", id)
	}

	wg.Add(3)
	go recv(0)
	go recv(1)
	go recv(2)
	send()

	wg.Wait()
}

多寫一讀

這種場景下雖然可以用 sync.Once 來解決多個寫入端重複關閉 channel 的問題,但更優雅的辦法設定一個額外的 channel ,由讀取端通過關閉來通知寫入端任務完成不要再繼續再寫入資料了:

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := &sync.WaitGroup{}
	ch := make(chan int, 100)
	done := make(chan struct{})

	send := func(id int) {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-done:
				// get exit signal
				fmt.Printf("sender #%d exit\n", id)
				return
			case ch <- id*1000 + i:
			}
		}
	}

	recv := func() {
		count := 0
		for i := range ch {
			fmt.Printf("receiver get %d\n", i)
			count++
			if count >= 1000 {
				// signal recving finish
				close(done)
				return
			}
		}
	}

	wg.Add(3)
	go send(0)
	go send(1)
	go send(2)
	recv()

	wg.Wait()
}

多寫多讀

這種場景稍微複雜,和上面的例子一樣,也需要設定一個額外 channel 用來通知多個寫入端和讀取端。另外需要起一個額外的協程來通過關閉這個 channel 來廣播通知:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := &sync.WaitGroup{}
	ch := make(chan int, 100)
	done := make(chan struct{})

	send := func(id int) {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-done:
				// get exit signal
				fmt.Printf("sender #%d exit\n", id)
				return
			case ch <- id*1000 + i:
			}
		}
	}

	recv := func(id int) {
		defer wg.Done()
		for {
			select {
			case <-done:
				// get exit signal
				fmt.Printf("receiver #%d exit\n", id)
				return
			case i := <-ch:
				fmt.Printf("receiver #%d get %d\n", id, i)
				time.Sleep(time.Millisecond)
			}
		}
	}

	wg.Add(6)
	go send(0)
	go send(1)
	go send(2)
	go recv(0)
	go recv(1)
	go recv(2)

	time.Sleep(time.Second)
	// signal finish
	close(done)
	// wait all sender and receiver exit
	wg.Wait()
}

總結

channle 作為 golang 最重要的特性,用起來還是比較爽的。傳統的 C 裡要實現型別的功能的話,一般需要用到 socket 或者 FIFO 來實現,另外還要考慮資料包的完整性與併發衝突的問題,channel 則遮蔽了這些底層細節,使用者只需要考慮讀寫就可以了。 channel 是引用型別,瞭解一下 channel 底層的機制對更好的使用 channel 還是很用必要的。雖然操作原語簡單,但涉及到阻塞的問題,使用不當可能會造成死鎖或者無限制的協程建立最終導致程序掛掉。 channel 除在可以用來在協程之間通訊外,其阻塞和喚醒協程的特性也可以用作協程之間的同步機制,文中也用示例簡單介紹了這種場景下的用法。 關閉 channel 並不是必須的,只要沒有協程沒用引用 channel ,最終會被 GC 清理。所以使用的時候要特別注意,不要讓協程阻塞在 channel 上,這種情況很難檢測到,而且會造成 channel 和阻塞在 channel 的協程佔有的資源無法被 GC 清理最終導致記憶體洩露。 channle 方便 golang 程式使用 CSP 的程式設計範形,但是 golang 是一種多範形的程式語言,golang 也支援傳統的通過共享記憶體來通訊的程式設計方式。終極的原則是根據場景選擇合適的程式設計範型,不要因為 channel 好用而濫用 CSP 。

參考資料