玩轉redis-延時訊息佇列
阿新 • • 發佈:2020-04-14
上一篇基於redis
的list實現了一個簡單的訊息佇列:玩轉redis-簡單訊息佇列
原始碼地址 使用demo
產品經理經常說的一句話,我們不光要有X
功能,還要Y
功能,這樣客戶才能更滿意。同樣的,只有簡單訊息佇列是不夠的,還要有延時訊息佇列
才能算是一個完整的訊息佇列。
看看redis
的命令,放眼望去,的有序集合(sorted set)就是一個很好用的命令,完全可以用他做一個延時訊息佇列
redis有序集合(sorted set)
redis
有序集合,每個元素都會關聯一個double
型別的分數。redis
正是通過分數來為集合中的成員進行從小到大的排序。
有序集合的成員是唯一的,但分數(score
簡單操作
新增資料
127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3
讀取
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"
也可以把score
打出來
127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES 1) "b" 2) "1" 3) "a" 4) "5"
查出所有的資料
127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"
刪除資料
ZREMRANGEBYSCORE testSet1 0 2
延時佇列的實現思路
總體的思路很簡單,就是每一個value
的score
儲存的是時間,也就是說,在新增一個元素時他的score
是當前時間+延時的時間。輪循獲取資料時,查詢小於或等於當前時間的資料項,就是具體的延時訊息。
還有一個問題,就是
ZRANGEBYSCORE
和list
的pop
不同,pop
是取出元素並且會把元素在list
中刪除。ZRANGEBYSCORE
只會取出資料不會把資料從sorted set
中刪除。解決方法1,利用redis
的事務
,先ZRANGEBYSCORE
取出資料,然後再用ZREMRANGEBYSCORE
把資料刪除。
具體實現-code
新增延時訊息,引數delay
就是我們要延時多久:
func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
if delay <= 0 {
return errors.New("delay need great than zero")
}
tm := time.Now().Add(delay)
msg := NewMessage("", body)
msg.DelayTime = tm.Unix()
sendData, _ := json.Marshal(msg)
return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err()
}
使用,比如我們想過1秒再處理
producer.PublishDelayMsg(topicName, body, time.Second)
讀取訊息並處理
這就比較簡單了,就是在一個ticker
裡迴圈讀取小於或等於當前時間的資料:
func (s *consumer) startGetDelayMessage() {
go func() {
ticker := time.NewTicker(s.options.RateLimitPeriod)
defer func() {
log.Println("stop get delay message.")
ticker.Stop()
}()
topicName := s.topicName + zsetSuffix
for {
currentTime := time.Now().Unix()
select {
case <-s.ctx.Done():
log.Printf("context Done msg: %#v \n", s.ctx.Err())
return
case <-ticker.C:
var valuesCmd *redis.ZSliceCmd
_, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error {
valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime)
pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10))
return nil
})
if err != nil {
log.Printf("zset pip error: %#v \n", err)
continue
}
rev := valuesCmd.Val()
for _, revBody := range rev {
msg := &Message{}
json.Unmarshal([]byte(revBody.Member.(string)), msg)
if s.handler != nil {
s.handler.HandleMessage(msg)
}
}
}
}
}()
}