1. 程式人生 > >多圖詳解Go的sync.Pool原始碼

多圖詳解Go的sync.Pool原始碼

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 本文使用的go的原始碼時14.4 ### Pool介紹 總所周知Go 是一個自動垃圾回收的程式語言,採用三色併發標記演算法標記物件並回收。如果你想使用 Go 開發一個高效能的應用程式的話,就必須考慮垃圾回收給效能帶來的影響。因為Go 在垃圾回收的時候會有一個STW(stop-the-world,程式暫停)的時間,並且如果物件太多,做標記也需要時間。 所以如果採用物件池來建立物件,增加物件的重複利用率,使用的時候就不必在堆上重新建立物件可以節省開銷。 在Go中,sync.Pool提供了物件池的功能。它對外提供了三個方法:New、Get 和 Put。下面用一個簡短的例子來說明一下Pool使用: ```go var pool *sync.Pool type Person struct { Name string } func init() { pool = &sync.Pool{ New: func() interface{}{ fmt.Println("creating a new person") return new(Person) }, } } func main() { person := pool.Get().(*Person) fmt.Println("Get Pool Object:", person) person.Name = "first" pool.Put(person) fmt.Println("Get Pool Object:",pool.Get().(*Person)) fmt.Println("Get Pool Object:",pool.Get().(*Person)) } ``` 結果: ``` creating a new person Get Pool Object: &{} Get Pool Object: &{first} creating a new person Get Pool Object: &{} ``` 這裡我用了init方法初始化了一個pool,然後get了三次,put了一次到pool中,如果pool中沒有物件,那麼會呼叫New函式建立一個新的物件,否則會重put進去的物件中獲取。 ## 原始碼分析 ```go type Pool struct { noCopy noCopy local unsafe.Pointer localSize uintptr victim unsafe.Pointer victimSize uintptr New func() interface{} } ``` Pool結構體裡面noCopy代表這個結構體是禁止拷貝的,它可以在我們使用 `go vet` 工具的時候生效; local是一個poolLocal陣列的指標,localSize代表這個陣列的大小;同樣victim也是一個poolLocal陣列的指標,每次垃圾回收的時候,Pool 會把 victim 中的物件移除,然後把 local 的資料給 victim;local和victim的邏輯我們下面會詳細介紹到。 New函式是在建立pool的時候設定的,當pool沒有快取物件的時候,會呼叫New方法生成一個新的物件。 下面我們對照著pool的結構圖往下講,避免找不到北: ![Group 25](https://img.luozhiyun.com/20201226184341.png) ```go type poolLocal struct { poolLocalInternal pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte } ``` local欄位儲存的是一個poolLocal陣列的指標,poolLocal陣列大小是goroutine中P的數量,訪問時,P的id對應poolLocal陣列下標索引,所以Pool的最大個數runtime.GOMAXPROCS(0)。 通過這樣的設計,每個P都有了自己的本地空間,多個 goroutine 使用同一個 Pool 時,減少了競爭,提升了效能。如果對goroutine的P、G、M有疑惑的同學不妨看看這篇文章:[The Go scheduler](https://morsmachine.dk/go-scheduler)。 poolLocal裡面有一個pad陣列用來佔位用,防止在 cache line 上分配多個 poolLocalInternal從而造成false sharing,有關於false sharing可以看看這篇文章: [What’s false sharing and how to solve it](https://medium.com/@genchilu/whats-false-sharing-and-how-to-solve-it-using-golang-as-example-ef978a305e10) ,文中對於false sharing的定義: > That’s what false sharing is: one core update a variable would force other cores to update cache either. ```go type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared poolChain // Local P can pushHead/popHead; any P can popTail. } ``` poolLocalInternal包含兩個欄位private和shared。 private代表快取的一個元素,只能由相應的一個 P 存取。因為一個 P 同時只能執行一個 goroutine,所以不會有併發的問題; shared則可以由任意的 P 訪問,但是隻有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。 ```go type poolChain struct { head *poolChainElt tail *poolChainElt } type poolChainElt struct { poolDequeue next, prev *poolChainElt } type poolDequeue struct { headTail uint64 vals []eface } ``` poolChain是一個雙端佇列,裡面的head和tail分別指向佇列頭尾;poolDequeue裡面存放真正的資料,是一個單生產者、多消費者的固定大小的無鎖的環狀佇列,headTail是環狀佇列的首位位置的指標,可以通過位運算解析出首尾的位置,生產者可以從 head 插入、head 刪除,而消費者僅可從 tail 刪除。 這個雙端佇列的模型大概是這個樣子: ![Group 26](https://img.luozhiyun.com/20201226184348.png) poolDequeue裡面的環狀佇列大小是固定的,後面分析原始碼我們會看到,當環狀佇列滿了的時候會建立一個size是原來兩倍大小的環狀佇列。大家這張圖好好體會一下,會反覆用到。 ### Get方法 ```go func (p *Pool) Get() interface{} { ... //1.把當前goroutine繫結在當前的P上 l, pid := p.pin() //2.優先從local的private中獲取 x := l.private l.private = nil if x == nil { //3,private沒有,那麼從shared的頭部獲取 x, _ = l.shared.popHead() //4. 如果都沒有,那麼去別的local上去偷一個 if x == nil { x = p.getSlow(pid) } } //解除搶佔 runtime_procUnpin() ... //5. 如果沒有獲取到,嘗試使用New函式生成一個新的 if x == nil && p.New != nil { x = p.New() } return x } ``` * 這一段程式碼首先會將當前goroutine繫結在當前的P上返回對應的local,然後嘗試從local的private中獲取,然後需要把private欄位置空,因為已經拿到了想要的物件; * private中獲取不到,那麼就去shared的頭部獲取; * shared也沒有,那麼嘗試遍歷所有的 local,嘗試從它們的 shared 彈出一個元素; * 最後如果還是沒有,那麼就直接呼叫預先設定好的 New 函式,建立一個出來。 #### pin ```go func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid), pid } return p.pinSlow() } ``` pin方法裡面首先會呼叫runtime_procPin方法會先獲取當前goroutine,然後繫結到對應的M上,然後返回M目前繫結的P的id,因為這個pid後面會用到,防止在使用途中P被搶佔,具體的細節可以看這篇:https://zhuanlan.zhihu.com/p/99710992。 接下來會使用原子操作取出localSize,如果當前pid大於localSize,那麼就表示Pool還沒建立對應的poolLocal,那麼呼叫pinSlow進行建立工作,否則呼叫indexLocal取出pid對應的poolLocal返回。 ```go func indexLocal(l unsafe.Pointer, i int) *poolLocal { lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) return (*poolLocal)(lp) } ``` indexLocal裡面是使用了地址操作,傳入的i是陣列的index值,所以需要獲取poolLocal{}的size做一下地址的位移操作,然後再轉成轉成poolLocal地址返回。 #### pinSlow ```go func (p *Pool) pinSlow() (*poolLocal, int) { // 解除pin runtime_procUnpin() // 加上全域性鎖 allPoolsMu.Lock() defer allPoolsMu.Unlock() // pin住 pid := runtime_procPin() s := p.localSize l := p.local // 重新對pid進行檢查 if uintptr(pid) < s { return indexLocal(l, pid), pid } // 初始化local前會將pool放入到allPools陣列中 if p.local == nil { allPools = append(allPools, p) } // 當前P的數量 size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) atomic.StoreUintptr(&p.localSize, uintptr(size)) return &local[pid], pid } ``` 因為allPoolsMu是一個全域性Mutex鎖,因此上鎖會比較慢可能被阻塞,所以上鎖前呼叫runtime_procUnpin方法解除pin的操作; 在解除繫結後,pinSlow 可能被其他的執行緒呼叫過了,p.local 可能會發生變化。因此這時候需要再次對 pid 進行檢查。 最後初始化local,並使用原子操作對local和localSize設值,返回當前P對應的local。 到這裡pin方法終於講完了。畫一個簡單的圖描述一下這整個流程: ![Group 22](https://img.luozhiyun.com/20201226184352.png) 下面我們再回到Get方法中往下走,程式碼我再貼一遍,以便閱讀: ```go func (p *Pool) Get() interface{} { ... //2.優先從local的private中獲取 x := l.private l.private = nil if x == nil { //3,private沒有,那麼從shared的頭部獲取 x, _ = l.shared.popHead() //4. 如果都沒有,那麼去別的local上去偷一個 if x == nil { x = p.getSlow(pid) } } ... return x } ``` 如果private中沒有值,那麼會呼叫shared的popHead方法獲取值。 #### popHead ```go func (c *poolChain) popHead() (interface{}, bool) { // 這裡頭部是一個poolChainElt d := c.head // 遍歷poolChain連結串列 for d != nil { // 從poolChainElt的環狀列表中獲取值 if val, ok := d.popHead(); ok { return val, ok } // load poolChain下一個物件 d = loadPoolChainElt(&d.prev) } return nil, false } ``` popHead方法裡面會獲取到poolChain的頭結點,不記得poolChain資料結構的同學建議往上面翻一下再回來。 接著有個for迴圈會挨個從poolChain的頭結點往下遍歷,直到獲取物件返回。 ```go func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) // headTail的高32位為head,低32位為tail head, tail := d.unpack(ptrs) // 首尾相等,那麼這個佇列就是空的 if tail == head { return nil, false } // 這裡需要head--之後再獲取slot head-- ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) // 說明沒取到快取的物件,返回 nil if val == dequeueNil(nil) { val = nil } // 重置slot *slot = eface{} return val, true } ``` * poolDequeue的popHead方法首先會獲取到headTail的值,然後呼叫unpack解包,headTail是一個64位的值,高32位表示head,低32位表示tail。 * 判斷head和tail是否相等,相等那麼這個佇列就是空的; * 如果佇列不是空的,那麼將head減一之後再使用,因為head當前指的位置是空值,表示下一個新物件存放的位置; * CAS重新設值新的headTail,成功之後獲取slot,這裡因為vals大小是2的n 次冪,因此`len(d.vals)-1)`之後低n位全是1,和head取與之後可以獲取到head的低n位的值; * 如果slot所對應的物件是dequeueNil,那麼表示是空值,直接返回,否則將slot指標對應位置的值置空,返回val。 如果shared的popHead方法也沒獲取到值,那麼就需要呼叫getSlow方法獲取了。 #### getSlow ```go func (p *Pool) getSlow(pid int) interface{} { size := atomic.LoadUintptr(&p.localSize) // load-acquire locals := p.local // load-consume // 遍歷locals列表,從其他的local的shared列表尾部獲取物件 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) // victim的private不為空則返回 if x := l.private; x != nil { l.private = nil return x } // 遍歷victim對應的locals列表,從其他的local的shared列表尾部獲取物件 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // 獲取不到,將victimSize置為0 atomic.StoreUintptr(&p.victimSize, 0) return nil } ``` getSlow方法會遍歷locals列表,這裡需要注意的是,遍歷是從索引為 pid+1 的 poolLocal 處開始,嘗試呼叫shared的popTail方法獲取物件;如果沒有拿到,則從 victim 裡找。如果都沒找到,那麼就將victimSize置為0,下次就不找victim了。 #### poolChain&popTail ```go func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) // 如果最後一個節點是空的,那麼直接返回 if d == nil { return nil, false } for { // 這裡獲取的是next節點,與一般的雙向連結串列是相反的 d2 := loadPoolChainElt(&d.next) // 獲取尾部物件 if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { return nil, false } // 因為d已經沒有資料了,所以重置tail為d2,並刪除d2的上一個節點 if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { storePoolChainElt(&d2.prev, nil) } d = d2 } } ``` * 判斷poolChain,如果最後一個節點是空的,那麼直接返回; * 進入for迴圈,獲取tail的next節點,這裡需要注意的是這個雙向連結串列與一般的連結串列是反向的,不清楚的可以再去看看第一張圖; * 呼叫popTail獲取poolDequeue列表的物件,有物件直接返回; * d2為空則表示已經遍歷完整個poolChain雙向列表了,都為空,那麼直接返回; * 通過CAS將tail重置為d2,因為d已經沒有資料了,並將d2的prev節點置為nil,然後將d置為d2,進入下一個迴圈; #### poolDequeue&popTail ```go func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) // 和pophead一樣,將headTail解包 head, tail := d.unpack(ptrs) // 首位相等,表示列表中沒有資料,返回 if tail == head { return nil, false } ptrs2 := d.pack(head, tail+1) // CAS重置tail位置 if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // 獲取tail位置物件 slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) // 判斷物件是不是為空 if val == dequeueNil(nil) { val = nil } // 將slot置空 slot.val = nil atomic.StorePointer(&slot.typ, nil) return val, true } ``` 如果看懂了popHead,那麼這個popTail方法是和它非常的相近的。 popTail簡單來說也是從佇列尾部移除一個元素,如果佇列為空,返回 false。但是需要注意的是,這個popTail可能會被多個消費者呼叫,所以需要迴圈CAS獲取物件;在poolDequeue環狀列表中tail是有資料的,不必像popHead中`head--`。 最後,需要將slot置空。 大家可以再對照一下圖回顧一下程式碼: ![Group 39](https://img.luozhiyun.com/20201226184358.png) ### Put方法 ```go func (p *Pool) Put(x interface{}) { if x == nil { return } ... l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } runtime_procUnpin() ... } ``` 看完了Get方法,看Put方法就容易多了。同樣Put方法首先會去Pin住當前goroutine和P,然後嘗試將 x 賦值給 private 欄位。如果private不為空,那麼就呼叫pushHead將其放入到shared佇列中。 #### poolChain&pushHead ```go func (c *poolChain) pushHead(val interface{}) { d := c.head // 頭節點沒有初始化,那麼設值一下 if d == nil { const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } // 將物件加入到環狀佇列中 if d.pushHead(val) { return } newSize := len(d.vals) * 2 // 這裡做了限制,單個環狀佇列不能超過2的30次方大小 if newSize >
= dequeueLimit { newSize = dequeueLimit } // 初始化新的環狀列表,大小是d的兩倍 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) // push到新的佇列中 d2.pushHead(val) } ``` 如果頭節點為空,那麼需要建立一個新的poolChainElt物件作為頭節點,大小為8;然後呼叫pushHead放入到環狀佇列中; 如果放置失敗,那麼建立一個 poolChainElt 節點,並且雙端佇列的長度翻倍,當然長度也不能超過dequeueLimit,即2的30次方; 然後將新的節點d2和d互相繫結一下,並將d2設值為頭節點,將傳入的物件push到d2中; #### poolDequeue&pushHead ```go func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) // 解包headTail head, tail := d.unpack(ptrs) // 判斷佇列是否已滿 if (tail+uint32(len(d.vals)))&(1<