1. 程式人生 > >go核心chan.go檔案分析解讀(二)go是如何排程通道內容(協程)的

go核心chan.go檔案分析解讀(二)go是如何排程通道內容(協程)的

回顧

上文我們分析到了,golang是如何產生一個通道的。

其實很簡單,就像所有的高階語言一樣,我宣告並實現一個物件(雖然go裡不叫物件),並給他的分配相應的資料格式和記憶體空間,這時候物件就存在於我們計算機的記憶體中了(一般都是堆中)。

不可避免的問題

在併發網路的世界裡,有個不可避免的問題,就是鎖。

貼程式碼(寫)

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}
	
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

分析(寫)

噢,和我們平常寫程式碼差距不大。

大致就是1.排除錯誤條件 2.開啟日誌 3.邊界 4. 鎖 5. 邏輯 6.釋放資源

其實我們應該從鎖開始觀察就好了。

就是 鎖->寫資料->釋放鎖

所以,我們就具體看些資料就行。

寫資料

emmm,實現

  1. 假如找到接受者的話,直接寫到接受者佇列中

  2. 通道緩衝區中有可用空間,將元素寫入緩衝區

  3. 緩衝區沒有可用空間,沒有可用空間,將當前 goroutine 加入 send 佇列並阻塞,即為同步阻塞。

貼程式碼(讀)

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

分析(讀)

前幾個邊界,錯誤判斷都差不多

然後讀的順序,好像和寫的是有區別

讀資料

  1. 當 send 佇列不為空,分兩種情況:
    • 快取佇列為空,直接從 send 佇列的sender中接收資料 元素;
    • 快取佇列不為空,此時只有可能是快取佇列已滿,從佇列頭取出元素,並喚醒 sender 將元素寫入快取佇列尾部。由於為環形佇列,因此,佇列滿時只需要將佇列頭複製給 reciever,同時將 sender 元素複製到該位置,並移動佇列頭尾索引,不需要移動佇列元素。【這就是為什麼使用環形佇列的原因】
  2. 緩衝區(佇列)不為空,直接從佇列取隊頭元素,移動頭索引。
  3. 緩衝區(佇列)為空,將 goroutine 加入 recv 佇列,並阻塞。

嗯? 好像就是和寫對稱反過來而已。

鎖!!!

無論是讀還是寫,都是用到了鎖,那麼,同樣是上鎖,為什麼chan的效能就會高呢?

lock(mutex 互斥鎖)欄位是這麼個含義:

lock protects all fields in hchan, as well as several fields in sudogs blocked on this channel. Do not change another G's status while holding this lock (in particular, do not ready a G), as this can deadlock with stack shrinking.

就是一個鎖保護hchan的所有欄位,當已經被上鎖時,不要去改變任何資料,否則會導致死鎖。

至於鎖,那是個大難題,且待下回分解!

系列文章

go核心chan.go檔案分析解讀(一)go是如何構建一個通道的

go核心chan.go檔案分析解讀(二)go是如何排程通道內容(協程)的