golang基於redis lua封裝的優先順序去重佇列
阿新 • • 發佈:2022-05-04
前言:
前兩天由於某幾個廠商的api出問題,導致後臺任務大量堆積,又因為我這邊任務流系統會重試超時任務,所以導致佇列中有大量的重複任務。這時候我們要臨時解決兩個事情,一件事情,讓一些高質量的任務優先執行; 另一件事情, 要有去重。 rabbitmq不能很好的針對這類情況去重、分優先順序。
這時候我又想到了我最愛的redis… 去重? list + set 就可以解決, 優先順序,zset + zrange + zrem 也可以解決… 但問題這幾個命令非原子,那麼怎麼讓他們原子? 寫模組 or redis lua script . 首先在 redis 4.x 寫了個簡單的module,但寫完了發現一件頗為重要的事情,我們線上的是3.2 …. 然後又花了點時間改成redis lua的版本。專案本身的功能實現很簡單,複雜的是創意 !!!
專案名: redis_unique_queue
, 專案地址,https://github.com/rfyiamcool/redis_unique_queue
該文章後續會有更新, 原文地址, http://xiaorui.cc/?p=4828
主要功能介紹:
使用redis lua script 封裝的去重及優先順序佇列方法, 達到了組合命令的原子性和節省來往的io請求的目的.
去重佇列:
不僅能保證FIFO, 而且去重.
優先順序去重佇列:
按照優先順序獲取任務, 並且去重.
使用方法:
# xiaorui.cc # PriorityQueue NewPriorityQueue(priority int, unique bool, r *redis.Pool) Push(q string, body string, pri int) (int, error) Pop(q string) (resp string, err error) # UniqueQueue NewUniqueQueue(r *redis.Pool) *UniqueQueue UniquePush(q string, body string) (int, error) UniquePop(q string) (resp string, err error) more..
下面是優先順序去重佇列的例子:
package main // xiaorui.cc import ( "fmt" "github.com/rfyiamcool/redis_unique_queue" ) func main() { fmt.Println("start") redis_client_config := unique_queue.RedisConfType{ RedisPw: "", RedisHost: "127.0.0.1:6379", RedisDb: 0, RedisMaxActive: 100, RedisMaxIdle: 100, RedisIdleTimeOut: 1000, } redis_client := unique_queue.NewRedisPool(redis_client_config) qname := "xiaorui.cc" body := "message from xiaorui.cc" u := unique_queue.NewPriorityQueue(3, true, redis_client) // 3: 3個優先順序,從1-3級 // true: 開啟unique set u.Push(qname, body, 2) // 2, 優先順序 fmt.Println(u.Pop(qname)) }
單單使用 去重佇列的例子:
package main
import (
"fmt"
"github.com/rfyiamcool/redis_unique_queue"
)
func main() {
fmt.Println("start")
redis_client_config := unique_queue.RedisConfType{
RedisPw: "",
RedisHost: "127.0.0.1:6379",
RedisDb: 0,
RedisMaxActive: 100,
RedisMaxIdle: 100,
RedisIdleTimeOut: 1000,
}
redis_client := unique_queue.NewRedisPool(redis_client_config)
qname := "xiaorui.cc"
u := unique_queue.NewUniqueQueue(redis_client)
for i := 0; i < 100; i++ {
u.UniquePush(qname, "body...")
}
fmt.Println(u.Length(qname))
for i := 0; i < 100; i++ {
u.UniquePop(qname)
}
fmt.Println(u.Length(qname))
fmt.Println("end")
}
需要改進地址也是很多, 比如 加入批量操作, 對於redis連線池引入方法改進等.
END.