Go元件學習——手寫連線池並沒有那麼簡單
1、背景
前段時間在看gorm,發現gorm是複用database/sql的連線池。
於是翻了下database/sql的資料庫連線池的程式碼實現,看完程式碼,好像也不是很複雜,但是總覺得理解不夠深刻,於是萌生了自己想寫個連線池的想法。(最後也驗證了,看原始碼的理解確實不夠深刻,一看就會,一做就跪)
2、連線池的實現原理
什麼是連線池
- 顧名思義是一個池子
- 池子裡面存放有限數量即時可用的連線,減少建立連線和關閉連線的時間
- 連線是有存活時間的
具體到資料庫連線池,我根據自己的理解畫了一張獲取連線的流程圖
從上圖我們可以看出,除了連線池的容量大小,我們還有一個最大連線數的限制。池子裡的連線讓我們不用頻繁的建立和關閉連線,同時應該也要有最大連線的限制,避免無限制的建立連線導致伺服器資源耗盡,拖垮服務不可用。
池子中的連線也有存活時間,如果超過存活時間則會銷燬連線。
3、實現連線池我們需要考慮哪些問題
3.1 功能點
-
獲取連線
-
釋放連線
-
Ping
-
關閉連線池
-
設定最大連線數和連線池容量(連線存活時間等等)
3.2 實現細節
- 連線應該有哪些屬性,比如最大連線數、連線池容量、連線建立時間和存活時間
- 如何模擬使用連線池以及超過最大連線數後等待其他連線釋放
- 如何保證在多協程操作下資料的一致性
- 如果實現連線的超時監聽和通知
4、具體實現
這裡的連線池實現包括
- 設定最大連線數和連線池容量
- 獲取連線
- 釋放連線
4.1 結構定義
定義Conn結構體,這裡包含了幾乎所有的有關連線需要的資訊屬性
type Conn struct { maxConn int // 最大連線數 maxIdle int // 最大可用連線數 freeConn int // 執行緒池空閒連線數 connPool []int // 連線池 openCount int // 已經開啟的連線數 waitConn map[int]chan Permission // 排隊等待的連線佇列 waitCount int // 等待個數 lock sync.Mutex // 鎖 nextConnIndex NextConnIndex // 下一個連線的ID標識(用於區分每個ID) freeConns map[int]Permission // 連線池的連線 }
這裡並不會建立一個真正的資料庫連線,而是使用一個非空的Permission表示拿到了連線。拿到一個非空的Permission才有資格執行後面類似增刪改查的操作。
Permission對應的結構體如下
type Permission struct { NextConnIndex // 對應Conn中的NextConnIndex Content string // 通行證的具體內容,比如"PASSED"表示成功獲取 CreatedAt time.Time // 建立時間,即連線的建立時間 MaxLifeTime time.Duration // 連線的存活時間,本次沒有用到這個屬性,保留 }
NextConnIndex對應的結構體如下
type NextConnIndex struct { Index int }
還有一個用來設定最大連線數以及連線池最大連線數的Config
type Config struct { MaxConn int MaxIdle int }
4.2 初始化連線池引數
func Prepare(ctx context.Context, config *Config) (conn *Conn) { // go func() { //for { //conn.expiredCh = make(chan string, len(conn.freeConns)) //for _, value := range conn.freeConns { // if value.CreatedAt.Add(value.MaxLifeTime).Before(nowFunc()) { // conn.expiredCh <- "CLOSE" // } //} // }() return &Conn{ maxConn: config.MaxConn, maxIdle: config.MaxIdle, openCount: 0, connPool: []int{}, waitConn: make(map[int]chan Permission), waitCount: 0, freeConns: make(map[int]Permission), } }
這裡主要是初始化上面的Conn結構體引數。
註釋的部分,主要想通過啟動一個監聽協程,用於監聽已經過期的連線,並通過channel傳送。(這塊還有一些細節沒有想清楚,先擱置)
4.3 設定MaxConn和MaxIdle
在main.go中新增程式碼
ctx := context.Background() config := &custom_pool.Config{ MaxConn: 2, MaxIdle: 1, }
這裡意味連線池只能快取一個連線,最大新建連線數為2,超過則要加入等待佇列。
4.4 獲取連線
// 建立連線 func (conn *Conn) New(ctx context.Context) (permission Permission, err error) { /** 1、如果當前連線池已滿,即len(freeConns)=0 2、判定openConn是否大於maxConn,如果大於,則丟棄獲取加入佇列進行等待 3、如果小於,則考慮建立新連線 */ conn.lock.Lock() select { default: case <-ctx.Done(): // context取消或超時,則退出 conn.lock.Unlock() return Permission{}, errors.New("new conn failed, context cancelled!") } // 連線池不為空,從連線池獲取連線 if len(conn.freeConns) > 0 { var ( popPermission Permission popReqKey int ) // 獲取其中一個連線 for popReqKey, popPermission = range conn.freeConns { break } // 從連線池刪除 delete(conn.freeConns, popReqKey) fmt.Println("log", "use free conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns) conn.lock.Unlock() return popPermission, nil } if conn.openCount >= conn.maxConn { // 當前連線數大於上限,則加入等待佇列 nextConnIndex := getNextConnIndex(conn) req := make(chan Permission, 1) conn.waitConn[nextConnIndex] = req conn.waitCount++ conn.lock.Unlock() select { // 如果在等待指定超時時間後,仍然無法獲取釋放連線,則放棄獲取連線,這裡如果不在超時時間後退出會一直阻塞 case <-time.After(time.Second * time.Duration(3)): fmt.Println("超時,通知主執行緒退出") return case ret, ok := <-req: // 有放回的連線, 直接拿來用 if !ok { return Permission{}, errors.New("new conn failed, no available conn release") } fmt.Println("log", "received released conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns) return ret, nil } return Permission{}, errors.New("new conn failed") } // 新建連線 conn.openCount++ conn.lock.Unlock() permission = Permission{NextConnIndex: NextConnIndex{nextConnIndex}, Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5} fmt.Println("log", "create conn!!!!!", "openCount: ", conn.openCount, " freeConns: ", conn.freeConns) return permission, nil }
這裡主要分為三個部分
-
如果連線池不為空,則直接從池子裡面獲取連線使用即可
-
如果連線池為空,且當前的連線數已經超過最大連線數maxConn,則會將當前任務加入等待佇列,同時監聽是否有釋放的可用連線,如果有則拿來直接用,如果超過指定等待時間後仍然取不到連線則退出阻塞返回。
-
如果連線池為空,且尚未達到最大連線數maxConn,則新建一個新連線。
getNextConnIndex函式
func getNextConnIndex(conn *Conn) int { currentIndex := conn.nextConnIndex.Index conn.nextConnIndex.Index = currentIndex + 1 return conn.nextConnIndex.Index }
4.5 釋放連線
// 釋放連線 func (conn *Conn) Release(ctx context.Context) (result bool, err error) { conn.lock.Lock() // 如果等待佇列有等待任務,則通知正在阻塞等待獲取連線的程序(即New方法中"<-req"邏輯) // 這裡沒有做指定連線的釋放,只是保證釋放的連線會被利用起來 if len(conn.waitConn) > 0 { var req chan Permission var reqKey int for reqKey, req = range conn.waitConn { break } // 假定釋放的連線就是下面新建的連線 permission := Permission{NextConnIndex: NextConnIndex{reqKey}, Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5} req <- permission conn.waitCount-- delete(conn.waitConn, reqKey) conn.lock.Unlock() } else { if conn.openCount > 0 { conn.openCount-- if len(conn.freeConns) < conn.maxIdle { // 確保連線池大小不會超過maxIdle nextConnIndex := getNextConnIndex(conn) permission := Permission{NextConnIndex: NextConnIndex{nextConnIndex}, Content: "PASSED", CreatedAt: nowFunc(), MaxLifeTime: time.Second * 5} conn.freeConns[nextConnIndex] = permission } } conn.lock.Unlock() } return }
這裡主要分為兩部分
- 如果釋放連線的時候發現等待佇列有任務在等待,則將釋放的連線通過channel傳送,給正在等待連線釋放的阻塞任務使用,同時從等待佇列中刪除該任務。
- 如果當前無等待任務,則將連線放入連線池
這裡的nowFunc
var nowFunc = time.Now
5、Case模擬
5.1 無釋放建立連線
即只有建立連線,拿到連線也不會釋放連線
package main import ( "context" custom_pool "go-demo/main/src/custom-pool" ) func main() { ctx := context.Background() config := &custom_pool.Config{ MaxConn: 2, MaxIdle: 1, } conn := custom_pool.Prepare(ctx, config) if _, err := conn.New(ctx); err != nil { return } if _, err := conn.New(ctx); err != nil { return } if _, err := conn.New(ctx); err != nil { return } if _, err := conn.New(ctx); err != nil { return } if _, err := conn.New(ctx); err != nil { return } }
執行結果如下
注意上面程式碼都是一直在獲取連線,在獲取連線後沒有釋放連線。
第一次獲取,連線池為空,則新建連線
第二次獲取,連線池為空,繼續新建連線
第三次獲取,連線池為空,同時已有連線數>=maxConn,所以會阻塞等待釋放連線,但是因為沒有連線釋放,所以一直等待,直到3秒超時後退出。
所以第三次、第四次和第五次都是超時退出
5.2 釋放連線
如果我們釋放連線會怎麼樣,我們可以通過新啟一個協程用於釋放一個連線如下
package main import ( "context" custom_pool "go-demo/main/src/custom-pool" ) func main() { ctx := context.Background() config := &custom_pool.Config{ MaxConn: 2, MaxIdle: 1, } conn := custom_pool.Prepare(ctx, config) if _, err := conn.New(ctx); err != nil { return } if _, err := conn.New(ctx); err != nil { return } go conn.Release(ctx) if _, err := conn.New(ctx); err != nil { return } if _, err := conn.New(ctx); err != nil { return } if _, err := conn.New(ctx); err != nil { return } }
執行結果如下
log create conn!!!!! openCount: 1 freeConns: map[] log create conn!!!!! openCount: 2 freeConns: map[] log received released conn!!!!! openCount: 2 freeConns: map[] 超時,通知主執行緒退出 超時,通知主執行緒退出
前兩次和上面一樣,但是第三次獲取的時候,會收到一個釋放的連線,所以可以直接複用釋放的連線返回。
但是第四次和第五次建立,因為沒有釋放的連線,所以都會因為等待超時後退出。
5.3 使用連線池
上面的兩個case是在MaxConn=2,MaxIdle=1的情況下執行的。
下面我們看看如果基於以上兩個引數設定,模擬出正好使用連線池的情況。
package main import ( "context" custom_pool "go-demo/main/src/custom-pool" ) func main() { ctx := context.Background() config := &custom_pool.Config{ MaxConn: 2, MaxIdle: 1, } conn := custom_pool.Prepare(ctx, config) if _, err := conn.New(ctx); err != nil { return } go conn.Release(ctx) if _, err := conn.New(ctx); err != nil { return } go conn.Release(ctx) if _, err := conn.New(ctx); err != nil { return } go conn.Release(ctx) if _, err := conn.New(ctx); err != nil { return } go conn.Release(ctx) if _, err := conn.New(ctx); err != nil { return } }
即除了第一次,後面都會有連線釋放。
執行結果可能情況如下
log create conn!!!!! openCount: 1 freeConns: map[] log create conn!!!!! openCount: 2 freeConns: map[] log use free conn!!!!! openCount: 1 freeConns: map[] log use free conn!!!!! openCount: 0 freeConns: map[] log create conn!!!!! openCount: 1 freeConns: map[]
從執行結果可以看出,這裡有兩次使用了連線池中的連線。
注意:因為釋放是新啟協程執行,所以無法保證執行順序,不同的執行順序,會有不同的執行結果。上面只是執行結果的一種。
以上完整程式碼參見https://github.com/DMinerJackie/go-demo/tree/master/main/src/custom-pool
6、總結和展望
6.1 總結
- 通過手寫連線池加深對於連線池實現的理解
- 學會使用channel和協程
- 學會如何在channel阻塞指定時間後退出(設立超時時間)
- 學會對於共享資源加鎖,比如nextConnIndex的獲取和更新需要加鎖
6.2 展望
- Close和Ping沒有寫(實現不難)
- 連線池連線需要有存活時間,並在連線過期的時候從連線池刪除
- 實現使用的是普通的map集合,可以考慮併發安全的syncMap
- 程式碼實現比較簡陋不夠優雅,可以繼續完善保證職責單一
如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,並和您一起分享我日常閱讀過的優質文章。