Go語言之並發示例-Pool(一)
這篇文章演示使用有緩沖的通道實現一個資源池,這個資源池可以管理在任意多個goroutine之間共享的資源,比如網絡連接、數據庫連接等,我們在數據庫操作的時候,比較常見的就是數據連接池,也可以基於我們實現的資源池來實現。
可以看出,資源池也是一種非常流暢性的模式,這種模式一般適用於在多個goroutine之間共享資源,每個goroutine可以從資源池裏申請資源,使用完之後再放回資源池裏,以便其他goroutine復用。
好了,老規矩,我們先構建一個資源池結構體,然後再賦予一些方法,這個資源池就可以幫助我們管理資源了。
//一個安全的資源池,被管理的資源必須都實現io.Close接口
type Pool struct {
m sync.Mutex
res chan io.Closer
factory func() (io.Closer,error)
closed bool}
這個結構體Pool有四個字段,其中m是一個互斥鎖,這主要是用來保證在多個goroutine訪問資源時,池內的值是安全的。
res字段是一個有緩沖的通道,用來保存共享的資源,這個通道的大小,在初始化Pool的時候就指定的。註意這個通道的類型是io.Closer接口,所以實現了這個io.Closer接口的類型都可以作為資源,交給我們的資源池管理。
factory這個是一個函數類型,它的作用就是當需要一個新的資源時,可以通過這個函數創建,也就是說它是生成新資源的,至於如何生成、生成什麽資源,是由使用者決定的,所以這也是這個資源池靈活的設計的地方。
closed字段表示資源池是否被關閉,如果被關閉的話,再訪問是會有錯誤的。
現在先這個資源池我們已經定義好了,也知道了每個字段的含義,下面就開時具體使用。剛剛我們說到關閉錯誤,那麽我們就先定義一個資源池已經關閉的錯誤。
var ErrPoolClosed = errors.New("資源池已經關閉。")
非常簡潔,當我們從資源池獲取資源的時候,如果該資源池已經關閉,那麽就會返回這個錯誤。單獨定義它的目的,是和其他錯誤有一個區分,這樣需要的時候,我們就可以從眾多的error類型裏區分出來這個ErrPoolClosed。
下面我們就該為創建Pool專門定一個函數了,這個函數就是工廠函數,我們命名為New。
//創建一個資源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size的值太小了。")
}
return &Pool{
factory: fn,
res: make(chan io.Closer, size),
}, nil
}
這個函數創建一個資源池,它接收兩個參數,一個fn是創建新資源的函數;還有一個size是指定資源池的大小。
這個函數裏,做了size大小的判斷,起碼它不能小於或者等於 0 ,否則就會返回錯誤。如果參數正常,就會使用size創建一個有緩沖的通道,來保存資源,並且返回一個資源池的指針。
有了創建好的資源池,那麽我們就可以從中獲取資源了。
//從資源池裏獲取一個資源
func (p *Pool) Acquire() (io.Closer,error) {
select {
case r,ok := <-p.res:
log.Println("Acquire:共享資源")
if !ok {
return nil,ErrPoolClosed
}
return r,nil
default:
log.Println("Acquire:新生成資源")
return p.factory()
}
}
Acquire方法可以從資源池獲取資源,如果沒有資源,則調用factory方法生成一個並返回。
這裏同樣使用了select的多路復用,因為這個函數不能阻塞,可以獲取到就獲取,不能就生成一個。
這裏的新知識是通道接收的多參返回,如果可以接收的話,第一參數是接收的值,第二個表示通道是否關閉。例子中如果ok值為false表示通道關閉,如果為true則表示通道正常。所以我們這裏做了一個判斷,如果通道關閉的話,返回通道關閉錯誤。
有獲取資源的方法,必然還有對應的釋放資源的方法,因為資源用完之後,要還給資源池,以便復用。在講解釋放資源的方法前,我們先看下關閉資源池的方法,因為釋放資源的方法也會用到它。
關閉資源池,意味著整個資源池不能再被使用,然後關閉存放資源的通道,同時釋放通道裏的資源。
//關閉資源池,釋放資源
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
//關閉通道,不讓寫入了
close(p.res) //關閉通道裏的資源
for r:=range p.res {
r.Close()
}
}
這個方法裏,我們使用了互斥鎖,因為有個標記資源池是否關閉的字段closed需要再多個goroutine操作,所以我們必須保證這個字段的同步。這裏把關閉標誌置為true。
然後我們關閉通道,不讓寫入了,而且我們前面的Acquire也可以感知到通道已經關閉了。同比通道後,就開始釋放通道中的資源,因為所有資源都實現了io.Closer接口,所以我們直接調用Close方法釋放資源即可。
關閉方法有了,我們看看釋放資源的方法如何實現。
func (p *Pool) Release(r io.Closer){
//保證該操作和Close方法的操作是安全的
p.m.Lock()
defer p.m.Unlock()
//資源池都關閉了,就省這一個沒有釋放的資源了,釋放即可
if p.closed {
r.Close()
return
}
select {
case p.res <- r:
log.Println("資源釋放到池子裏了")
default:
log.Println("資源池滿了,釋放這個資源吧")
r.Close()
}
}
釋放資源本質上就會把資源再發送到緩沖通道中,就是這麽簡單,不過為了更安全的實現這個方法,我們使用了互斥鎖,保證closed標誌的安全,而且這個互斥鎖還有一個好處,就是不會往一個已經關閉的通道發送資源。
這是為什麽呢?因為Close和Release這兩個方法是互斥的,Close方法裏對closed標誌的修改,Release方法可以感知到,所以就直接return了,不會執行下面的select代碼了,也就不會往一個已經關閉的通道裏發送資源了。
如果資源池沒有被關閉,則繼續嘗試往資源通道發送資源,如果可以發送,就等於資源又回到資源池裏了;如果發送不了,說明資源池滿了,該資源就無法重新回到資源池裏,那麽我們就把這個需要釋放的資源關閉,拋棄了。
Go語言之並發示例-Pool(一)