1. 程式人生 > 其它 >zset實現延時佇列

zset實現延時佇列

一、zset相關操作

zrangebyscore:從zset拿出區間在[n,m]內的元素值
zadd:往zset新增一個元素
zrem:刪除一個元素


多消費者非同步消費資料的時候,先從zset拿到資料,然後刪除資料,最後再去消費資料,這樣可以確保任務不被其他消費者消費到
package delayqueue

import (
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/garyburd/redigo/redis"

    "Project_A/json"
)
type redisClient struct {
    cli redis.Conn
}

func NewRedis() 
*redisClient{ conn,err:=redis.Dial("tcp","127.0.0.1:6379") if err!=nil{ log.Println("redis err:%v",err) panic(err) } return &redisClient{cli: conn} } type TaskMsg struct { Name string DelayTime int64 } func (c *redisClient)AddTask(msg *TaskMsg, dealTime int64) { key :
= "DelayQueue" data,err:=json.Marshal(msg) if err!=nil{ fmt.Errorf("AddTask err:%v",err) return } // 將延期的時間作為score,寫進zset ret,err:=c.cli.Do("zadd", key, dealTime,data ) if err!=nil{ fmt.Errorf("%v",err) return } if ret!=0{ fmt.Errorf(
"%v msg:%v",ret,msg) } } func (c *redisClient)DelTask(value string) int{ key := "DelayQueue" ret, err := redis.Int(c.cli.Do("zrem", key, value)) if err != nil { panic(err) } return ret } func (c *redisClient)GetTask() []string{ key := "DelayQueue" nowTime := time.Now().Unix() // zrangebyscore 從zset拿出score在[0,nowTime]的元素 ret, err := redis.Strings(c.cli.Do("zrangebyscore", key, 0, nowTime, "limit", 0, 1)) if err != nil { fmt.Errorf("GetTask err:%v",err) } return ret } func (c *redisClient)producer(count int) { for i:=0;i<count;i++ { dealTime := int64(rand.Intn(5)) + time.Now().Unix() taskName := fmt.Sprintf("task_%v",i) task:=&TaskMsg{ Name: taskName, DelayTime: dealTime, } c.AddTask(task,dealTime) fmt.Printf("%v\n",task) } } // 消費者 func (c *redisClient) consumer() { for { tasks := c.GetTask() if len(tasks) <= 0 { time.Sleep(time.Second * 1) continue } fmt.Printf("now time:%v,get task:%v",time.Now().Unix(),tasks[0]) // 如果當前搶redis佇列成功, if c.DelTask(tasks[0]) > 0 { task:=&TaskMsg{} err:=json.Unmarshal([]byte(tasks[0]),task) if err!=nil{ fmt.Errorf("unmarshal err:%v",err) return } handleMessage(task) } } } // 處理任務用函式 func handleMessage(msg *TaskMsg) { fmt.Printf("handleMessage: %s, now time:%v,require time: %d \n", msg.Name,time.Now().Unix(), msg.DelayTime) } func Test(){ //https://juejin.cn/post/6844904003810099208 svr:=NewRedis() svr.producer(10) svr.consumer() }