GO 中 Chan 實現原理分享
GO 中 Chan 實現原理分享
嗨,我是小魔童哪吒,還記得咱們之前分享過GO 通道 和sync包的使用嗎?咱們來回顧一下
- 分享了通道是什麼,通道的種類
- 無緩衝,有緩衝,單向通道具體對應什麼
- 對於通道的具體實踐
- 分享了關於通道的異常情況整理
- 簡單分享了sync包的使用
要是對上述內容還有點興趣的話,歡迎檢視文章 GO通道和 sync 包的分享
chan 是什麼?
是一種特殊的型別,是連線併發goroutine
的管道
channel 通道是可以讓一個 goroutine 協程傳送特定值到另一個 goroutine 協程的通訊機制。
通道像一個傳送帶或者佇列,總是遵循先入先出(First In First Out)的規則,保證收發資料的順序,這一點和管道
一個協程從通道的一頭放入資料,另一個協程從通道的另一頭讀出資料
每一個通道都是一個具體型別的導管,宣告 channel 的時候需要為其指定元素型別。
本篇文章主要是分享關於通道的實現原理,關於通道的使用,可以檢視文章 GO通道和 sync 包的分享 ,這裡有詳細的說明
GO 中 Chan 的底層資料結構
瞭解每一個元件或者每一個數據型別的實現原理,咱們都會去看原始碼中的資料結構是如何設計的
同樣,我們一起來看看 GO 的 Chan 的資料結構
GO 的 Chan 的原始碼實現是在 : src/runtime/chan.go
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
hchan
是實現通道的核心資料結構,對應的成員也是不少,咱們根據原始碼註釋一個引數一個引數的來看看
tag | 說明 |
---|---|
qcount | 當前的佇列,剩餘元素個數 |
dataqsiz | 環形佇列可以存放的元素個數,也就是環形佇列的長度 |
buf | 指標,指向環形佇列 |
elemsize | 指的的佇列中每個元素的大小 |
closed | 具體標識關閉的狀態 |
elemtype | 見名知意,元素的型別 |
sendx | 傳送佇列的下標,向佇列中寫入資料的時候,存放在佇列中的位置 |
recvx | 接受佇列的下標,從佇列的 這個位置開始讀取資料 |
recvq | 協程佇列,等待讀取訊息的協程佇列 |
sendq | 協程佇列,等待發送訊息的協程佇列 |
lock | 互斥鎖,在 chan 中,不可以併發的讀寫資料 |
根據上面的引數,我們或多或少就可以知道 GO 中的通道實現原理設計了哪些知識點:
- 指標
- 環形佇列
- 協程
- 互斥鎖
我們順便再來看看上述成員的協程佇列 waitq
對應的是啥樣的資料結構
type waitq struct {
first *sudog
last *sudog
}
sudog
結構是在 src/runtime/runtime2.go
中 ,咱們順便多學一手
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
根據原始碼註釋,咱們大致知道sudog
是幹啥的
Sudog
表示等待列表中的 g,例如在一個通道上傳送/接收
Sudog
是很必要的,因為g↔synchronization物件關係是多對多
一個 g 可能在很多等候佇列上,所以一個 g 可能有很多sudogs
而且許多 g 可能在等待同一個同步物件,所以一個物件可能有許多sudogs
咱們抓住主要矛盾
Sudog
的資料結構,主要的東西就是一個 g
和一個 elem
,
g,上面有說到他和 Sudog
的對應關係
無論是讀通道還是寫通道,都會需要 elem
- 讀通道
資料會從hchan
的佇列中,拷貝到sudog
的elem
中
- 寫通道
與讀通道類似,是將資料從 sudog
的elem
處拷貝到hchan
的佇列中
咱們來畫個圖看看
此處咱們畫一個 hchan
的結構,主要畫一下 recvq
等待讀取訊息的協程佇列,此處的佇列,實際上就是用連結串列來實現的
recvq
會對應到 waitq
結構,waitq
分為first
頭結點 和 last
尾節點 結構分別是 sudog
sudog
裡面 elem存放具體的資料,next 指標指向下一個 sudog
,直到指到last
的 sudog
通過上述的,應該就能明白 GO 中的 chan
基本結構了吧
咱來再來詳細看看 hchan
中其他引數都具體是啥意思
dataqsiz
對應的環形佇列是啥樣的- 寫
sendq
和 讀recvq
等待佇列是啥樣的 elemtype
元素型別資訊又是啥
dataqsiz
對應的環形佇列是啥樣的
環形佇列,故名思議就是 一個首尾連線,成環狀的佇列
GO 中的 chan
內部的環形佇列,主要作用是作為緩衝區
這個環形佇列的長度,我們在建立佇列的時候, 也就是建立 hchan
結構的時候,就已經指定好了的
就是 dataqsiz
,環形佇列的長度
咱們畫個圖清醒一下
上圖需要表達的意思是這個樣子的,上述的佇列是迴圈佇列,預設首尾連線哦:
- dataqsiz 表示 迴圈佇列的長度是 8 個
- qcount 表示 當前佇列中有 5 個元素
- buf 是指標,指向迴圈佇列頭
- sendx 是傳送佇列的下標,這裡為 1 ,則指向佇列的第 2 個區域 ,這個引數可選範圍是 [0 , 8)
- recvx 是接收佇列的下標,這裡為 4 ,則指向的是 佇列的第 5 個區域進行讀取資料
這裡順帶提一下,hchan
中讀取資料還是寫入資料,都是需要去拿 lock
互斥鎖的,同一個通道,在同一個時刻只能允許一個協程進行讀寫
寫 sendq
和 讀 recvq
等待佇列是啥樣的
hchan
結構中的 2 個協程佇列,一個是用於讀取資料,一個是用於傳送資料,他們都是等待佇列,我們來看看這個等待佇列都是咋放資料上去的,分別有啥特性需要注意
當從通道中讀取 或者 傳送資料:
- 若通道的緩衝區為空,或者沒有緩衝區,此時從通道中讀取資料,則協程是會被阻塞的
- 若通道緩衝區為滿,或者沒有緩衝區,此時從通道中寫資料,則協程仍然也會被阻塞
這些被阻塞的協程就會被放到等待佇列中,按照讀 和 寫 的動作來進行分類為寫 sendq
和 讀 recvq
佇列
那麼這些阻塞的協程,啥時候會被喚醒呢?
看過之前的文章 GO通道和 sync 包的分享,應該就能知道
我們在來回顧一下,這篇文章的表格,通道會存在的異常情況:
channel 狀態 | 未初始化的通道(nil) | 通道非空 | 通道是空的 | 通道滿了 | 通道未滿 |
---|---|---|---|---|---|
接收資料 | 阻塞 |
接收資料 | 阻塞 |
接收資料 | 接收資料 |
傳送資料 | 阻塞 |
傳送資料 | 傳送資料 | 阻塞 |
傳送資料 |
關閉 | panic | 關閉通道成功 待資料讀取完畢後 返回零值 |
關閉通道成功 直接返回零值 |
關閉通道成功 待資料讀取完畢後 返回零值 |
關閉通道成功 待資料讀取完畢後 返回零值 |
此時,我們就知道,具體什麼時候被阻塞的協程會被喚醒了
-
因為讀阻塞的協程,會被通道中的寫入資料的協程喚醒,反之亦然
-
因為寫阻塞的協程,也會被通道中讀取資料的協程喚醒
elemtype
元素型別資訊又是啥
這個元素型別資訊就不難理解了,對於我們使用通道,建立通道的時候我們需要填入通道中資料的型別,一個通道,只能寫一種資料型別,指的就是這裡的elemtype
另外 hchan
還有一個成員是elemsize
,代表上述元素型別的佔用空間大小
那麼這倆成員有啥作用呢?
elemtype
和elemsize
就可以計算指定型別的資料佔用空間大小了
前者用於在資料傳遞的過程中進行賦值
後者可以用來在環形佇列中定位具體的元素
建立 chan 是咋實現的?
我們再來瞅瞅 chan.go
的原始碼實現 ,看到原始碼中的 makechan
具體實現
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
如上原始碼實際上就是初始化 chan
對應的成員,其中迴圈佇列 buf 的大小,是由 makechan
函式傳入的 型別資訊和緩衝區長度決定的,也就是makechan
的入參
可以通過上述程式碼的 3 個位置就可以知道
// 1
func makechan(t *chantype, size int) *hchan
// 2
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// 3
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
讀寫 chan 的基本流程
第一張圖說明白向 chan 寫入資料的流程
向通道中寫入資料,我們會涉及sendq
、 recvq
佇列,和迴圈佇列的資源問題
根據圖示可以看出向通道中寫入資料分為 3 種情況:
- 寫入資料的時候,若
recvq
佇列為空,且迴圈佇列有空位,那麼就直接將資料寫入到 迴圈佇列的隊尾 即可 - 若
recvq
佇列為空,且迴圈佇列無空位,則將當前的協程放到sendq
等待佇列中進行阻塞,等待被喚醒,當被喚醒的時候,需要寫入的資料,已經被讀取出來,且已經完成了寫入操作 - 若
recvq
佇列為不為空,那麼可以說明迴圈佇列中沒有資料,或者迴圈佇列是空的,即沒有緩衝區(向無緩衝的通道寫入資料),此時,直接將recvq
等待佇列中取出一個G,寫入資料,喚醒G,完成寫入操作
第二張圖說明白向 chan 讀取資料的流程
向通道中讀取資料,我們會涉及sendq
、 recvq
佇列,和迴圈佇列的資源問題
根據圖示可以看出向通道中讀取資料分為 4 種情況:
- 若
sendq
為空,且迴圈佇列無元素的時候,那就將當前的協程加入recvq
等待佇列,把recvq
等待佇列對頭的一個協程取出來,喚醒,讀取資料 - 若
sendq
為空,且迴圈佇列有元素的時候,直接讀取迴圈佇列中的資料即可 - 若
sendq
有資料,且迴圈佇列有元素的時候,直接讀取迴圈佇列中的資料即可,且把sendq
佇列取一個G放到迴圈佇列中,進行補充 - 若
sendq
有資料,且迴圈佇列無元素的時候,則從sendq
取出一個G,並且喚醒他,進行資料讀取操作
上面說了通道的建立,讀寫,那麼通道咋關閉?
通道的關閉,我們在應用的時候直接 close
就搞定了,那麼對應close
的時候,底層的佇列都是做了啥呢?
若關閉了當前的通道,那麼系統會把recvq
讀取資料的等待佇列裡面的所有協程,全部喚醒,這裡面的每一個G 寫入的資料 預設就寫個 nil,因為通道關閉了,從關閉的通道里面讀取資料,讀到的是nil
系統還會把sendq
寫資料的等待佇列裡面的每一個協程喚醒,但是此時就會有問題了,向已經關閉的協程裡面寫入資料,會報panic
我們再來梳理一下,什麼情況下對通道操作,會報panic
,咱們現在對之前提到的表格再來補充一波
channel 狀態 | 未初始化的通道(nil) | 通道非空 | 通道是空的 | 通道滿了 | 通道未滿 | 關閉的通道 |
---|---|---|---|---|---|---|
接收資料 | 阻塞 |
接收資料 | 阻塞 |
接收資料 | 接收資料 | nil |
傳送資料 | 阻塞 |
傳送資料 | 傳送資料 | 阻塞 |
傳送資料 | panic |
關閉 | panic | 關閉通道成功 待資料讀取完畢後 返回零值 |
關閉通道成功 直接返回零值 |
關閉通道成功 待資料讀取完畢後 返回零值 |
關閉通道成功 待資料讀取完畢後 返回零值 |
panic |
- 關閉一個已經被關閉了的通道,會報
panic
- 關閉一個未初始化的通道,即為
nil
的通道,也會報panic
- 向一個已經關閉的通道寫入資料,會報
panic
你以為這就完了嗎?
GO 裡面Chan
一般會和 select
搭配使用,我們最後來簡單說一下GO 的 通道咋和select使用吧
GO 裡面select
就和 C/C++
裡面的多路IO複用
類似,在C/C++
中多路IO複用有如下幾種方式
- SELECT
- POLL
- EPOLL
都可以自己去模擬實現多路IO複用,各有利弊,一般使用的最多的是 EPOLL,且C/C++也有對應的網路庫
當我們寫GO 的多路IO複用的時候,那就相當爽了,GO 預設支援select
關鍵字
SELECT 簡單使用
我們就來看看都是咋用的,不廢話,咱直接上DEMO
package main
import (
"log"
"time"
)
func main() {
// 簡單設定log引數
log.SetFlags(log.Lshortfile | log.LstdFlags)
// 建立 2 個通道,元素資料型別為 int,緩衝區大小為 5
var ch1 = make(chan int, 5)
var ch2 = make(chan int, 5)
// 分別向通道中各自寫入資料,咱預設寫1吧
// 直接寫一個匿名函式 向通道中新增資料
go func (){
var num = 1
for {
ch1 <- num
num += 1
time.Sleep(1 * time.Second)
}
}()
go func (){
var num = 1
for {
ch2 <- num
num += 1
time.Sleep(1 * time.Second)
}
}()
for {
select {// 讀取資料
case num := <-ch1:
log.Printf("read ch1 data is %d\n", num)
case num := <-ch2:
log.Printf("read ch2 data is: %d\n", num)
default:
log.Printf("ch1 and ch2 is empty\n")
// 休息 1s 再讀
time.Sleep(1 * time.Second)
}
}
}
執行效果
2021/06/18 17:43:06 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:07 main.go:48: read ch1 data is 1
2021/06/18 17:43:07 main.go:48: read ch1 data is 2
2021/06/18 17:43:07 main.go:51: read ch2 data is: 1
2021/06/18 17:43:07 main.go:51: read ch2 data is: 2
2021/06/18 17:43:07 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:08 main.go:48: read ch1 data is 3
2021/06/18 17:43:08 main.go:51: read ch2 data is: 3
2021/06/18 17:43:08 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:09 main.go:48: read ch1 data is 4
2021/06/18 17:43:09 main.go:51: read ch2 data is: 4
2021/06/18 17:43:09 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:10 main.go:51: read ch2 data is: 5
2021/06/18 17:43:10 main.go:48: read ch1 data is 5
從執行結果來看,select
監控的 2個 通道,讀取到的資料是隨機的
可是我們看到case
這個關鍵字,是不是會想到 switch ... case...
,此處的的case
是順序執行的(GO 中沒有switch),select
裡面的 case
應該也是順序執行才對呀,為啥結果是隨機的?
大家要是感興趣的話,可以深入研究一下,咱們今天就先到這裡了。
總結
- 分享了 GO 中通道是什麼
- 通道的底層資料結構詳細解析
- 通道在GO原始碼中是如何實現的
- Chan 讀寫的基本原理
- 關閉通道會出現哪些異常,panic
- select 的簡單應用
歡迎點贊,關注,收藏
朋友們,你的支援和鼓勵,是我堅持分享,提高質量的動力
好了,本次就到這裡,下一次 GO 中 defer的實現原理分享
技術是開放的,我們的心態,更應是開放的。擁抱變化,向陽而生,努力向前行。
我是小魔童哪吒,歡迎點贊關注收藏,下次見~