1. 程式人生 > >Golang-Channel原理解析

Golang-Channel原理解析

本文主要分析golang實現併發基礎元件channel的實現原理;
主要內容分為幾個部分
Section1:channel使用例項分析
Section2:原始碼分析

Golang-Channel原理解析

Section1 channel使用例項

channel主要是為了實現go的併發特性,用於併發通訊的,也就是在不同的協程單元goroutine之間同步通訊。

下面主要從三個方面來講解:

  • make channel,主要也就是hchan的資料結構原型;
  • 傳送和接收資料時,goroutine會怎麼排程;
  • 設計思考;

1.1 make channel

我們建立channel時候有兩種,一種是帶緩衝的channel一種是不帶緩衝的channel。建立方式分別如下:

// buffered
ch := make(chan Task, 3)
// unbuffered
ch := make(chan int)

buffered channel

如果我們建立一個一個帶buffer的channel,底層的資料模型如下圖:
在這裡插入圖片描述

當我們向channel裡面寫入資料時候,會直接把資料存入circular queue(send)。當Queue存滿了之後就會是如下的狀態:
在這裡插入圖片描述

當dequeue一個元素時候,如下所示:
在這裡插入圖片描述

從上圖可知,recvx自增了一個,表示出隊了一個元素,其實也就是迴圈陣列實現FIFO語義。

那麼還有一個問題,當我們新建channel的時候,底層建立的hchan資料結構是在哪裡分配記憶體的呢?其實Section2裡面原始碼分析時候已經做了分析,hchan是在heap裡面分配的。

如下圖所示:
在這裡插入圖片描述

當我們使用make去建立一個channel的時候,實際上返回的是一個指向channel的pointer,所以我們能夠在不同的function之間直接傳遞channel物件,而不用通過指向channel的指標。

1.2 sends and receives

不同goroutine在channel上面進行讀寫時,涉及到的過程比較複雜,比如下圖:
在這裡插入圖片描述

G1會往channel裡面寫入資料,G2會從channel裡面讀取資料。

G1作用於底層hchan的流程如下圖:
在這裡插入圖片描述

  1. 先獲取全域性鎖;
  2. 然後enqueue元素(通過copy的方式);
  3. 釋放鎖;

G2讀取時候作用於底層資料結構流程如下圖所示:
在這裡插入圖片描述

  1. 先獲取全域性鎖;
  2. 然後dequeue元素(通過copy的方式);
  3. 釋放鎖;

上面的讀寫思路其實很簡單,除了hchan資料結構外,不要通過共享記憶體去通訊;而是通過通訊(複製)實現共享記憶體。

寫入滿channel的場景

如下圖所示:channel寫入3個task之後佇列已經滿了,這時候G1再寫入第四個task的時候會發生什麼呢?
在這裡插入圖片描述

G1這時候會暫停直到出現一個receiver。

這個地方需要介紹一下Golang的scheduler的。我們知道goroutine是使用者空間的執行緒,建立和管理interlude都是通過Go的runtime,而不是通過OS的thread。

但是Go的runtime排程執行goroutine卻是基於OS thread的。如下圖:
在這裡插入圖片描述

具體關於golang的scheduler的原理,可以看前面的一篇部落格,關於go的scheduler原理分析。

當向已經滿的channel裡面寫入資料時候,會發生什麼呢?如下圖:
在這裡插入圖片描述

上圖流程大概如下:

  1. 當前goroutine(G1)會呼叫gopark函式,將當前協程置為waiting狀態;
  2. 將M和G1繫結關係斷開;
  3. scheduler會排程另外一個就緒態的goroutine與M建立繫結關係,然後M 會執行另外一個G。

所以整個過程中,OS thread會一直處於執行狀態,不會因為協程G1的阻塞而阻塞。最後當前的G1的引用會存入channel的sender佇列(佇列元素是持有G1的sudog)。

那麼blocked的G1怎麼恢復呢?當有一個receiver接收channel資料的時候,會resume G1。

實際上hchan資料結構也儲存了channel的sender和receiver的等待佇列。資料原型如下:
在這裡插入圖片描述

等待佇列裡面是sudog的單鏈表,sudog持有一個G代表goroutine物件引用,elem代表channel裡面儲存的元素。當G1執行ch<-task4的時候,G1會建立一個sudog然後儲存進入sendq佇列,實際上hchan結構如下圖:
在這裡插入圖片描述

這個時候,如果G1進行一個讀取channel操作,讀取前和讀取後的變化圖如下圖:

在這裡插入圖片描述

整個過程如下:

  1. G2呼叫 t:=<-ch 獲取一個元素;
  2. 從channel的buffer裡面取出一個元素task1;
  3. 從sender等待佇列裡面pop一個sudog;
  4. 將task4複製buffer中task1的位置,然後更新buffer的sendx和recvx索引值;
  5. 這時候需要將G1置為Runable狀態,表示G1可以恢復執行;

這個時候將G1恢復到可執行狀態需要scheduler的參與。G2會呼叫goready(G1)來喚醒G1。流程如下圖所示:
在這裡插入圖片描述

  1. 首先G2會呼叫goready(G1),喚起scheduler的排程;
  2. 將G1設定成Runable狀態;
  3. G1會加入到區域性排程器P的local queue佇列,等待執行。

讀取空channel的場景

當channel的buffer裡面為空時,這時候如果G2首先發起了讀取操作。如下圖:

在這裡插入圖片描述

會建立一個sudog,將代表G2的sudog存入recvq等待佇列。然後G2會呼叫gopark函式進入等待狀態,讓出OS thread,然後G2進入阻塞態。

這個時候,如果有一個G1執行讀取操作,最直觀的流程就是:

  1. 將recvq中的task存入buffer;
  2. goready(G2) 喚醒G2;

但是我們有更加智慧的方法:direct send; 其實也就是G1直接把資料寫入到G2中的elem中,這樣就不用走G2中的elem複製到buffer中,再從buffer複製給G1。如下圖:

在這裡插入圖片描述

具體過程就是G1直接把資料寫入到G2的棧中。這樣 G2 不需要去獲取channel的全域性鎖和操作緩衝。

1.3 channel主要特性

(1)goroutine-safe
hchan mutex

(2)store values, pass in FIFO.
copying into and out of hchan buffer

(3)can cause goroutines to pause and resume.
a)hchan sudog queues
b)calls into the runtime scheduler (gopark, goready)

(4)channel的高效能所在:
a)呼叫runtime scheduler實現,OS thread不需要阻塞;
b)跨goroutine棧可以直接進行讀寫;

Section2 原始碼分析

2.1 channel資料儲存結構

在原始碼runtime/chan.go 裡面定義了channel的資料模型,channel可以理解成一個緩衝佇列,這個緩衝佇列用來儲存元素,並且提供FIFO的語義。原始碼如下:

type hchan struct {
	//channel佇列裡面總的資料量
	qcount   uint           // total data in the queue
	// 迴圈佇列的容量,如果是非緩衝的channel就是0
	dataqsiz uint           // size of the circular queue
	// 緩衝佇列,陣列型別。
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	// 元素佔用位元組的size
	elemsize uint16
	// 當前佇列關閉標誌位,非零表示關閉
	closed   uint32
	// 佇列裡面元素型別
	elemtype *_type // element type
	// 佇列send索引
	sendx    uint   // send index
	// 佇列索引
	recvx    uint   // receive index
	// 等待channel的G佇列。
	recvq    waitq  // list of recv waiters
	// 向channel傳送資料的G佇列。
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	// 全域性鎖
	lock mutex
}

channel的資料結構相對比較簡單,主要是兩個結構:
1)一個數組實現的環形佇列,陣列有兩個下標索引分別表示讀寫的索引,用於儲存channel緩衝區資料。
2)channel的send和recv佇列,佇列裡面都是持有goroutine的sudog元素,佇列都是雙鏈表實現的。
3)channel的全域性鎖。

2.2 make channel

我們新建一個channel的時候一般使用 make(chan, n) 語句,這個語句的執行編譯器會重寫然後執行 chan.go裡面的 makechan函式。函式原始碼如下:

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case size == 0 || elem.size == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = unsafe.Pointer(c)
	case elem.kind&kindNoPointers != 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}

函式接收兩個引數,一個是channel裡面儲存的元素的資料型別,一個是緩衝的容量(如果為0表示是非緩衝buffer),建立流程如下:

  • 根據傳遞的緩衝大小size是否為零,分別建立不帶buffer的channel或則帶size大小的緩衝channel:
    • 對於不帶緩衝channel,申請一個hchan資料結構的記憶體大小;
    • 對於帶緩衝channel,new一個hchan物件,並初始化buffer記憶體;
  • 更新 chan中迴圈佇列的關鍵屬性:elemsize、elemtype、dataqsiz。

所以,整個建立channel的過程還是比較簡單的。

2.3 協程從channel接收資料(goroutine receive data)

所有執行 ep < c 使用ep接收channel資料的程式碼,最後都會呼叫到chan.go裡面的 chanrecv函式

函式的定義如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
}

從原始碼註釋就可以知道,該函式從channel裡面接收資料,然後將接收到的資料寫入到ep指標指向的物件裡面。

還有一個引數block,表示當channel無法返回資料時是否阻塞等待。當block=false並且channel裡面沒有資料時候,函式直接返回(false,false)。

接收channel的資料的流程如下:

  • CASE1:前置channel為nil的場景:
    • 如果block為非阻塞,直接return;
    • 如果block為阻塞,就呼叫gopark()阻塞當前goroutine,並丟擲異常。
  • 前置場景,block為非阻塞,且channel為非緩衝佇列且sender等待佇列為空 或則 channel為有緩衝佇列但是佇列裡面元素數量為0,且channel未關閉,這個時候直接return;
  • 呼叫 lock(&c.lock) 鎖住channel的全域性鎖;
  • CASE2:channel已經被關閉且channel緩衝中沒有資料了,這時直接返回success和空值;
  • CASE3:sender佇列非空,呼叫func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) 函式處理:
    • channel是非緩衝channel,直接呼叫recvDirect函式直接從sender recv元素到ep物件,這樣就只用複製一次;
    • 對於sender佇列非空情況下, 有緩衝的channel的緩衝佇列一定是滿的:
      • 1.先取channel緩衝佇列的對頭元素複製給receiver(也就是ep);
      • 2.將sender佇列的對頭元素裡面的資料複製到channel緩衝佇列剛剛彈出的元素的位置,這樣緩衝佇列就不用移動資料了。
    • 釋放channel的全域性鎖;
    • 呼叫goready函式標記當前goroutine處於ready,可以執行的狀態;
  • CASE4:sender佇列為空,緩衝佇列非空,直接取佇列元素,移動頭索引;
  • CASE5:sender佇列為空、緩衝佇列也沒有元素且不阻塞協程,直接return (false,false);
  • CASE6:sender佇列為空且channel的快取佇列為空,將goroutine加入recv佇列,並阻塞。

2.4 協程向channel寫入資料(goroutine sender data)

所有執行 c < ep 將ep傳送到channel的程式碼,最後都會呼叫到chan.go裡面的 chansend函式

函式的定義如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
}

函式有三個引數,第一個代表channel的資料結構,第二個是要指向寫入的資料的指標,第三個block代表寫入操作是否阻塞。

向channel寫入資料主要流程如下:

  • CASE1:當channel為空或者未初始化,如果block表示阻塞那麼向其中傳送資料將會永久阻塞;如果block表示非阻塞就會直接return;
  • CASE2:前置場景,block為非阻塞,且channel沒有關閉(已關閉的channel不能寫入資料)且(channel為非緩衝佇列且receiver等待佇列為空)或則( channel為有緩衝佇列但是佇列已滿),這個時候直接return;
  • 呼叫 lock(&c.lock) 鎖住channel的全域性鎖;
  • CASE3:不能向已經關閉的channel send資料,會導致panic。
  • CASE4:如果channel上的recv佇列非空,則跳過channel的快取佇列,直接向訊息傳送給接收的goroutine:
    • 呼叫sendDirect方法,將待寫入的訊息傳送給接收的goroutine;
    • 釋放channel的全域性鎖;
    • 呼叫goready函式,將接收訊息的goroutine設定成就緒狀態,等待排程。
  • CASE5:快取佇列未滿,則將訊息複製到快取佇列上,然後釋放全域性鎖;
  • CASE6:快取佇列已滿且接收訊息佇列recv為空,則將當前的goroutine加入到send佇列;
    • 獲取當前goroutine的sudog,然後入channel的send佇列;
    • 將當前goroutine休眠

2.5 channel close關閉channel原始碼分析

當我們執行channel的close操作的時候會關閉channel。

關閉的主要流程如下所示:

  • 獲取全域性鎖;
  • 設定channel資料結構chan的關閉標誌位;
  • 獲取當前channel上面的讀goroutine並連結成連結串列;
  • 獲取當前channel上面的寫goroutine然後拼接到前面的讀連結串列後面;
  • 釋放全域性鎖;
  • 喚醒所有的讀寫goroutine。