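An Analysis of a Small Pitfall with PubSub in redigo
Preface
Recently I have been doing some Redis work in Go, using the third-party library redigo. When I got to Pub/Sub, however, I ran into a small pitfall...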
Redis Client
First, let's initialize a Redis client backed by a connection pool:

```go
import (
	"context" // used by Subscribe below
	"fmt"     // used by Publish and Subscribe below
	"log"
	"time"

	"github.com/gomodule/redigo/redis"
)

type RedisClient struct {
	pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := &redis.Pool{
		MaxIdle:     10,                // keep at most 10 idle connections
		IdleTimeout: 300 * time.Second, // close connections idle longer than this
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		// Skip the PING round trip for connections used within the last minute.
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := &RedisClient{
		pool: pool,
	}
	return client
}
```
Publish
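Then we can implement a simple Publish method:

```go
func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close() // returns the connection to the pool
	// PUBLISH replies with the number of clients that received the message.
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}
```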
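Under the hood, c.Do("PUBLISH", channel, message) sends the command in the Redis serialization protocol (RESP) as an array of bulk strings, and the server answers with an integer reply holding the number of subscribers that received the message; that integer is what redis.Int turns into n. The sketch below shows only this wire framing using the standard library. encodeCommand and parseIntegerReply are hypothetical helpers, not redigo functions, and error handling is kept minimal.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// encodeCommand frames a command as a RESP array of bulk strings:
// "*<argc>\r\n", then "$<len>\r\n<arg>\r\n" for each argument.
func encodeCommand(args ...string) string {
	var b strings.Builder
	fmt.Fprintf(&b, "*%d\r\n", len(args))
	for _, a := range args {
		fmt.Fprintf(&b, "$%d\r\n%s\r\n", len(a), a)
	}
	return b.String()
}

// parseIntegerReply decodes a RESP integer reply such as ":2\r\n".
func parseIntegerReply(reply string) (int, error) {
	if !strings.HasPrefix(reply, ":") || !strings.HasSuffix(reply, "\r\n") {
		return 0, fmt.Errorf("not an integer reply: %q", reply)
	}
	return strconv.Atoi(strings.TrimSuffix(reply[1:], "\r\n"))
}

func main() {
	fmt.Printf("%q\n", encodeCommand("PUBLISH", "news", "hello"))
	n, err := parseIntegerReply(":2\r\n")
	fmt.Println(n, err)
}
```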
Subscribe
Next comes a somewhat more complex Subscribe method with a heartbeat:

```go
// ConsumeFunc handles one message from a subscribed channel.
// (Its definition is not shown in the original; this signature is assumed.)
type ConsumeFunc func(msg redis.Message) error

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()

	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)

	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}
}
```
Finally, we write a simple main function that calls Publish and Subscribe:
The Pitfall
At first glance nothing seems wrong. However, if we now look at Redis's TCP connections, something is clearly off:

```
$ sudo netstat -antp | grep redis
tcp   0   0 0.0.0.0:6379        0.0.0.0:*          LISTEN       940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55010   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55015   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55009   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55005   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55012   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55011   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55013   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55007   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55006   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:55014   ESTABLISHED  940/redis-server 0.
tcp   0   0 172.16.8.128:6379   172.16.8.1:54972   ESTABLISHED  940/redis-server 0.
```
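It turns out that every Subscribe call opens a new connection, and the connection pool appears to be doing nothing.
Debugging further, we found that execution hangs at defer psc.Close(), which means the 10 subscribing goroutines never actually exited.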
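Why does a hanging Close defeat the pool? A pool can only reuse connections that are handed back to it, and for a connection obtained from redis.Pool that hand-back happens inside its Close method. The stdlib-only model below (a hypothetical miniPool, far simpler than redis.Pool) counts how many connections must be dialed when ten callers each borrow one. Borrowers run one after another to keep the model deterministic: if nobody gives its connection back, every borrow dials a new one, which matches the pile of ESTABLISHED sockets above.

```go
package main

import "fmt"

// conn is a placeholder for a network connection.
type conn struct{ id int }

// miniPool is a deliberately tiny, single-goroutine pool model.
type miniPool struct {
	idle    []*conn
	maxIdle int
	dials   int
}

// Get reuses an idle connection if one exists, otherwise "dials" a new one.
func (p *miniPool) Get() *conn {
	if n := len(p.idle); n > 0 {
		c := p.idle[n-1]
		p.idle = p.idle[:n-1]
		return c
	}
	p.dials++
	return &conn{id: p.dials}
}

// Put hands a connection back; connections beyond maxIdle are dropped.
func (p *miniPool) Put(c *conn) {
	if len(p.idle) < p.maxIdle {
		p.idle = append(p.idle, c)
	}
}

// simulate runs n sequential borrowers; returned controls whether each one
// gives its connection back (a Close that hangs never does).
func simulate(n int, returned bool) int {
	p := &miniPool{maxIdle: 10}
	for i := 0; i < n; i++ {
		c := p.Get()
		if returned {
			p.Put(c)
		}
	}
	return p.dials
}

func main() {
	fmt.Println("dials when Close returns:", simulate(10, true))
	fmt.Println("dials when Close hangs:", simulate(10, false))
}
```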
Concurrent
After a long investigation, we finally pinned down the problem. Quoting the redigo documentation:
Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.
For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.
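In other words, one connection may have Receive() and Subscribe() (which calls Send and Flush) invoked concurrently from different goroutines, but no other concurrent operation, such as Close(), is allowed. As far as we can tell from redigo's pool code, closing a pooled connection that is still subscribed is not a plain socket close: Close sends UNSUBSCRIBE and PUNSUBSCRIBE, then calls Receive itself until it reads back an echoed sentinel value. Calling it from the main goroutine therefore creates a second concurrent Receive caller, and if the receiving goroutine consumes that sentinel first, Close waits forever.
For other similar problems, see the related issue.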
Fix
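Once the cause is known, the fix is simply to move defer psc.Close() from the top of Subscribe into the receiving goroutine:

```go
	// start a new goroutine to receive message
	go func() {
		// IMPORTANT! Close in the same goroutine that calls Receive,
		// so Close never runs concurrently with Receive.
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:
				// ...
```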
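The fix boils down to an ownership rule: the goroutine that calls Receive is the only one that calls Close. The stdlib-only model below makes the difference observable without Redis. fakeConn is hypothetical and only flags when Close runs while another goroutine is still blocked in Receive; it does not reproduce redigo's internals, where that overlap is what leaves Close waiting on a reply the other goroutine may already have consumed.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// fakeConn is a hypothetical stand-in for a pub/sub connection. It records
// whether Close ran while another goroutine was blocked inside Receive.
type fakeConn struct {
	replies   chan string
	entered   chan struct{} // signalled when a Receive starts waiting
	inReceive int32
	overlap   int32
}

func newFakeConn() *fakeConn {
	return &fakeConn{replies: make(chan string), entered: make(chan struct{}, 1)}
}

// Receive blocks until a reply arrives, tracking that it is in flight.
func (c *fakeConn) Receive() string {
	atomic.AddInt32(&c.inReceive, 1)
	defer atomic.AddInt32(&c.inReceive, -1)
	select {
	case c.entered <- struct{}{}:
	default:
	}
	return <-c.replies
}

// Close flags the forbidden case: Close concurrent with an in-flight Receive.
func (c *fakeConn) Close() {
	if atomic.LoadInt32(&c.inReceive) > 0 {
		atomic.StoreInt32(&c.overlap, 1)
	}
}

// run starts a reader that loops on Receive until it sees "unsubscribed".
// closeInReader picks where Close is called: in the reader (the fix) or from
// the calling goroutine while the reader is still blocked (the bug).
func run(closeInReader bool) bool {
	c := newFakeConn()
	done := make(chan struct{})
	go func() {
		defer close(done)
		if closeInReader {
			defer c.Close() // runs after the last Receive has returned
		}
		for c.Receive() != "unsubscribed" {
		}
	}()
	<-c.entered // the reader is now blocked inside Receive
	if !closeInReader {
		c.Close()
	}
	c.replies <- "unsubscribed" // ends the reader loop
	<-done
	return atomic.LoadInt32(&c.overlap) == 1
}

func main() {
	fmt.Println("Close from another goroutine overlaps Receive:", run(false))
	fmt.Println("Close in the reader goroutine overlaps Receive:", run(true))
}
```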
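Summary
A redigo connection, including one taken from a Pool, supports only one concurrent caller of Receive and one concurrent caller of Send/Flush. When Pub/Sub messages are read in a separate goroutine, call psc.Close() in that same goroutine once its receive loop ends; closing from another goroutine can hang, and the hung connection is never returned to the pool.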