1. 程式人生 > 其它 >GO 中 Chan 實現原理分享

GO 中 Chan 實現原理分享

GO 中 Chan 實現原理分享

嗨,我是小魔童哪吒,還記得咱們之前分享過GO 通道 和sync包的使用嗎?咱們來回顧一下

  • 分享了通道是什麼,通道的種類
  • 無緩衝,有緩衝,單向通道具體對應什麼
  • 對於通道的具體實踐
  • 分享了關於通道的異常情況整理
  • 簡單分享了sync包的使用

要是對上述內容還有點興趣的話,歡迎檢視文章 GO通道和 sync 包的分享

chan 是什麼?

是一種特殊的型別,是連線併發goroutine的管道

channel 通道是可以讓一個 goroutine 協程傳送特定值到另一個 goroutine 協程的通訊機制

通道像一個傳送帶或者佇列,總是遵循先入先出(First In First Out)的規則,保證收發資料的順序,這一點和管道

是一樣的

一個協程從通道的一頭放入資料,另一個協程從通道的另一頭讀出資料

每一個通道都是一個具體型別的導管,宣告 channel 的時候需要為其指定元素型別。

本篇文章主要是分享關於通道的實現原理,關於通道的使用,可以檢視文章 GO通道和 sync 包的分享 ,這裡有詳細的說明

GO 中 Chan 的底層資料結構

瞭解每一個元件或者每一個數據型別的實現原理,咱們都會去看原始碼中的資料結構是如何設計的

同樣,我們一起來看看 GO 的 Chan 的資料結構

GO 的 Chan 的原始碼實現是在 : src/runtime/chan.go

type hchan struct {
   qcount   uint           // total data in the queue
   dataqsiz uint           // size of the circular queue
   buf      unsafe.Pointer // points to an array of dataqsiz elements
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

hchan 是實現通道的核心資料結構,對應的成員也是不少,咱們根據原始碼註釋一個引數一個引數的來看看

tag 說明
qcount 當前的佇列,剩餘元素個數
dataqsiz 環形佇列可以存放的元素個數,也就是環形佇列的長度
buf 指標,指向環形佇列
elemsize 指的的佇列中每個元素的大小
closed 具體標識關閉的狀態
elemtype 見名知意,元素的型別
sendx 傳送佇列的下標,向佇列中寫入資料的時候,存放在佇列中的位置
recvx 接受佇列的下標,從佇列的 這個位置開始讀取資料
recvq 協程佇列,等待讀取訊息的協程佇列
sendq 協程佇列,等待發送訊息的協程佇列
lock 互斥鎖,在 chan 中,不可以併發的讀寫資料

根據上面的引數,我們或多或少就可以知道 GO 中的通道實現原理設計了哪些知識點:

  • 指標
  • 環形佇列
  • 協程
  • 互斥鎖

我們順便再來看看上述成員的協程佇列 waitq 對應的是啥樣的資料結構

type waitq struct {
   first *sudog
   last  *sudog
}

sudog 結構是在 src/runtime/runtime2.go中 ,咱們順便多學一手

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
   // The following fields are protected by the hchan.lock of the
   // channel this sudog is blocking on. shrinkstack depends on
   // this for sudogs involved in channel ops.

   g *g

   next *sudog
   prev *sudog
   elem unsafe.Pointer // data element (may point to stack)

   // The following fields are never accessed concurrently.
   // For channels, waitlink is only accessed by g.
   // For semaphores, all fields (including the ones above)
   // are only accessed when holding a semaRoot lock.

   acquiretime int64
   releasetime int64
   ticket      uint32

   // isSelect indicates g is participating in a select, so
   // g.selectDone must be CAS'd to win the wake-up race.
   isSelect bool

   // success indicates whether communication over channel c
   // succeeded. It is true if the goroutine was awoken because a
   // value was delivered over channel c, and false if awoken
   // because c was closed.
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   c        *hchan // channel
}

根據原始碼註釋,咱們大致知道sudog 是幹啥的

Sudog表示等待列表中的 g,例如在一個通道上傳送/接收

Sudog是很必要的,因為g↔synchronization物件關係是多對多

一個 g 可能在很多等候佇列上,所以一個 g 可能有很多sudogs

而且許多 g 可能在等待同一個同步物件,所以一個物件可能有許多sudogs

咱們抓住主要矛盾

Sudog的資料結構,主要的東西就是一個 g 和一個 elem

g,上面有說到他和 Sudog的對應關係

無論是讀通道還是寫通道,都會需要 elem

  • 讀通道

資料會從hchan的佇列中,拷貝到sudogelem

  • 寫通道

與讀通道類似,是將資料從 sudogelem處拷貝到hchan的佇列中

咱們來畫個圖看看

此處咱們畫一個 hchan的結構,主要畫一下 recvq等待讀取訊息的協程佇列,此處的佇列,實際上就是用連結串列來實現的

recvq會對應到 waitq結構,waitq 分為first頭結點 和 last尾節點 結構分別是 sudog

sudog裡面 elem存放具體的資料,next 指標指向下一個 sudog,直到指到lastsudog

通過上述的,應該就能明白 GO 中的 chan 基本結構了吧

咱來再來詳細看看 hchan 中其他引數都具體是啥意思

  • dataqsiz 對應的環形佇列是啥樣的
  • sendq和 讀 recvq 等待佇列是啥樣的
  • elemtype元素型別資訊又是啥

dataqsiz 對應的環形佇列是啥樣的

環形佇列,故名思議就是 一個首尾連線,成環狀的佇列

GO 中的 chan內部的環形佇列,主要作用是作為緩衝區

這個環形佇列的長度,我們在建立佇列的時候, 也就是建立 hchan 結構的時候,就已經指定好了的

就是 dataqsiz ,環形佇列的長度

咱們畫個圖清醒一下

上圖需要表達的意思是這個樣子的,上述的佇列是迴圈佇列,預設首尾連線哦

  • dataqsiz 表示 迴圈佇列的長度是 8 個
  • qcount 表示 當前佇列中有 5 個元素
  • buf 是指標,指向迴圈佇列頭
  • sendx 是傳送佇列的下標,這裡為 1 ,則指向佇列的第 2 個區域 ,這個引數可選範圍是 [0 , 8)
  • recvx 是接收佇列的下標,這裡為 4 ,則指向的是 佇列的第 5 個區域進行讀取資料

這裡順帶提一下,hchan 中讀取資料還是寫入資料,都是需要去拿 lock 互斥鎖的,同一個通道,在同一個時刻只能允許一個協程進行讀寫

sendq和 讀 recvq 等待佇列是啥樣的

hchan 結構中的 2 個協程佇列,一個是用於讀取資料,一個是用於傳送資料,他們都是等待佇列,我們來看看這個等待佇列都是咋放資料上去的,分別有啥特性需要注意

當從通道中讀取 或者 傳送資料:

  • 若通道的緩衝區為空,或者沒有緩衝區,此時從通道中讀取資料,則協程是會被阻塞
  • 若通道緩衝區為滿,或者沒有緩衝區,此時從通道中寫資料,則協程仍然也會被阻塞

這些被阻塞的協程就會被放到等待佇列中,按照讀 和 寫 的動作來進行分類為寫 sendq和 讀 recvq 佇列

那麼這些阻塞的協程,啥時候會被喚醒呢?

看過之前的文章 GO通道和 sync 包的分享,應該就能知道

我們在來回顧一下,這篇文章的表格,通道會存在的異常情況:

channel 狀態 未初始化的通道(nil) 通道非空 通道是空的 通道滿了 通道未滿
接收資料 阻塞 接收資料 阻塞 接收資料 接收資料
傳送資料 阻塞 傳送資料 傳送資料 阻塞 傳送資料
關閉 panic 關閉通道成功
待資料讀取完畢後
返回零值
關閉通道成功
直接返回零值
關閉通道成功
待資料讀取完畢後
返回零值
關閉通道成功
待資料讀取完畢後
返回零值

此時,我們就知道,具體什麼時候被阻塞的協程會被喚醒了

  • 因為讀阻塞的協程,會被通道中的寫入資料的協程喚醒,反之亦然

  • 因為寫阻塞的協程,也會被通道中讀取資料的協程喚醒

elemtype元素型別資訊又是啥

這個元素型別資訊就不難理解了,對於我們使用通道,建立通道的時候我們需要填入通道中資料的型別,一個通道,只能寫一種資料型別,指的就是這裡的elemtype

另外 hchan 還有一個成員是elemsize,代表上述元素型別的佔用空間大小

那麼這倆成員有啥作用呢?

elemtypeelemsize就可以計算指定型別的資料佔用空間大小了

前者用於在資料傳遞的過程中進行賦值

後者可以用來在環形佇列中定位具體的元素

建立 chan 是咋實現的?

我們再來瞅瞅 chan.go 的原始碼實現 ,看到原始碼中的 makechan 具體實現

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

如上原始碼實際上就是初始化 chan 對應的成員,其中迴圈佇列 buf 的大小,是由 makechan 函式傳入的 型別資訊和緩衝區長度決定的,也就是makechan 的入參

可以通過上述程式碼的 3 個位置就可以知道

// 1
func makechan(t *chantype, size int) *hchan
// 2
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// 3
var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

讀寫 chan 的基本流程

第一張圖說明白向 chan 寫入資料的流程

向通道中寫入資料,我們會涉及sendqrecvq佇列,和迴圈佇列的資源問題

根據圖示可以看出向通道中寫入資料分為 3 種情況:

  • 寫入資料的時候,若recvq 佇列為空,且迴圈佇列有空位,那麼就直接將資料寫入到 迴圈佇列的隊尾 即可
  • recvq 佇列為空,且迴圈佇列無空位,則將當前的協程放到sendq等待佇列中進行阻塞,等待被喚醒,當被喚醒的時候,需要寫入的資料,已經被讀取出來,且已經完成了寫入操作
  • recvq 佇列為不為空,那麼可以說明迴圈佇列中沒有資料,或者迴圈佇列是空的,即沒有緩衝區(向無緩衝的通道寫入資料),此時,直接將recvq等待佇列中取出一個G,寫入資料,喚醒G,完成寫入操作

第二張圖說明白向 chan 讀取資料的流程

向通道中讀取資料,我們會涉及sendqrecvq佇列,和迴圈佇列的資源問題

根據圖示可以看出向通道中讀取資料分為 4 種情況:

  • sendq為空,且迴圈佇列無元素的時候,那就將當前的協程加入recvq等待佇列,把recvq等待佇列對頭的一個協程取出來,喚醒,讀取資料
  • sendq為空,且迴圈佇列有元素的時候,直接讀取迴圈佇列中的資料即可
  • sendq有資料,且迴圈佇列有元素的時候,直接讀取迴圈佇列中的資料即可,且把sendq佇列取一個G放到迴圈佇列中,進行補充
  • sendq有資料,且迴圈佇列無元素的時候,則從sendq取出一個G,並且喚醒他,進行資料讀取操作

上面說了通道的建立,讀寫,那麼通道咋關閉?

通道的關閉,我們在應用的時候直接 close 就搞定了,那麼對應close的時候,底層的佇列都是做了啥呢?

若關閉了當前的通道,那麼系統會把recvq 讀取資料的等待佇列裡面的所有協程,全部喚醒,這裡面的每一個G 寫入的資料 預設就寫個 nil,因為通道關閉了,從關閉的通道里面讀取資料,讀到的是nil

系統還會把sendq寫資料的等待佇列裡面的每一個協程喚醒,但是此時就會有問題了,向已經關閉的協程裡面寫入資料,會報panic

我們再來梳理一下,什麼情況下對通道操作,會報panic,咱們現在對之前提到的表格再來補充一波

channel 狀態 未初始化的通道(nil) 通道非空 通道是空的 通道滿了 通道未滿 關閉的通道
接收資料 阻塞 接收資料 阻塞 接收資料 接收資料 nil
傳送資料 阻塞 傳送資料 傳送資料 阻塞 傳送資料 panic
關閉 panic 關閉通道成功
待資料讀取完畢後
返回零值
關閉通道成功
直接返回零值
關閉通道成功
待資料讀取完畢後
返回零值
關閉通道成功
待資料讀取完畢後
返回零值
panic
  • 關閉一個已經被關閉了的通道,會報panic
  • 關閉一個未初始化的通道,即為nil的通道,也會報panic
  • 向一個已經關閉的通道寫入資料,會報panic

你以為這就完了嗎?

GO 裡面Chan 一般會和 select 搭配使用,我們最後來簡單說一下GO 的 通道咋和select使用

GO 裡面select 就和 C/C++裡面的多路IO複用類似,在C/C++中多路IO複用有如下幾種方式

  • SELECT
  • POLL
  • EPOLL

都可以自己去模擬實現多路IO複用,各有利弊,一般使用的最多的是 EPOLL,且C/C++也有對應的網路庫

當我們寫GO 的多路IO複用的時候,那就相當爽了,GO 預設支援select 關鍵字

SELECT 簡單使用

我們就來看看都是咋用的,不廢話,咱直接上DEMO

package main

import (
   "log"
   "time"
)

func main() {

   // 簡單設定log引數
   log.SetFlags(log.Lshortfile | log.LstdFlags)

   // 建立 2 個通道,元素資料型別為 int,緩衝區大小為 5
   var ch1 = make(chan int, 5)
   var ch2 = make(chan int, 5)

   // 分別向通道中各自寫入資料,咱預設寫1吧
   // 直接寫一個匿名函式 向通道中新增資料
   go func (){
      var num = 1
      for {
         ch1 <- num
         num += 1
         time.Sleep(1 * time.Second)
      }
   }()

   go func (){
      var num = 1
      for {
         ch2 <- num
         num += 1
         time.Sleep(1 * time.Second)
      }
   }()

   for {
      select {// 讀取資料
      case num := <-ch1:
         log.Printf("read ch1 data is  %d\n", num)

      case num := <-ch2:
         log.Printf("read ch2 data is: %d\n", num)

      default:
         log.Printf("ch1 and ch2 is empty\n")
          // 休息 1s 再讀
         time.Sleep(1 * time.Second)
      }
   }
}

執行效果

2021/06/18 17:43:06 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:07 main.go:48: read ch1 data is  1
2021/06/18 17:43:07 main.go:48: read ch1 data is  2
2021/06/18 17:43:07 main.go:51: read ch2 data is: 1
2021/06/18 17:43:07 main.go:51: read ch2 data is: 2
2021/06/18 17:43:07 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:08 main.go:48: read ch1 data is  3
2021/06/18 17:43:08 main.go:51: read ch2 data is: 3
2021/06/18 17:43:08 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:09 main.go:48: read ch1 data is  4
2021/06/18 17:43:09 main.go:51: read ch2 data is: 4
2021/06/18 17:43:09 main.go:54: ch1 and ch2 is empty
2021/06/18 17:43:10 main.go:51: read ch2 data is: 5
2021/06/18 17:43:10 main.go:48: read ch1 data is  5

從執行結果來看,select 監控的 2個 通道,讀取到的資料是隨機的

可是我們看到case這個關鍵字,是不是會想到 switch ... case...,此處的的case 是順序執行的(GO 中沒有switch),select 裡面的 case 應該也是順序執行才對呀,為啥結果是隨機的?

大家要是感興趣的話,可以深入研究一下,咱們今天就先到這裡了。

總結

  • 分享了 GO 中通道是什麼
  • 通道的底層資料結構詳細解析
  • 通道在GO原始碼中是如何實現的
  • Chan 讀寫的基本原理
  • 關閉通道會出現哪些異常,panic
  • select 的簡單應用

歡迎點贊,關注,收藏

朋友們,你的支援和鼓勵,是我堅持分享,提高質量的動力

好了,本次就到這裡,下一次 GO 中 defer的實現原理分享

技術是開放的,我們的心態,更應是開放的。擁抱變化,向陽而生,努力向前行。

我是小魔童哪吒,歡迎點贊關注收藏,下次見~