go中的sync.pool原始碼剖析
sync.pool
sync.pool作用
有時候我們為了優化GC的場景,減少並複用記憶體,我們可以使用 sync.Pool 來複用需要頻繁建立臨時物件。
sync.Pool
是一個臨時物件池。一句話來概括,sync.Pool 管理了一組臨時物件, 當需要時從池中獲取,使用完畢後從再放回池中,以供他人使用。
本次探究的go版本go version go1.13.15 darwin/amd64
使用
一個小demo
func main() { // 初始化一個pool pool := &sync.Pool{ // 預設的返回值設定,不寫這個引數,預設是nil New: func() interface{} { return 0 }, } // 看一下初始的值,這裡是返回0,如果不設定New函式,預設返回nil init := pool.Get() fmt.Println("初始值", init) // 設定一個引數1 pool.Put(1) // 獲取檢視結果 num := pool.Get() fmt.Println("put之後取值", num) // 再次獲取,會發現,已經是空的了,只能返回預設的值。 num = pool.Get() fmt.Println("put之後再次取值", num) }
輸出
初始值 0
put之後取值 1
put之後再次取值 0
適用場景
1、Pool 裡物件的生命週期受 GC 影響,不適合於做連線池,因為連線池需要自己管理物件的生命週期
放入本地池中的值有可能會在任何時候被刪除,但是不通知呼叫者,也有可能被其他的goroutine偷走
2、適用於儲存一些會在goroutine間分享的臨時物件。主要作用是減少GC,提高效能
3、適用於已經申請了記憶體,目前未使用,接下來會使用的記憶體,來緩解GC
4、一些生命週期比較短的不適合使用sync.pool來維護
案例
go中的fmt就是使用到了sync.pool
// Use simple []byte instead of bytes.Buffer to avoid large dependency. type buffer []byte func (b *buffer) write(p []byte) { *b = append(*b, p...) } func (b *buffer) writeString(s string) { *b = append(*b, s...) } func (b *buffer) writeByte(c byte) { *b = append(*b, c) } func (bp *buffer) writeRune(r rune) { if r < utf8.RuneSelf { *bp = append(*bp, byte(r)) return } b := *bp n := len(b) for n+utf8.UTFMax > cap(b) { b = append(b, 0) } w := utf8.EncodeRune(b[n:n+utf8.UTFMax], r) *bp = b[:n+w] } // pp物件 // pp is used to store a printer's state and is reused with sync.Pool to avoid allocations. type pp struct { buf buffer // arg holds the current item, as an interface{}. arg interface{} // value is used instead of arg for reflect values. value reflect.Value // fmt is used to format basic items such as integers or strings. fmt fmt // reordered records whether the format string used argument reordering. reordered bool // goodArgNum records whether the most recent reordering directive was valid. goodArgNum bool // panicking is set by catchPanic to avoid infinite panic, recover, panic, ... recursion. panicking bool // erroring is set when printing an error string to guard against calling handleMethods. erroring bool // wrapErrs is set when the format string may contain a %w verb. wrapErrs bool // wrappedErr records the target of the %w verb. wrappedErr error } // 定義sync.pool var ppFree = sync.Pool{ New: func() interface{} { return new(pp) }, } // 在pool取出一個物件,然後初始化 func newPrinter() *pp { p := ppFree.Get().(*pp) p.panicking = false p.erroring = false p.wrapErrs = false p.fmt.init(&p.buf) return p } // Sprintln formats using the default formats for its operands and returns the resulting string. // Spaces are always added between operands and a newline is appended. func Sprintln(a ...interface{}) string { p := newPrinter() p.doPrintln(a) s := string(p.buf) p.free() return s } // 歸還sync.pool func (p *pp) free() { // Proper usage of a sync.Pool requires each entry to have approximately // the same memory cost. To obtain this property when the stored type // contains a variably-sized buffer, we add a hard limit on the maximum buffer // to place back in the pool. // // See https://golang.org/issue/23199 if cap(p.buf) > 64<<10 { return } p.buf = p.buf[:0] p.arg = nil p.value = reflect.Value{} p.wrappedErr = nil ppFree.Put(p) }
使用的時候在pool中取出一個pp,然後使用完畢之後,歸還到的pool中。
原始碼解讀
pool結構
type Pool struct {
// 用來標記,當前的 struct 是不能夠被 copy 的
noCopy noCopy
// P 個固定大小的 poolLocal 陣列,每個 P 擁有一個空間
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// 上面陣列的大小,即 P 的個數
localSize uintptr // size of the local array
// 同 local 和 localSize,只是在 gc 的過程中保留一次
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// 自定義一個 New 函式,然後可以在 Get 不到東西時,自動建立一個
New func() interface{}
}
// unsafe.Sizeof(poolLocal{}) // 128 byte(1byte = 8 bits)
// unsafe.Sizeof(poolLocalInternal{}) // 32 byte(1byte = 8 bits)
type poolLocal struct {
// 每個P對應的pool
poolLocalInternal
// 將 poolLocal 補齊至兩個快取行的倍數,防止 false sharing,
// 每個快取行具有 64 bytes,即 512 bit
// 目前我們的處理器一般擁有 32 * 1024 / 64 = 512 條快取行
// 偽共享,僅佔位用,防止在 cache line 上分配多個 poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
// private 儲存一個 Put 的資料,pool.Put() 操作優先存入 private,如果private有資訊,才會存入 shared
private interface{} // Can be used only by the respective P.
// 儲存一個連結串列,用來維護 pool.Put() 操作加入的資料,每個 P 可以操作自己 shared 連結串列中的頭部,而其他的 P 在用完自己的 shared 時,可能會來偷資料,從而操作連結串列的尾部
// 本地 P 可以 pushHead/popHead;其他 P 則只能 popTail
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
noCopy
意思就是不讓copy,是如何實現的呢?
Go中沒有原生的禁止拷貝的方式,所以如果有的結構體,你希望使用者無法拷貝,只能指標傳遞保證全域性唯一的話,可以這麼幹,定義 一個結構體叫 noCopy,實現如下的介面,然後嵌入到你想要禁止拷貝的結構體中,這樣go vet就能檢測出來。
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
測試下
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
type Person struct {
noCopy noCopy
name string
}
// go中的函式傳參都是值拷貝
func test(person Person) {
fmt.Println(person)
}
func main() {
var person Person
test(person)
}
go vet main.go
$ go vet main.go
# command-line-arguments
./main.go:18:18: test passes lock by value: command-line-arguments.Person contains command-line-arguments.noCopy
./main.go:19:14: call of fmt.Println copies lock value: command-line-arguments.Person contains command-line-arguments.noCopy
./main.go:24:7: call of test copies lock value: command-line-arguments.Person contains command-line-arguments.noCopy
使用vet檢測到了不能copy的錯誤
偽共享
[128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
這是來處理偽共享
什麼意思呢?
我們知道從處理訪問到記憶體,中間有好幾級快取,而這些快取的儲存單位是cacheline,也就是說每次從記憶體中載入資料,是以cacheline為單位的,這樣就會存在一個問題,如果程式碼中的變數A和B被分配在了一個cacheline,但是處理器a要修改變數A,處理器b要修改B變數。此時,這個cacheline會被分別載入到a處理器的cache和b處理器的cache,當a修改A時,快取系統會強制使b處理器的cacheline置為無效,同樣當b要修改B時,會強制使得a處理器的cacheline失效,這樣會導致cacheline來回無效,來回從低階的快取載入資料,影響效能。
增加一個 pad,補齊快取行,讓相關的欄位能獨立地載入到快取行就不會出現 false sharding 了。
GET
當從池中獲取物件時,會先從 per-P 的 poolLocal slice 中選取一個 poolLocal,選擇策略遵循:
1、優先從 private 中選擇物件
2、若取不到,則嘗試從 shared 佇列的隊頭進行讀取
3、若取不到,則嘗試從其他的 P 中進行偷取 getSlow
4、若還是取不到,則使用 New 方法新建
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
// 獲取一個 poolLocal
l, pid := p.pin()
// 先從 private 獲取物件,有則立即返回
// private只儲存了一個物件
x := l.private
l.private = nil
if x == nil {
// 嘗試從 localPool 的 shared 佇列隊頭讀取,
// 因為隊頭的記憶體區域性性比隊尾更好。
x, _ = l.shared.popHead()
if x == nil {
// 如果取不到,則獲取新的快取物件
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 如果取不到,則獲取新的快取物件
if x == nil && p.New != nil {
x = p.New()
}
return x
}
pin
// pin 會將當前的 goroutine 固定到 P 上,禁用搶佔,並返回 localPool 池以及當前 P 的 pid。
func (p *Pool) pin() (*poolLocal, int) {
// 返回當前 P.id
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
// 因為可能存在動態的 P(執行時調整 P 的個數)procresize/GOMAXPROCS
// 如果 P.id 沒有越界,則直接返回
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
pin 的作用就是將當前 groutine 和 P 繫結在一起,禁止搶佔。並且返回對應的 poolLocal 以及 P 的 id。
pin() 首先會呼叫執行時實現獲得當前 P 的 id,將 P 設定為禁止搶佔,達到固定當前 goroutine 的目的。
如果 G 被搶佔,則 G 的狀態從 running 變成 runnable,會被放回 P 的 localq 或 globaq,等待下一次排程。下次再執行時,就不一定是和現在的 P 相結合了。因為之後會用到 pid,如果被搶佔了,有可能接下來使用的 pid 與所繫結的 P 並非同一個。
繫結是通過procPin實現的
// src/runtime/proc.go
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id)
}
procPin函式實際上就是先獲取當前goroutine,然後對當前協程繫結的執行緒(即為m)加鎖,即mp.locks++,然後返回m目前繫結的p的id。
系統執行緒對協程協調排程,會涉及到協程之間的搶佔排程,有時候會搶佔當前協程所屬的P,原因就是不能讓一個協程一直佔用資源,搶佔的時候如何判斷是否可以搶佔,
一個重要的條件就是判斷m.locks==0
。procPin
所做的就是禁止當前P被強佔。
pinSlow
var (
allPoolsMu Mutex
// allPools 是一組 pool 的集合,具有非空主快取。
// 有兩種形式來保護它的讀寫:1. allPoolsMu 鎖; 2. STW.
allPools []*Pool
)
func (p *Pool) pinSlow() (*poolLocal, int) {
// 這時取消 P 的禁止搶佔,因為使用 mutex 時候 P 必須可搶佔
runtime_procUnpin()
// 加鎖
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 當鎖住後,再次固定 P 取其 id
pid := runtime_procPin()
// 因為 pinSlow 中途可能已經被其他的執行緒呼叫,因此這時候需要再次對 pid 進行檢查。 如果 pid 在 p.local 大小範圍內,則不用建立 poolLocal 切片,直接返回。
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 如果陣列為空,新建
// 將其新增到 allPools,垃圾回收器從這裡獲取所有 Pool 例項
if p.local == nil {
allPools = append(allPools, p)
}
// 根據 P 數量建立 slice,如果 GOMAXPROCS 在 GC 間發生變化
// 我們重新分配此陣列並丟棄舊的
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
// 將底層陣列起始指標儲存到 p.local,並設定 p.localSize
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
pinSlow() 會首先取消 P 的禁止搶佔,這是因為使用 mutex 時 P 必須為可搶佔的狀態。 然後使用 allPoolsMu 進行加鎖。 當完成加鎖後,再重新固定 P
,取其 pid。注意,因為中途可能已經被其他的執行緒呼叫,因此這時候需要再次對 pid 進行檢查。 如果 pid 在 p.local 大小範圍內,則不再此時建立,直接返回。
如果 p.local 為空,則將 p 扔給 allPools 並在垃圾回收階段回收所有 Pool 例項。 最後再完成對 p.local 的建立(徹底丟棄舊陣列)
getSlow
在取物件的過程中,本地p中的private沒有,並且shared沒有,這時候就會呼叫Pool.getSlow()
,嘗試從其他的p中獲取。
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 嘗試熊其他的p中偷取
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 嘗試從victim cache中取物件。這發生在嘗試從其他 P 的 poolLocal 偷去失敗後,
// 因為這樣可以使 victim 中的物件更容易被回收。
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 清空 victim cache。下次就不用再從這裡找了
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
從索引 pid+1 的 poolLocal 處開始,嘗試呼叫 shared.popTail() 獲取快取物件。如果沒有找到就從victim去找。
如果沒有找到,清空 victim cache。對於get來講,如何其他的p也沒找到,就new一個出來。
Put
Put的邏輯就相對簡單了
1、首先呼叫p.pin()搶佔p
2、優先放歸還到private中
3、如果private有值,則放到shared中
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 獲得一個 localPool
l, _ := p.pin()
// 優先寫入 private 變數
if l.private == nil {
l.private = x
x = nil
}
// 如果 private 有值,則寫入 shared poolChain 連結串列
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
總結下這個過程
其實還有點疑惑,上面的victim是如何被保留下來的呢?來分析下pool的垃圾回收機制
poolChain
type poolChain struct {
// 只有生產者會 push to,不用加鎖
head *poolChainElt
// 讀寫需要原子控制。
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
// next 被 producer 寫,consumer 讀。所以只會從 nil 變成 non-nil
// prev 被 consumer 寫,producer 讀。所以只會從 non-nil 變成 nil
next, prev *poolChainElt
}
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
// headTail 包含一個 32 位的 head 和一個 32 位的 tail 指標。這兩個值都和 len(vals)-1 取模過。
//
// tail = index of oldest data in queue
// 是佇列中最老的資料
// head = index of next slot to fill
// 指向下一個將要填充的 slot
//
// Slots in the range [tail, head) are owned by consumers. slots
// Slots的有效範圍是 [tail, head),由 consumers 持有
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
// vals 是一個儲存 interface{} 的環形佇列,它的 size 必須是 2 的冪
// 如果 slot 為空,則 vals[i].typ 為空;否則,非空。
// 一個 slot 在這時宣告無效:tail 不指向它了,vals[i].typ 為 nil
// 由 consumer 設定成 nil,由 producer 讀
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
poolDequeue
被實現為單生產者、多消費者的固定大小的無鎖(atomic 實現) Ring 式佇列(底層儲存使用陣列,使用兩個指標標記 head、tail)。生產者可以從 head 插入、head 刪除,而消費者僅可從 tail 刪除。
headTail 指向佇列的頭和尾,通過位運算將 head 和 tail 存入 headTail 變數中。
popHead
發生在從本地 shared 佇列中消費並獲取物件(消費者)
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
// d 是一個 poolDequeue,如果 d.popHead 是併發安全的,
// 那麼這裡取 val 也是併發安全的。若 d.popHead 失敗,則
// 說明需要重新嘗試。這個過程會持續到整個連結串列為空。
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
拿到頭結點:c.head,是一個 poolDequeue。如果過頭結點不為空呼叫,poolDequeue的popHead方法。
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// 佇列是空的
return nil, false
}
// head 位置是隊頭的前一個位置,所以此處要先退一位。
// 在讀出 slot 的 value 之前就把 head 值減 1,取消對這個 slot 的控制
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 拿到slot
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 重置 slot,typ 和 val 均為 nil
// 這裡清空的方式與 popTail 不同,與 pushHead 沒有競爭關係,所以不用太小心
*slot = eface{}
return val, true
}
1、判斷是否是空佇列,通過unpack 函式分離出 head 和 tail 指標,如果 head 和 tail 相等,即首尾相等,那麼這個佇列就是空的,直接就返回 nil,false。
2、佇列不為空,就迴圈佇列直到拿到slot。
3、得到相應 slot 的元素後,經過型別轉換並判斷是否是 dequeueNil,如果是,說明沒取到快取的物件,返回 nil。
4、返回val,清空slot。
pushHead
發生在向本地 shared 佇列中放置物件(生產者)
const (
dequeueBits = 32
dequeueLimit = (1 << dequeueBits) / 4
)
func (c *poolChain) pushHead(val interface{}) {
d := c.head
// 如果連結串列為空,建立新的連結串列
if d == nil {
// 初始化連結串列
const initSize = 8
// 固定長度為 8,必須為 2 的指數
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 佇列滿了,pushHead就返回false
if d.pushHead(val) {
return
}
// 如果滿了,就新建立一個是原來兩倍大小的佇列
newSize := len(d.vals) * 2
// 最大的限制
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
// 將 val 新增到雙端佇列頭部。如果佇列已滿,則返回 false。此函式只能被一個生產者呼叫
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// 佇列滿了
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 檢測這個 slot 是否被 popTail 釋放
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// 另一個 goroutine 正在 popTail 這個 slot,說明佇列仍然是滿的
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
pack/unpack
實現對head和tail` 的讀寫
// 將 head 和 tail 指標從 d.headTail 中分離開來
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
// 將 head 和 tail 指標打包到 d.headTail 一個 64bit 的變數中
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
popTail
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// d可能是暫時為空,但如果next不為null並且popTail失敗,則d為永久為空,這是唯一的天劍可以安全地將d從鏈中刪除。
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// 這是唯一的出隊。它現在是空的,但是將來可能會被推倒。
return nil, false
}
// 雙向連結串列的尾節點裡的雙端佇列被“掏空”,所以繼續看下一個節點。
// 並且由於尾節點已經被“掏空”,所以要甩掉它。這樣,下次 popHead 就不會再看它有沒有快取物件了。
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 甩掉尾節點
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 佇列滿
if tail == head {
return nil, false
}
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 注意:此處可能與 pushHead 發生競爭,解決方案是:
// 1. 讓 pushHead 先讀取 typ 的值,如果 typ 值不為 nil,則說明 popTail 尚未清理完 slot
// 2. 讓 popTail 先清理掉 val 中的內容,在清理掉 typ,從而確保不會與 pushHead 對 slot 的寫行為發生競爭
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
return val, true
}
快取的回收
// 將快取清理函式註冊到執行時 GC 時間段
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
// 由執行時實現
func runtime_registerPoolCleanup(cleanup func())
在 src/runtime/mgc.go 中:
// 開始 GC
func gcStart(trigger gcTrigger) {
...
clearpools()
...
}
// 實現快取清理
func clearpools() {
// clear sync.Pools
if poolcleanup != nil {
poolcleanup()
}
...
}
var poolcleanup func()
// 利用編譯器標誌將 sync 包中的清理註冊到執行時
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
poolcleanup = f
}
來看下具體的實現
var (
// allPools 是一組 pool 的集合,具有非空主快取。
// 有兩種形式來保護它的讀寫:1. allPoolsMu 鎖; 2. STW.
allPools []*Pool
// oldPools 是一組 pool 的集合,具有非空 victim 快取。由 STW 保護
oldPools []*Pool
)
func poolCleanup() {
// 該函式會註冊到執行時 GC 階段(前),此時為 STW 狀態,不需要加鎖
// 它必須不處理分配且不呼叫任何執行時函式。
// 由於此時是 STW,不存在使用者態程式碼能嘗試讀取 localPool,進而所有的 P 都已固定(與 goroutine 繫結)
// 從所有的 oldPols 中刪除 victim
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 將主快取移動到 victim 快取
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 具有非空主快取的池現在具有非空的 victim 快取,並且沒有任何 pool 具有主快取。
oldPools, allPools = allPools, nil
}
poolCleanup
會在stw階段的時候被呼叫。主要是刪除oldPools中內容,然後將allPools裡面的內容放入到victim中。最後標記allPools為oldPools。這樣把pool裡內容回收了,有victim進行兜底。
總結
sync.pool
拿取快取的過程
一個goroutine會搶佔p,然後炒年糕當前p的private 中選擇物件,如果裡面沒找到,然後嘗試本地p的shared 佇列的隊頭進行讀取,若還是取不到,則嘗試從其他 P 的 shared 佇列隊尾中偷取。 若偷不到,則嘗試從上一個 GC 週期遺留到 victim 快取中取,否則呼叫 New 建立一個新的物件。
對於回收而言,池中所有臨時物件在一次 GC 後會被放入 victim 快取中, 而前一個週期被放入 victim 的快取則會被清理掉。
在加入 victim 機制前,sync.Pool 裡物件的最⼤快取時間是一個 GC 週期,當 GC 開始時,沒有被引⽤的物件都會被清理掉;加入 victim 機制後,最大快取時間為兩個 GC 週期。
當get一個物件使用完成之後,呼叫put歸還的時候,需要注意將裡面的內容清除
Pool 不可以指定⼤⼩,⼤⼩只受制於 GC 臨界值。
參考
【深入Golang之sync.Pool詳解】https://www.cnblogs.com/sunsky303/p/9706210.html
【由淺入深聊聊Golang的sync.Pool】https://juejin.cn/post/6844903903046320136
【Golang sync.Pool原始碼閱讀與分析】https://jiajunhuang.com/articles/2020_05_05-go_sync_pool.md.html
【【Go夜讀】sync.Pool 原始碼閱讀及適用場景分析】https://www.jianshu.com/p/f61bfe89e473
【golang的物件池sync.pool原始碼解讀】https://zhuanlan.zhihu.com/p/99710992
【15.5 快取池】https://golang.design/under-the-hood/zh-cn/part4lib/ch15sync/pool/
【請問sync.Pool有什麼缺點?】https://mp.weixin.qq.com/s?__biz=MzA4ODg0NDkzOA==&mid=2247487149&idx=1&sn=f38f2d72fd7112e19e97d5a2cd304430&source=41#wechat_redirect
【七分鐘讀懂 Go 的臨時物件池pool及其應用場景】https://segmentfault.com/a/1190000016987629
【偽共享(False Sharing)】http://ifeve.com/falsesharing/