[My Blockchain Journey] - Go source code analysis: the underlying implementation of channel
[Please credit the source when reposting] https://blog.csdn.net/qq_25870633/article/details/83388952
Following on from the previous article, [My Blockchain Journey] - Go source code analysis: the underlying implementation of select. Since I have also been asked in interviews about the underlying implementation of channel, I went through the internals of channel, goroutine, the goroutine scheduler, interface, mutex, rwmutex, timer and so on, all in one go. What can I say: I used to think I was a dazzling Challenger carrying games against the wind, but these days I have discovered I am really just a stubborn Iron player barely hanging on even with every advantage...
References:
https://blog.csdn.net/nobugtodebug/article/details/45396507
https://studygolang.com/articles/10575?fr=sidebar
This article, then, focuses on the underlying implementation of channel.
First of all, where does the channel implementation live? In the runtime package, in the file ./src/runtime/chan.go. The main struct is:
/** Definition of the channel struct */
type hchan struct {
	qcount   uint           // total data in the queue: number of elements currently in the queue
	dataqsiz uint           // size of the circular queue: the channel's capacity
	buf      unsafe.Pointer // points to an array of dataqsiz elements: the data buffer, a circular array holding the elements
	elemsize uint16         // size of the channel's element type (size of a single element)
	closed   uint32         // flag marking whether the channel has been closed
	elemtype *_type         // element type of the queue

	// send and receive indexes, used to implement the circular array queue
	sendx uint // send index: index of the slot for the next send
	recvx uint // receive index: index of the slot for the next receive

	recvq waitq // list of recv waiters: goroutines blocked on this channel by a recv (<-ch)
	sendq waitq // list of send waiters: goroutines blocked on this channel by a send (ch<-)

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

/** The struct for the send and receive wait queues: a linked-list implementation of a wait queue */
type waitq struct {
	first *sudog
	last  *sudog
}
There is also the sudog struct, defined in the runtime package in ./src/runtime/runtime2.go:
/** A wrapper around a G */
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.
	g          *g
	selectdone *uint32 // CAS to 1 to win select race (may point to stack)
	next       *sudog
	prev       *sudog
	elem       unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.
	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}
From these structures we can roughly see that a channel is essentially: a queue implemented with a circular array, which stores the message elements; two goroutine wait queues implemented as linked lists, which hold the goroutines blocked on recv and send operations; and a mutex (a lightweight lock) that synchronizes changes to the various fields. recvq is the list of goroutines whose read operations are blocked on the channel, and sendq is the list of goroutines whose write operations are blocked on it. The lists are made of sudog, which is simply a wrapper around the g structure.
Like select, hchan is really just the header of the channel. The contiguous block of memory that follows the header serves as the channel's buffer, i.e. the circular queue that holds the channel's data. qcount and dataqsiz describe the buffer's current usage [len] and its capacity [cap]. If the channel is unbuffered, the size is 0 and there is no circular queue at all.
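To make these fields concrete, here is a tiny example of my own (not from the original post): len(ch) reports qcount and cap(ch) reports dataqsiz, and an unbuffered channel has no ring buffer at all.
package main

import "fmt"

func main() {
	// Buffered channel: the hchan header is followed by a ring buffer of 3 int slots.
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	fmt.Println(len(ch), cap(ch)) // 2 3, i.e. qcount and dataqsiz

	// Unbuffered channel: dataqsiz is 0 and there is no ring buffer.
	unbuf := make(chan int)
	fmt.Println(len(unbuf), cap(unbuf)) // 0 0
}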
Now let's look at how a channel is instantiated:
make:
The make process is fairly simple. One thing worth noting is that when the element type contains no pointers, the entire hchan is allocated as a single contiguous block. Here is the code that implements make for a channel:
//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int64) *hchan {
return makechan(t, size)
}
/**
Create a chan
*/
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
panic(plainError("makechan: size out of range"))
}
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
As you can see, just like the select discussed earlier, the //go:linkname trick is used to bind the function to its counterpart in the reflect package, so that the whole make entry point can be triggered through reflection. Let's take a quick look at the function in the reflect package that really backs make(chan type, int):
// MakeChan creates a new channel with the specified type and buffer size.
func MakeChan(typ Type, buffer int) Value {
if typ.Kind() != Chan {
panic("reflect.MakeChan of non-chan type")
}
if buffer < 0 {
panic("reflect.MakeChan: negative buffer size")
}
if typ.ChanDir() != BothDir {
panic("reflect.MakeChan: unidirectional channel type")
}
ch := makechan(typ.(*rtype), uint64(buffer))
return Value{typ.common(), ch, flag(Chan)}
}
// This is the function that the runtime links to with //go:linkname
func makechan(typ *rtype, size uint64) (ch unsafe.Pointer)
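As a quick illustration of this reflect entry point, here is a small sketch of my own (not from the original post) that creates and uses a channel purely through the reflect API:
package main

import (
	"fmt"
	"reflect"
)

func main() {
	// reflect.MakeChan goes through reflect.makechan, which //go:linkname
	// binds to the runtime's channel constructor shown above.
	chanType := reflect.ChanOf(reflect.BothDir, reflect.TypeOf(0)) // chan int
	v := reflect.MakeChan(chanType, 3)

	v.Send(reflect.ValueOf(42))
	fmt.Println(v.Len(), v.Cap()) // 1 3

	x, ok := v.Recv()
	fmt.Println(x.Int(), ok) // 42 true
}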
Picking up from above, what does make actually do? The first two ifs are checks for abnormal conditions, and the third if is also straightforward: it checks whether size is negative or too large. The int64(uintptr(size)) != size expression additionally catches values that would be truncated when converted to uintptr (for example on 32-bit platforms).
Next, if the channel's element type contains no pointers, or the channel is unbuffered, the whole thing is allocated in a single contiguous memory region (memory is allocated with the mallocgc function). While we are at it, let's look at the code of mallocgc (it was also used in the select chapter):
// Allocate an object of size bytes.
// Small objects are allocated from the per-P cache's free lists.
// Large objects (> 32 kB) are allocated straight from the heap.
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
if gcphase == _GCmarktermination {
throw("mallocgc called with gcphase == _GCmarktermination")
}
if size == 0 {
return unsafe.Pointer(&zerobase)
}
if debug.sbrk != 0 {
align := uintptr(16)
if typ != nil {
align = uintptr(typ.align)
}
return persistentalloc(size, align, &memstats.other_sys)
}
// assistG is the G to charge for this allocation, or nil if
// GC is not currently active.
var assistG *g
if gcBlackenEnabled != 0 {
// Charge the current user G for this allocation.
assistG = getg()
if assistG.m.curg != nil {
assistG = assistG.m.curg
}
// Charge the allocation against the G. We'll account
// for internal fragmentation at the end of mallocgc.
assistG.gcAssistBytes -= int64(size)
if assistG.gcAssistBytes < 0 {
// This G is in debt. Assist the GC to correct
// this before allocating. This must happen
// before disabling preemption.
gcAssistAlloc(assistG)
}
}
// Set mp.mallocing to keep from being preempted by GC.
mp := acquirem()
if mp.mallocing != 0 {
throw("malloc deadlock")
}
if mp.gsignal == getg() {
throw("malloc during signal")
}
mp.mallocing = 1
shouldhelpgc := false
dataSize := size
c := gomcache()
var x unsafe.Pointer
noscan := typ == nil || typ.kind&kindNoPointers != 0
if size <= maxSmallSize {
if noscan && size < maxTinySize {
// Tiny allocator.
//
// Tiny allocator combines several tiny allocation requests
// into a single memory block. The resulting memory block
// is freed when all subobjects are unreachable. The subobjects
// must be noscan (don't have pointers), this ensures that
// the amount of potentially wasted memory is bounded.
//
// Size of the memory block used for combining (maxTinySize) is tunable.
// Current setting is 16 bytes, which relates to 2x worst case memory
// wastage (when all but one subobjects are unreachable).
// 8 bytes would result in no wastage at all, but provides less
// opportunities for combining.
// 32 bytes provides more opportunities for combining,
// but can lead to 4x worst case wastage.
// The best case winning is 8x regardless of block size.
//
// Objects obtained from tiny allocator must not be freed explicitly.
// So when an object will be freed explicitly, we ensure that
// its size >= maxTinySize.
//
// SetFinalizer has a special case for objects potentially coming
// from tiny allocator, it such case it allows to set finalizers
// for an inner byte of a memory block.
//
// The main targets of tiny allocator are small strings and
// standalone escaping variables. On a json benchmark
// the allocator reduces number of allocations by ~12% and
// reduces heap size by ~20%.
off := c.tinyoffset
// Align tiny pointer for required (conservative) alignment.
if size&7 == 0 {
off = round(off, 8)
} else if size&3 == 0 {
off = round(off, 4)
} else if size&1 == 0 {
off = round(off, 2)
}
if off+size <= maxTinySize && c.tiny != 0 {
// The object fits into existing tiny block.
x = unsafe.Pointer(c.tiny + off)
c.tinyoffset = off + size
c.local_tinyallocs++
mp.mallocing = 0
releasem(mp)
return x
}
// Allocate a new maxTinySize block.
span := c.alloc[tinySpanClass]
v := nextFreeFast(span)
if v == 0 {
v, _, shouldhelpgc = c.nextFree(tinySpanClass)
}
x = unsafe.Pointer(v)
(*[2]uint64)(x)[0] = 0
(*[2]uint64)(x)[1] = 0
// See if we need to replace the existing tiny block with the new one
// based on amount of remaining free space.
if size < c.tinyoffset || c.tiny == 0 {
c.tiny = uintptr(x)
c.tinyoffset = size
}
size = maxTinySize
} else {
var sizeclass uint8
if size <= smallSizeMax-8 {
sizeclass = size_to_class8[(size+smallSizeDiv-1)/smallSizeDiv]
} else {
sizeclass = size_to_class128[(size-smallSizeMax+largeSizeDiv-1)/largeSizeDiv]
}
size = uintptr(class_to_size[sizeclass])
spc := makeSpanClass(sizeclass, noscan)
span := c.alloc[spc]
v := nextFreeFast(span)
if v == 0 {
v, span, shouldhelpgc = c.nextFree(spc)
}
x = unsafe.Pointer(v)
if needzero && span.needzero != 0 {
memclrNoHeapPointers(unsafe.Pointer(v), size)
}
}
} else {
var s *mspan
shouldhelpgc = true
systemstack(func() {
s = largeAlloc(size, needzero, noscan)
})
s.freeindex = 1
s.allocCount = 1
x = unsafe.Pointer(s.base())
size = s.elemsize
}
var scanSize uintptr
if !noscan {
// If allocating a defer+arg block, now that we've picked a malloc size
// large enough to hold everything, cut the "asked for" size down to
// just the defer header, so that the GC bitmap will record the arg block
// as containing nothing at all (as if it were unused space at the end of
// a malloc block caused by size rounding).
// The defer arg areas are scanned as part of scanstack.
if typ == deferType {
dataSize = unsafe.Sizeof(_defer{})
}
heapBitsSetType(uintptr(x), size, dataSize, typ)
if dataSize > typ.size {
// Array allocation. If there are any
// pointers, GC has to scan to the last
// element.
if typ.ptrdata != 0 {
scanSize = dataSize - typ.size + typ.ptrdata
}
} else {
scanSize = typ.ptrdata
}
c.local_scan += scanSize
}
// Ensure that the stores above that initialize x to
// type-safe memory and set the heap bits occur before
// the caller can make x observable to the garbage
// collector. Otherwise, on weakly ordered machines,
// the garbage collector could follow a pointer to x,
// but see uninitialized memory or stale heap bits.
publicationBarrier()
// Allocate black during GC.
// All slots hold nil so no scanning is needed.
// This may be racing with GC so do it atomically if there can be
// a race marking the bit.
if gcphase != _GCoff {
gcmarknewobject(uintptr(x), size, scanSize)
}
if raceenabled {
racemalloc(x, size)
}
if msanenabled {
msanmalloc(x, size)
}
mp.mallocing = 0
releasem(mp)
if debug.allocfreetrace != 0 {
tracealloc(x, size, typ)
}
if rate := MemProfileRate; rate > 0 {
if size < uintptr(rate) && int32(size) < c.next_sample {
c.next_sample -= int32(size)
} else {
mp := acquirem()
profilealloc(mp, x, size)
releasem(mp)
}
}
if assistG != nil {
// Account for internal fragmentation in the assist
// debt now that we know it.
assistG.gcAssistBytes -= int64(size - dataSize)
}
if shouldhelpgc {
if t := (gcTrigger{kind: gcTriggerHeap}); t.test() {
gcStart(gcBackgroundMode, t)
}
}
return x
}
Otherwise, creating the chan requires the element type and the buffer size, and the channel and channel.buf are allocated separately: as in the structure diagram above, newarray builds the circular queue. The reason the buffer is allocated separately when the element type contains pointers is mainly so the GC can treat the two regions differently (the pointer-free case is marked as not needing scanning). Also, because the pointer size is fixed, memory can be allocated together with the hchan header, with no need for a new(hchan) followed by a newarray.
Let's also look at the newarray function:
func newarray(typ *_type, n int) unsafe.Pointer {
if n < 0 || uintptr(n) > maxSliceCap(typ.size) {
panic(plainError("runtime: allocation size out of range"))
}
return mallocgc(typ.size*uintptr(n), typ, true)
}
As you can see, newarray itself also calls mallocgc under the hood to allocate the memory.
Summary: make chan allocates on the heap and returns a pointer to an hchan.
A chan that is declared but never initialized with make is a nil chan. Reading from or writing to a nil chan blocks forever, and closing a nil chan panics.
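Here is a small runnable sketch of my own illustrating the nil-chan behavior; a select with a default case is used so the program does not actually deadlock:
package main

import "fmt"

func main() {
	var ch chan int // declared but never made: a nil chan

	// A nil channel is never ready, so this select falls through to default
	// instead of blocking forever on the receive.
	select {
	case v := <-ch:
		fmt.Println("received", v) // never happens
	default:
		fmt.Println("nil chan: a receive would block forever")
	}

	// Closing a nil channel panics with "close of nil channel".
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
	}()
	close(ch)
}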
Reading and writing a chan:
As the implementation shows, both reading and writing a chan take the lock, so, just like reading and writing shared memory, there is locking overhead.
Data travels through the chan starting from chansend's input argument and ends up in the data slot of a goroutine on recvq; if the send blocks along the way, the value may first be written into the data slot of the goroutine parked on sendq and wait there to be relayed.
After returning from gopark, the sudog object can be reused.
First, let's look at the definitions of the chan send and receive functions:
send:
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// Sending to a channel that is uninitialized (nil) blocks forever
if c == nil {
if !block {
return false
}
// gopark parks the current goroutine; it would be woken via unlockf, but the unlockf passed here is nil, so the goroutine sleeps forever
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// acquire the channel lock
lock(&c.lock)
// sending on a channel that has already been closed panics
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// CASE 1: a goroutine is waiting on the recv queue: skip the buffer and hand the value directly to the receiver goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// CASE 2: the buffer is not full: copy the value into the buffer
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// CASE 3: the buffer is full: add the current goroutine to the send queue
// initialize a sudog
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// enqueue on the send wait queue
c.sendq.enqueue(mysg)
// park (sleep)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// woken up by a receiver (or by close)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
send has the following four cases (all for a chan that is not nil):
- Writing to a chan that has already been closed panics.
- A goroutine is blocked on the channel's recv queue, which means the buffer (hchan.buf) is empty: the value is handed directly to the receiver goroutine, costing only a single copy.
- The channel buffer (hchan.buf) still has free space: the value is placed into the buffer and waits to be received; by the time it is received, two copies have been made in total.
- The channel buffer (hchan.buf) is full: the current goroutine is added to the send queue and blocks.
[Case 1]: writing to a chan that has already been closed panics
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
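A tiny demo of my own for this case; the recover is only there so the program can print the panic value instead of crashing:
package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	close(ch)

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // send on closed channel
		}
	}()
	ch <- 1 // chansend sees c.closed != 0 and panics
}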
[Case 2]: a waiting goroutine is dequeued from the channel's recv wait queue and send is called; goready is what wakes the goroutine up
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// the send logic:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
[Case 3]: qcount is compared with dataqsiz to decide whether hchan.buf still has room. Besides copying the element in, sendx and qcount also need to be adjusted
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
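A small example of my own for this buffered path:
package main

import "fmt"

func main() {
	ch := make(chan string, 2)

	// Both sends take the buffered path (qcount < dataqsiz): the values are
	// copied into the ring buffer and the sender never blocks.
	ch <- "a"
	ch <- "b"
	fmt.Println(len(ch), cap(ch)) // 2 2: the buffer is now full

	// A third send would fall into case 4 (buffer full) and block until
	// some goroutine received from ch.
	fmt.Println(<-ch, <-ch) // a b
}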
[Case 4]: when the channel buffer (hchan.buf) is full, the current goroutine is added to the send queue and blocks.
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// some initialization work
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // add the current goroutine to the wait queue
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) // park (sleep)
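A minimal sketch of my own showing a sender parking on sendq and being woken by a receiver; the Sleep calls only force the ordering for the sake of illustration:
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int) // unbuffered: every send parks on sendq until a receiver shows up

	go func() {
		fmt.Println("sender: about to block on ch <- 42")
		ch <- 42 // this goroutine is put on c.sendq and parked by goparkunlock
		fmt.Println("sender: woken up after the receive")
	}()

	time.Sleep(100 * time.Millisecond) // let the sender block first
	fmt.Println("receiver got:", <-ch) // dequeues the sudog from sendq and calls goready on the sender
	time.Sleep(100 * time.Millisecond) // give the sender time to print before main exits
}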
receive:
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// receiving from a nil channel blocks forever
if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// CASE 1: receiving from a channel that is closed and empty returns the zero value
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// CASE 2: the send queue is not empty
// CASE 2.a: the buffer is empty: receive the element directly from the sender
// CASE 2.b: the buffer is not empty, which can only mean it is full: take the element at the head
// of the queue and wake the sender to write its element at the tail. Because this is a circular
// queue, when it is full the head slot is copied to the receiver and the sender's element is copied
// into that same slot; only the head/tail indexes move, no elements are shifted
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// CASE 3: the buffer is not empty: take an element straight from the queue and advance the head index
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// CASE 4: the buffer is empty: add the goroutine to the recv queue and block
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
// written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
receive has the following four cases (all for a chan that is not nil):
- Receiving from a channel that is closed and empty returns the zero value.
- The send queue is not empty; there are two sub-cases: (1) the buffer is empty, so the element is received directly from the waiting sender; (2) the buffer is not empty, which can only mean it is full, so the element at the head of the queue is taken out and the sender is woken to write its element at the tail. Because this is a circular queue, when it is full the head slot is simply copied to the receiver and the sender's element is copied into that same slot; only the head/tail indexes move, and no elements are shifted. (This is exactly why a circular queue is used.)
- The buffer is not empty: the element at the head of the queue is taken directly and the head index is advanced.
- The buffer is empty: the goroutine is added to the recv queue and blocks.
[Case 1]: receiving from a closed channel. If the channel still holds data, execution continues with the cases below; if it is already empty, the zero value is returned, and when reading with the ok idiom the second result is false.
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
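A quick demo of my own of the ok idiom against a closed channel:
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch)

	// Buffered values are still delivered after close.
	v, ok := <-ch
	fmt.Println(v, ok) // 10 true
	v, ok = <-ch
	fmt.Println(v, ok) // 20 true

	// Now the channel is closed AND empty: chanrecv returns the zero value and ok == false.
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false
}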
[Case 2]: a sender goroutine is currently blocked on the channel, so recv is called directly. (a) When the buffer is empty, the element is received directly from the waiting sender. (b) When the buffer is not empty it can only be full: the element at the head of the queue is taken out, the sender is woken to write its element at the tail, and the head index is updated.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
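Here is a small example of my own that exercises exactly this "buffer full plus waiting sender" path; the Sleep is only there to make sure the sender has parked before the receive happens:
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // the buffer is now full

	go func() {
		ch <- 3 // buffer full: this sender parks on c.sendq
		fmt.Println("sender of 3 woken up")
	}()
	time.Sleep(100 * time.Millisecond) // let the sender block

	// This receive hits CASE 2.b: it takes 1 from the head of the ring buffer and
	// wakes the parked sender, whose value 3 is written into the freed tail slot.
	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2
	fmt.Println(<-ch) // 3, FIFO order is preserved
	time.Sleep(100 * time.Millisecond)
}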
[Case 3]: buf has data available. The element at the head of the queue is taken directly and the head index is advanced.
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
[Case 4]: buf is empty; the current goroutine is added to the recv queue and blocks.
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
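A minimal sketch of my own of a receiver parking on recvq and being woken by a later send:
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string) // unbuffered

	go func() {
		// This receive finds no sender and no buffered data, so the goroutine
		// is wrapped in a sudog, enqueued on c.recvq and parked.
		fmt.Println("received:", <-ch)
	}()

	time.Sleep(100 * time.Millisecond) // let the receiver park first

	// The send finds a waiting receiver on recvq, copies "hello" directly into
	// its slot (chansend CASE 1) and wakes it with goready.
	ch <- "hello"
	time.Sleep(100 * time.Millisecond)
}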
close:
Next, let's look at the concrete implementation of closing a channel:
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
closechan(c)
}
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// closing an already closed channel panics
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc(unsafe.Pointer(&c))
racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
racerelease(unsafe.Pointer(c))
}
c.closed = 1
var glist *g
// wake up all receivers
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
// wake up all senders (they will panic)
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
unlock(&c.lock)
// make every goroutine in glist runnable
// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3)
}
}
What close channel does:
- Set c.closed to 1.
- Wake up the goroutines blocked on the recvq queue.
- Wake up the goroutines blocked on the sendq queue.
It does this by walking the recvq and sendq queues, putting all of their goroutines onto the glist list, and finally making every goroutine on glist runnable.
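A short example of my own showing close acting as a broadcast that releases every goroutine parked on recvq:
package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup

	// Several receivers park on done's recvq.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // blocks until closechan puts this goroutine on glist and readies it
			fmt.Println("worker", id, "released by close")
		}(i)
	}

	// closechan sets closed = 1, drains recvq into glist and wakes every waiter;
	// each receiver gets the zero value of struct{}.
	close(done)
	wg.Wait()
}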
OK, that wraps up the source-code analysis of channel. Now let's look at how a chan works through a few diagrams:
The send flow:
The receive flow:
The close flow:
That covers the underlying principles and walkthrough of chan operations.
Is a chan thread-safe? Yes, it is: the hchan struct has a built-in mutex, and the send, recv and close operations all acquire and release that lock.
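A small sketch of my own with many goroutines sending on one channel and no extra locking, relying solely on hchan.lock:
package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int, 8)
	var wg sync.WaitGroup

	// Several goroutines send on the same channel concurrently; every chansend
	// takes hchan.lock internally, so no external synchronization is needed.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				ch <- base*10 + j
			}
		}(i)
	}

	go func() {
		wg.Wait()
		close(ch) // closechan also takes the lock before flipping closed
	}()

	sum := 0
	for v := range ch { // chanrecv locks on every receive as well
		sum += v
	}
	fmt.Println("received all values, sum =", sum) // 340
}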
And that is the end of our walk through chan's internals! Please go easy on me~