1. 程式人生 > >golang的臨時物件池sync.Pool

golang的臨時物件池sync.Pool

今天在寫碼之時,發現了同事用到了sync.pool。因不知其因,遂Google之。雖然大概知道其原因和用法。還不能融匯貫通。故寫此記,方便日後查閱。直至明瞭。

正文

在高併發或者大量的資料請求的場景中,我們會遇到很多問題,垃圾回收就是其中之一(garbage collection),為了減少GC壓力、優化GC,我們一般想到的方法就是能夠讓物件得以重用。這就需要一個物件池來儲存待回收物件,等待下次重用,從而減少物件產生數量。我們可以把sync.Pool型別值看作是存放可被重複使用的值的容器。此類容器是自動伸縮的、高效的,同時也是併發安全的。為了描述方便,我們也會把sync.Pool型別的值稱為臨時物件池,而把存於其中的值稱為物件值。這個類設計的目的是用來儲存和複用臨時物件,以減少記憶體分配,降低GC壓力。

我們看下Go的原始碼:

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
 
package sync
 
import (
	"internal/race"
	"runtime"
	"sync/atomic"
	"unsafe"
)
 
// A Pool is a set of temporary objects that may be individually saved and
// retrieved. // // Any item stored in the Pool may be removed automatically at any time without // notification. If the Pool holds the only reference when this happens, the // item might be deallocated. // // A Pool is safe for use by multiple goroutines simultaneously. // // Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to // build efficient, thread-safe free lists. However, it is not suitable for all // free lists. // // An appropriate use of a Pool is to manage a group of temporary items // silently shared among and potentially reused by concurrent independent // clients of a package. Pool provides a way to amortize allocation overhead // across many clients. // // An example of good use of a Pool is in the fmt package, which maintains a // dynamically-sized store of temporary output buffers. The store scales under // load (when many goroutines are actively printing) and shrinks when // quiescent. // // On the other hand, a free list maintained as part of a short-lived object is // not a suitable use for a Pool, since the overhead does not amortize well in // that scenario. It is more efficient to have such objects implement their own // free list. // type Pool struct { local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{} } // Local per-P Pool appendix. type poolLocal struct { private interface{} // Can be used only by the respective P. shared []interface{} // Can be used by any P. Mutex // Protects shared. pad [128]byte // Prevents false sharing. } // Put adds x to the pool. func (p *Pool) Put(x interface{}) { if race.Enabled { // Under race detector the Pool degenerates into no-op. // It's conforming, simple and does not introduce excessive // happens-before edges between unrelated goroutines. 
return } if x == nil { return } l := p.pin() if l.private == nil { l.private = x x = nil } runtime_procUnpin() if x == nil { return } l.Lock() l.shared = append(l.shared, x) l.Unlock() } // Get selects an arbitrary item from the Pool, removes it from the // Pool, and returns it to the caller. // Get may choose to ignore the pool and treat it as empty. // Callers should not assume any relation between values passed to Put and // the values returned by Get. // // If Get would otherwise return nil and p.New is non-nil, Get returns // the result of calling p.New. func (p *Pool) Get() interface{} { if race.Enabled { if p.New != nil { return p.New() } return nil } l := p.pin() x := l.private l.private = nil runtime_procUnpin() if x != nil { return x } l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() if x != nil { return x } return p.getSlow() } func (p *Pool) getSlow() (x interface{}) { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire local := p.local // load-consume // Try to steal one element from other procs. pid := runtime_procPin() runtime_procUnpin() for i := 0; i < int(size); i++ { l := indexLocal(local, (pid+i+1)%int(size)) l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] l.Unlock() break } l.Unlock() } if x == nil && p.New != nil { x = p.New() } return x } // pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P. // Caller must call runtime_procUnpin() when done with the pool. func (p *Pool) pin() *poolLocal { pid := runtime_procPin() // In pinSlow we store to localSize and then to local, here we load in opposite order. // Since we've disabled preemption, GC can not happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). 
s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid) } return p.pinSlow() } func (p *Pool) pinSlow() *poolLocal { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid) } if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer((*unsafe.Pointer)(&p.local), unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid] } func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Defensively zero out everything, 2 reasons: // 1. To prevent false retention of whole Pools. // 2. If GC happens while a goroutine works with l.shared in Put/Get, // it will retain whole Pool. So next cycle memory consumption would be doubled. for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} } var ( allPoolsMu Mutex allPools []*Pool ) func init() { runtime_registerPoolCleanup(poolCleanup) } func indexLocal(l unsafe.Pointer, i int) *poolLocal { return &(*[1000000]poolLocal)(l)[i] } // Implemented in runtime. func runtime_registerPoolCleanup(cleanup func()) func runtime_procPin() int func runtime_procUnpin()

sync.Pool 最常用的兩個函式Get/Put

var pool = &sync.Pool{New:func()interface{}{return NewObject()}}
    pool.Put(obj)
    pool.Get()

物件池在Get的時候,如果裡面沒有物件會返回nil,所以我們需要New function來確保當獲取物件而物件池為空時,能重新生成一個物件返回。前者(Get)的功能是從池中獲取一個interface{}型別的值,而後者(Put)的作用則是把一個interface{}型別的值放置於池中。

// 建立物件
var pool = &sync.Pool{New:func()interface{}{return "Hello,xiequan"}}
// 準備放入的字串
val := "Hello,World!"
// 放入
pool.Put(val)
// 取出
log.Println(pool.Get())
// 再取就沒有了,會自動呼叫NEW
log.Println(pool.Get())

再來看一個例子:

package main
 
import (
    "fmt"
    "runtime"
    "runtime/debug"
    "sync"
    "sync/atomic"
)
 
func main() {
    // Disable GC for the demo; the deferred call restores the previous
    // percentage when main returns.
    defer debug.SetGCPercent(debug.SetGCPercent(-1))

    var seq int32
    produce := func() interface{} {
        // Every call hands out the next integer in the sequence.
        return atomic.AddInt32(&seq, 1)
    }
    pool := sync.Pool{New: produce}

    // New field in action: Get on an empty pool falls back to New.
    fmt.Printf("v1: %v\n", pool.Get())

    // Storing and retrieving values from the temporary object pool.
    pool.Put(produce())
    pool.Put(produce())
    pool.Put(produce())
    fmt.Printf("v2: %v\n", pool.Get())

    // Effect of garbage collection on the pool: a GC cycle empties it,
    // so this Get must call New again.
    debug.SetGCPercent(100)
    runtime.GC()
    fmt.Printf("v3: %v\n", pool.Get())

    // With New cleared, Get on an empty pool yields nil.
    pool.New = nil
    fmt.Printf("v4: %v\n", pool.Get())
}

通過Get方法獲取到的值是任意的。如果一個臨時物件池的Put方法未被呼叫過,且它的New欄位也未曾被賦予一個非nil的函式值,那麼它的Get方法返回的結果值就一定會是nil。Get方法返回的不一定就是存在於池中的值。不過,如果這個結果值是池中的,那麼在該方法返回它之前就一定會把它從池中刪除掉。

這樣一個臨時物件池在功能上看似與一個通用的快取池相差無幾。但是實際上,臨時物件池本身的特性決定了它是一個“個性”非常鮮明的同步工具。我們在這裡說明它的兩個非常突出的特性。

來看一個sync.Pool和bytes.Buffer使用的例子:

// Dao holds a pool of reusable scratch buffers used by Infoc.
type Dao struct {
	bp       sync.Pool // hands out *bytes.Buffer values; see New and Infoc
}
 
// New builds a Dao whose buffer pool produces empty *bytes.Buffer values
// on demand. The config argument is currently unused.
func New(c *conf.Config) (d *Dao) {
	d = new(Dao)
	// Assign the New hook directly on the zero-value pool; this also
	// avoids copying a sync.Pool value around.
	d.bp.New = func() interface{} {
		return new(bytes.Buffer)
	}
	return d
}
 
 
// Infoc joins args into a single string separated by defaultSpliter,
// escaping any occurrence of the separator inside an argument with
// defaultReplacer. A scratch *bytes.Buffer is borrowed from the Dao's
// pool and is always recycled, even on error paths.
//
// It returns "" with a nil error for zero args, and ErrType if the pool
// yields something other than a *bytes.Buffer.
func (d *Dao) Infoc(args ...string) (value string, err error) {
	if len(args) == 0 {
		return
	}
	// fetch a buf from bufpool
	buf, ok := d.bp.Get().(*bytes.Buffer)
	if !ok {
		return "", ErrType
	}
	// Fix: the original recycled the buffer only on the success path and
	// dropped it on every error return, defeating the pool. Resetting and
	// putting it back in a defer guarantees reuse in all cases. (Note the
	// String() below runs before the deferred Reset.)
	defer func() {
		buf.Reset()
		d.bp.Put(buf)
	}()
	// append first arg
	if _, err = buf.WriteString(args[0]); err != nil {
		return "", err
	}
	for _, arg := range args[1:] {
		// append separator, then the escaped arg
		if _, err = buf.WriteString(defaultSpliter); err != nil {
			return "", err
		}
		if _, err = buf.WriteString(strings.Replace(arg, defaultSpliter, defaultReplacer, -1)); err != nil {
			return "", err
		}
	}
	value = buf.String()
	return value, nil
}

在實現過程中還要特別注意的是Pool本身也是一個物件,要把Pool物件在程式開始的時候初始化為全域性唯一。

物件池使用是較簡單的,但原生的sync.Pool有個較大的問題:我們不能自由控制Pool中元素的數量,放進Pool中的物件每次GC發生時都會被清理掉。這使得sync.Pool做簡單的物件池還可以,但做連線池就有點心有餘而力不足了,比如:在高併發的情景下一旦Pool中的連線被GC清理掉,那每次連線DB都需要重新三次握手建立連線,這個代價就較大了。

總結

物件池的一些適用場景(比如作為臨時且狀態無關的資料的暫存處),以及一些不適用的場景(比如用來存放資料庫連線的例項)。如果我們在做實現技術的選型的時候把臨時物件池作為了候選之一,那麼就應該好好想想它的“個性”是不是符合你的需要。如果真的適合,那麼它的特性一定會為你的程式增光添彩,無論在功能上還是在效能上。而如果它被用在了不恰當的地方,那麼就只能適得其反了

參考文獻: