1. 程式人生 > 實用技巧 >go中的sync.pool原始碼剖析

go中的sync.pool原始碼剖析

sync.pool

sync.pool作用

有時候我們為了優化GC的場景,減少並複用記憶體,我們可以使用 sync.Pool 來複用需要頻繁建立臨時物件。

sync.Pool 是一個臨時物件池。一句話來概括,sync.Pool 管理了一組臨時物件, 當需要時從池中獲取,使用完畢後從再放回池中,以供他人使用。

本次探究的go版本go version go1.13.15 darwin/amd64

使用

一個小demo

func main() {
	// 初始化一個pool
	pool := &sync.Pool{
		// 預設的返回值設定,不寫這個引數,預設是nil
		New: func() interface{} {
			return 0
		},
	}

	// 看一下初始的值,這裡是返回0,如果不設定New函式,預設返回nil
	init := pool.Get()
	fmt.Println("初始值", init)

	// 設定一個引數1
	pool.Put(1)

	// 獲取檢視結果
	num := pool.Get()
	fmt.Println("put之後取值", num)

	// 再次獲取,會發現,已經是空的了,只能返回預設的值。
	num = pool.Get()
	fmt.Println("put之後再次取值", num)
}

輸出

初始值 0
put之後取值 1
put之後再次取值 0

適用場景

1、Pool 裡物件的生命週期受 GC 影響,不適合於做連線池,因為連線池需要自己管理物件的生命週期

放入本地池中的值有可能會在任何時候被刪除,但是不通知呼叫者,也有可能被其他的goroutine偷走

2、適用於儲存一些會在goroutine間分享的臨時物件。主要作用是減少GC,提高效能

3、適用於已經申請了記憶體,目前未使用,接下來會使用的記憶體,來緩解GC

4、一些生命週期比較短的不適合使用sync.pool來維護

案例

go中的fmt就是使用到了sync.pool


// Use simple []byte instead of bytes.Buffer to avoid large dependency.
type buffer []byte

func (b *buffer) write(p []byte) {
	*b = append(*b, p...)
}

func (b *buffer) writeString(s string) {
	*b = append(*b, s...)
}

func (b *buffer) writeByte(c byte) {
	*b = append(*b, c)
}

func (bp *buffer) writeRune(r rune) {
	if r < utf8.RuneSelf {
		*bp = append(*bp, byte(r))
		return
	}

	b := *bp
	n := len(b)
	for n+utf8.UTFMax > cap(b) {
		b = append(b, 0)
	}
	w := utf8.EncodeRune(b[n:n+utf8.UTFMax], r)
	*bp = b[:n+w]
}

// pp物件
// pp is used to store a printer's state and is reused with sync.Pool to avoid allocations.
type pp struct {
	buf buffer

	// arg holds the current item, as an interface{}.
	arg interface{}

	// value is used instead of arg for reflect values.
	value reflect.Value

	// fmt is used to format basic items such as integers or strings.
	fmt fmt

	// reordered records whether the format string used argument reordering.
	reordered bool
	// goodArgNum records whether the most recent reordering directive was valid.
	goodArgNum bool
	// panicking is set by catchPanic to avoid infinite panic, recover, panic, ... recursion.
	panicking bool
	// erroring is set when printing an error string to guard against calling handleMethods.
	erroring bool
	// wrapErrs is set when the format string may contain a %w verb.
	wrapErrs bool
	// wrappedErr records the target of the %w verb.
	wrappedErr error
}

// 定義sync.pool
var ppFree = sync.Pool{
	New: func() interface{} { return new(pp) },
}

// 在pool取出一個物件,然後初始化
func newPrinter() *pp {
	p := ppFree.Get().(*pp)
	p.panicking = false
	p.erroring = false
	p.wrapErrs = false
	p.fmt.init(&p.buf)
	return p
}

// Sprintln formats using the default formats for its operands and returns the resulting string.
// Spaces are always added between operands and a newline is appended.
func Sprintln(a ...interface{}) string {
	p := newPrinter()
	p.doPrintln(a)
	s := string(p.buf)
	p.free()
	return s
}

// 歸還sync.pool
func (p *pp) free() {
	// Proper usage of a sync.Pool requires each entry to have approximately
	// the same memory cost. To obtain this property when the stored type
	// contains a variably-sized buffer, we add a hard limit on the maximum buffer
	// to place back in the pool.
	//
	// See https://golang.org/issue/23199
	if cap(p.buf) > 64<<10 {
		return
	}

	p.buf = p.buf[:0]
	p.arg = nil
	p.value = reflect.Value{}
	p.wrappedErr = nil
	ppFree.Put(p)
}

使用的時候在pool中取出一個pp,然後使用完畢之後,歸還到的pool中。

原始碼解讀

pool結構

type Pool struct {
    // 用來標記,當前的 struct 是不能夠被 copy 的
    noCopy noCopy
    // P 個固定大小的 poolLocal 陣列,每個 P 擁有一個空間
    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    // 上面陣列的大小,即 P 的個數
    localSize uintptr        // size of the local array

    // 同 local 和 localSize,只是在 gc 的過程中保留一次
    victim     unsafe.Pointer // local from previous cycle
    victimSize uintptr        // size of victims array

    // 自定義一個 New 函式,然後可以在 Get 不到東西時,自動建立一個
    New func() interface{}
}
// unsafe.Sizeof(poolLocal{})  // 128 byte(1byte = 8 bits)
// unsafe.Sizeof(poolLocalInternal{})  // 32 byte(1byte = 8 bits)
type poolLocal struct {
	// 每個P對應的pool
	poolLocalInternal

	// 將 poolLocal 補齊至兩個快取行的倍數,防止 false sharing,
	// 每個快取行具有 64 bytes,即 512 bit
	// 目前我們的處理器一般擁有 32 * 1024 / 64 = 512 條快取行
	// 偽共享,僅佔位用,防止在 cache line 上分配多個 poolLocalInternal
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
	// private 儲存一個 Put 的資料,pool.Put() 操作優先存入 private,如果private有資訊,才會存入 shared
	private interface{} // Can be used only by the respective P.
	// 儲存一個連結串列,用來維護 pool.Put() 操作加入的資料,每個 P 可以操作自己 shared 連結串列中的頭部,而其他的 P 在用完自己的 shared 時,可能會來偷資料,從而操作連結串列的尾部
	// 本地 P 可以 pushHead/popHead;其他 P 則只能 popTail
	shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

noCopy

意思就是不讓copy,是如何實現的呢?

Go中沒有原生的禁止拷貝的方式,所以如果有的結構體,你希望使用者無法拷貝,只能指標傳遞保證全域性唯一的話,可以這麼幹,定義 一個結構體叫 noCopy,實現如下的介面,然後嵌入到你想要禁止拷貝的結構體中,這樣go vet就能檢測出來。

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

測試下

type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

type Person struct {
	noCopy noCopy
	name   string
}

// go中的函式傳參都是值拷貝
func test(person Person) {
	fmt.Println(person)
}

func main() {
	var person Person
	test(person)
}

go vet main.go

$ go vet main.go
# command-line-arguments
./main.go:18:18: test passes lock by value: command-line-arguments.Person contains command-line-arguments.noCopy
./main.go:19:14: call of fmt.Println copies lock value: command-line-arguments.Person contains command-line-arguments.noCopy
./main.go:24:7: call of test copies lock value: command-line-arguments.Person contains command-line-arguments.noCopy

使用vet檢測到了不能copy的錯誤

偽共享

[128 - unsafe.Sizeof(poolLocalInternal{})%128]byte

這是來處理偽共享

什麼意思呢?

我們知道從處理訪問到記憶體,中間有好幾級快取,而這些快取的儲存單位是cacheline,也就是說每次從記憶體中載入資料,是以cacheline為單位的,這樣就會存在一個問題,如果程式碼中的變數A和B被分配在了一個cacheline,但是處理器a要修改變數A,處理器b要修改B變數。此時,這個cacheline會被分別載入到a處理器的cache和b處理器的cache,當a修改A時,快取系統會強制使b處理器的cacheline置為無效,同樣當b要修改B時,會強制使得a處理器的cacheline失效,這樣會導致cacheline來回無效,來回從低階的快取載入資料,影響效能。

增加一個 pad,補齊快取行,讓相關的欄位能獨立地載入到快取行就不會出現 false sharding 了。

GET

當從池中獲取物件時,會先從 per-P 的 poolLocal slice 中選取一個 poolLocal,選擇策略遵循:

1、優先從 private 中選擇物件
2、若取不到,則嘗試從 shared 佇列的隊頭進行讀取
3、若取不到,則嘗試從其他的 P 中進行偷取 getSlow
4、若還是取不到,則使用 New 方法新建

func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	// 獲取一個 poolLocal
	l, pid := p.pin()
	// 先從 private 獲取物件,有則立即返回
    // private只儲存了一個物件
	x := l.private
	l.private = nil
	if x == nil {
		// 嘗試從 localPool 的 shared 佇列隊頭讀取,
		// 因為隊頭的記憶體區域性性比隊尾更好。
		x, _ = l.shared.popHead()
		if x == nil {
			// 如果取不到,則獲取新的快取物件
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	// 如果取不到,則獲取新的快取物件
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

pin

// pin 會將當前的 goroutine 固定到 P 上,禁用搶佔,並返回 localPool 池以及當前 P 的 pid。
func (p *Pool) pin() (*poolLocal, int) {
	// 返回當前 P.id
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	// 因為可能存在動態的 P(執行時調整 P 的個數)procresize/GOMAXPROCS
	// 如果 P.id 沒有越界,則直接返回
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

pin 的作用就是將當前 groutine 和 P 繫結在一起,禁止搶佔。並且返回對應的 poolLocal 以及 P 的 id。

pin() 首先會呼叫執行時實現獲得當前 P 的 id,將 P 設定為禁止搶佔,達到固定當前 goroutine 的目的。

如果 G 被搶佔,則 G 的狀態從 running 變成 runnable,會被放回 P 的 localq 或 globaq,等待下一次排程。下次再執行時,就不一定是和現在的 P 相結合了。因為之後會用到 pid,如果被搶佔了,有可能接下來使用的 pid 與所繫結的 P 並非同一個。

繫結是通過procPin實現的

// src/runtime/proc.go

func procPin() int {
	_g_ := getg()
	mp := _g_.m

	mp.locks++
	return int(mp.p.ptr().id)
}

procPin函式實際上就是先獲取當前goroutine,然後對當前協程繫結的執行緒(即為m)加鎖,即mp.locks++,然後返回m目前繫結的p的id。

系統執行緒對協程協調排程,會涉及到協程之間的搶佔排程,有時候會搶佔當前協程所屬的P,原因就是不能讓一個協程一直佔用資源,搶佔的時候如何判斷是否可以搶佔,
一個重要的條件就是判斷m.locks==0procPin所做的就是禁止當前P被強佔。

pinSlow

var (
	allPoolsMu Mutex
	// allPools 是一組 pool 的集合,具有非空主快取。
	// 有兩種形式來保護它的讀寫:1. allPoolsMu 鎖; 2. STW.
	allPools   []*Pool
)

func (p *Pool) pinSlow() (*poolLocal, int) {
	// 這時取消 P 的禁止搶佔,因為使用 mutex 時候 P 必須可搶佔
	runtime_procUnpin()
	// 加鎖
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	// 當鎖住後,再次固定 P 取其 id
	pid := runtime_procPin()
	// 因為 pinSlow 中途可能已經被其他的執行緒呼叫,因此這時候需要再次對 pid 進行檢查。 如果 pid 在 p.local 大小範圍內,則不用建立 poolLocal 切片,直接返回。
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}

	// 如果陣列為空,新建
	// 將其新增到 allPools,垃圾回收器從這裡獲取所有 Pool 例項
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// 根據 P 數量建立 slice,如果 GOMAXPROCS 在 GC 間發生變化
	// 我們重新分配此陣列並丟棄舊的
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	// 將底層陣列起始指標儲存到 p.local,並設定 p.localSize
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid], pid
}

pinSlow() 會首先取消 P 的禁止搶佔,這是因為使用 mutex 時 P 必須為可搶佔的狀態。 然後使用 allPoolsMu 進行加鎖。 當完成加鎖後,再重新固定 P
,取其 pid。注意,因為中途可能已經被其他的執行緒呼叫,因此這時候需要再次對 pid 進行檢查。 如果 pid 在 p.local 大小範圍內,則不再此時建立,直接返回。

如果 p.local 為空,則將 p 扔給 allPools 並在垃圾回收階段回收所有 Pool 例項。 最後再完成對 p.local 的建立(徹底丟棄舊陣列)

getSlow

在取物件的過程中,本地p中的private沒有,並且shared沒有,這時候就會呼叫Pool.getSlow(),嘗試從其他的p中獲取。

func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	locals := p.local                        // load-consume
	// 嘗試熊其他的p中偷取
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 嘗試從victim cache中取物件。這發生在嘗試從其他 P 的 poolLocal 偷去失敗後,
	// 因為這樣可以使 victim 中的物件更容易被回收。
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 清空 victim cache。下次就不用再從這裡找了
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

從索引 pid+1 的 poolLocal 處開始,嘗試呼叫 shared.popTail() 獲取快取物件。如果沒有找到就從victim去找。

如果沒有找到,清空 victim cache。對於get來講,如何其他的p也沒找到,就new一個出來。

Put

Put的邏輯就相對簡單了

1、首先呼叫p.pin()搶佔p
2、優先放歸還到private中
3、如果private有值,則放到shared中

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	// 獲得一個 localPool
	l, _ := p.pin()
	// 優先寫入 private 變數
	if l.private == nil {
		l.private = x
		x = nil
	}
	// 如果 private 有值,則寫入 shared poolChain 連結串列
	if x != nil {
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

總結下這個過程

其實還有點疑惑,上面的victim是如何被保留下來的呢?來分析下pool的垃圾回收機制

poolChain

type poolChain struct {
	// 只有生產者會 push to,不用加鎖
	head *poolChainElt

	// 讀寫需要原子控制。 
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue

	// next 被 producer 寫,consumer 讀。所以只會從 nil 變成 non-nil
	// prev 被 consumer 寫,producer 讀。所以只會從 non-nil 變成 nil
	next, prev *poolChainElt
}

// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	// headTail  包含一個 32 位的 head 和一個 32 位的 tail 指標。這兩個值都和 len(vals)-1 取模過。
	//
	// tail = index of oldest data in queue 
	// 是佇列中最老的資料
	// head = index of next slot to fill 
	// 指向下一個將要填充的 slot
	//
	// Slots in the range [tail, head) are owned by consumers. slots 
	// Slots的有效範圍是 [tail, head),由 consumers 持有
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	// vals 是一個儲存 interface{} 的環形佇列,它的 size 必須是 2 的冪
	// 如果 slot 為空,則 vals[i].typ 為空;否則,非空。
	// 一個 slot 在這時宣告無效:tail 不指向它了,vals[i].typ 為 nil
	// 由 consumer 設定成 nil,由 producer 讀
	vals []eface
}

type eface struct {
	typ, val unsafe.Pointer
}

poolDequeue被實現為單生產者、多消費者的固定大小的無鎖(atomic 實現) Ring 式佇列(底層儲存使用陣列,使用兩個指標標記 head、tail)。生產者可以從 head 插入、head 刪除,而消費者僅可從 tail 刪除。

headTail 指向佇列的頭和尾,通過位運算將 head 和 tail 存入 headTail 變數中。

popHead

發生在從本地 shared 佇列中消費並獲取物件(消費者)

func (c *poolChain) popHead() (interface{}, bool) {
	d := c.head
	// d 是一個 poolDequeue,如果 d.popHead 是併發安全的,
	// 那麼這裡取 val 也是併發安全的。若 d.popHead 失敗,則
	// 說明需要重新嘗試。這個過程會持續到整個連結串列為空。
	for d != nil {
		if val, ok := d.popHead(); ok {
			return val, ok
		}
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

拿到頭結點:c.head,是一個 poolDequeue。如果過頭結點不為空呼叫,poolDequeue的popHead方法。

// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// 佇列是空的
			return nil, false
		}

		// head 位置是隊頭的前一個位置,所以此處要先退一位。
		// 在讀出 slot 的 value 之前就把 head 值減 1,取消對這個 slot 的控制
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// 拿到slot
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	// 重置 slot,typ 和 val 均為 nil
	// 這裡清空的方式與 popTail 不同,與 pushHead 沒有競爭關係,所以不用太小心
	*slot = eface{}
	return val, true
}

1、判斷是否是空佇列,通過unpack 函式分離出 head 和 tail 指標,如果 head 和 tail 相等,即首尾相等,那麼這個佇列就是空的,直接就返回 nil,false。
2、佇列不為空,就迴圈佇列直到拿到slot。
3、得到相應 slot 的元素後,經過型別轉換並判斷是否是 dequeueNil,如果是,說明沒取到快取的物件,返回 nil。
4、返回val,清空slot。

pushHead

發生在向本地 shared 佇列中放置物件(生產者)

const (
	dequeueBits  = 32
	dequeueLimit = (1 << dequeueBits) / 4
)

func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	// 如果連結串列為空,建立新的連結串列
	if d == nil {
		// 初始化連結串列
		const initSize = 8
		// 固定長度為 8,必須為 2 的指數
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}

	// 佇列滿了,pushHead就返回false
	if d.pushHead(val) {
		return
	}

	// 如果滿了,就新建立一個是原來兩倍大小的佇列
	newSize := len(d.vals) * 2
	// 最大的限制
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}

// 將 val 新增到雙端佇列頭部。如果佇列已滿,則返回 false。此函式只能被一個生產者呼叫
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// 佇列滿了
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// 檢測這個 slot 是否被 popTail 釋放
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// 另一個 goroutine 正在 popTail 這個 slot,說明佇列仍然是滿的
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*interface{})(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}

pack/unpack

實現對head和tail` 的讀寫

// 將 head 和 tail 指標從 d.headTail 中分離開來
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}
// 將 head 和 tail 指標打包到 d.headTail 一個 64bit 的變數中
func (d *poolDequeue) pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

popTail

func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
		// d可能是暫時為空,但如果next不為null並且popTail失敗,則d為永久為空,這是唯一的天劍可以安全地將d從鏈中刪除。
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// 這是唯一的出隊。它現在是空的,但是將來可能會被推倒。
			return nil, false
		}

		// 雙向連結串列的尾節點裡的雙端佇列被“掏空”,所以繼續看下一個節點。
		// 並且由於尾節點已經被“掏空”,所以要甩掉它。這樣,下次 popHead 就不會再看它有沒有快取物件了。
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// 甩掉尾節點
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		// 佇列滿
		if tail == head {
			return nil, false
		}
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	// 注意:此處可能與 pushHead 發生競爭,解決方案是:
	// 1. 讓 pushHead 先讀取 typ 的值,如果 typ 值不為 nil,則說明 popTail 尚未清理完 slot
	// 2. 讓 popTail 先清理掉 val 中的內容,在清理掉 typ,從而確保不會與 pushHead 對 slot 的寫行為發生競爭
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	return val, true
}

快取的回收

// 將快取清理函式註冊到執行時 GC 時間段
func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

// 由執行時實現
func runtime_registerPoolCleanup(cleanup func())

在 src/runtime/mgc.go 中:

// 開始 GC
func gcStart(trigger gcTrigger) {
	...
	clearpools()
	...
}

// 實現快取清理
func clearpools() {
	// clear sync.Pools
	if poolcleanup != nil {
		poolcleanup()
	}
	...
}

var poolcleanup func()

// 利用編譯器標誌將 sync 包中的清理註冊到執行時
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
	poolcleanup = f
}

來看下具體的實現

var (
	// allPools 是一組 pool 的集合,具有非空主快取。
	// 有兩種形式來保護它的讀寫:1. allPoolsMu 鎖; 2. STW.
	allPools   []*Pool

	// oldPools 是一組 pool 的集合,具有非空 victim 快取。由 STW 保護
	oldPools []*Pool
)


func poolCleanup() {
	// 該函式會註冊到執行時 GC 階段(前),此時為 STW 狀態,不需要加鎖
	// 它必須不處理分配且不呼叫任何執行時函式。

	// 由於此時是 STW,不存在使用者態程式碼能嘗試讀取 localPool,進而所有的 P 都已固定(與 goroutine 繫結)

	// 從所有的 oldPols 中刪除 victim
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// 將主快取移動到 victim 快取
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize

		p.local = nil
		p.localSize = 0
	}

	// 具有非空主快取的池現在具有非空的 victim 快取,並且沒有任何 pool 具有主快取。
	oldPools, allPools = allPools, nil
}

poolCleanup 會在stw階段的時候被呼叫。主要是刪除oldPools中內容,然後將allPools裡面的內容放入到victim中。最後標記allPools為oldPools。這樣把pool裡內容回收了,有victim進行兜底。

總結

sync.pool拿取快取的過程

一個goroutine會搶佔p,然後炒年糕當前p的private 中選擇物件,如果裡面沒找到,然後嘗試本地p的shared 佇列的隊頭進行讀取,若還是取不到,則嘗試從其他 P 的 shared 佇列隊尾中偷取。 若偷不到,則嘗試從上一個 GC 週期遺留到 victim 快取中取,否則呼叫 New 建立一個新的物件。

對於回收而言,池中所有臨時物件在一次 GC 後會被放入 victim 快取中, 而前一個週期被放入 victim 的快取則會被清理掉。

在加入 victim 機制前,sync.Pool 裡物件的最⼤快取時間是一個 GC 週期,當 GC 開始時,沒有被引⽤的物件都會被清理掉;加入 victim 機制後,最大快取時間為兩個 GC 週期。

當get一個物件使用完成之後,呼叫put歸還的時候,需要注意將裡面的內容清除

Pool 不可以指定⼤⼩,⼤⼩只受制於 GC 臨界值。

參考

【深入Golang之sync.Pool詳解】https://www.cnblogs.com/sunsky303/p/9706210.html
【由淺入深聊聊Golang的sync.Pool】https://juejin.cn/post/6844903903046320136
【Golang sync.Pool原始碼閱讀與分析】https://jiajunhuang.com/articles/2020_05_05-go_sync_pool.md.html
【【Go夜讀】sync.Pool 原始碼閱讀及適用場景分析】https://www.jianshu.com/p/f61bfe89e473
【golang的物件池sync.pool原始碼解讀】https://zhuanlan.zhihu.com/p/99710992
【15.5 快取池】https://golang.design/under-the-hood/zh-cn/part4lib/ch15sync/pool/
【請問sync.Pool有什麼缺點?】https://mp.weixin.qq.com/s?__biz=MzA4ODg0NDkzOA==&mid=2247487149&idx=1&sn=f38f2d72fd7112e19e97d5a2cd304430&source=41#wechat_redirect
【七分鐘讀懂 Go 的臨時物件池pool及其應用場景】https://segmentfault.com/a/1190000016987629
【偽共享(False Sharing)】http://ifeve.com/falsesharing/