golang實現redis的延時訊息佇列功能
阿新 • • 發佈:2020-06-24
前言
在學習過程中發現redis的zset還可以用來實現輕量級的延時訊息佇列功能,雖然可靠性還有待提高,但是對於一些對資料可靠性要求不那麼高的功能要求完全可以實現。本次主要採用了redis中zset中的zadd,zrangebyscore 和 zdel來實現一個小demo。
提前準備 安裝redis, redis-go
因為用的是macOS, 直接
$ brew install redis
$ go get github.com/garyburd/redigo/redis
複製程式碼
又因為比較懶,生成任務的唯一id時,直接採用了bson中的objectId,所以:
$ go get gopkg.in/mgo.v2/bson
複製程式碼
唯一id不是必須有,但如果之後有實際應用需要攜帶,便於查詢相應任務。
生產者
通過一個for迴圈生成10w個任務,每一個任務有不同的時間
func producer() {
count := 0
//生成100000個任務
for count < 100000 {
count++
dealTime := int64(rand.Intn(5)) + time.Now().Unix()
uuid := bson.NewObjectId().Hex()
redis.Client.AddJob(&job.JobMessage{
Id: uuid,DealTime: dealTime,},+ int64 (dealTime))
}
}
複製程式碼
其中AddJob函式在另一個包中,將上一個函式中隨機生成的時間作為需要處理的時間戳.
// 新增任務
func (client *RedisClient) AddJob(msg *job.JobMessage,dealTime int64) {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
conn.Do("zadd",key,dealTime,util.JsonEncode(msg))
}
複製程式碼
消費者
消費者處理流程分為兩個步驟:
- 獲取小於等於當前時間戳的任務
- 通過刪除當前任務來判斷誰獲得了當前任務
因為在獲取小於等於當前時間戳的任務時,可能有多個go routine同時讀到了當前任務,而只有一個任務可以來處理當前任務。因此我們需要通過一個方案來判斷究竟由誰來處理這個任務(當然如果只有一個消費者可以讀到就直接處理):這個時候可以通過redis的刪除操作來獲取,因為刪除指定value時只有成功的操作才會返回不為0,所以我們可以認為刪除當前佇列成功的那個go routine拿到了當前的任務。
下面是程式碼:
// 消費者
func consumer() {
// 啟動10個go routine一起去拿
count := 0
for count < 10 {
go func() {
for {
jobs := redis.Client.GetJob()
if len(jobs) <= 0 {
time.Sleep(time.Second * 1)
continue
}
currentJob := jobs[0]
// 如果當前搶redis佇列成功,
if redis.Client.DelJob(currentJob) > 0 {
var jobMessage job.JobMessage
util.JsonDecode(currentJob,&jobMessage) //自定義的json解析函式
handleMessage(&jobMessage)
}
}
}()
count++
}
}
// 處理任務用函式
func handleMessage(msg *job.JobMessage) {
fmt.Printf("deal job: %s,require time: %d \n",msg.Id,msg.DealTime)
go func() {
countChan <- true
}()
}
複製程式碼
redis部分的程式碼,獲取任務和刪除任務
// 獲取任務
func (client *RedisClient) GetJob() []string {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
timeNow := time.Now().Unix()
ret,err := redis.Strings(conn.Do("zrangebyscore",0,timeNow,"limit",1))
if err != nil {
panic(err)
}
return ret
}
// 刪除當前任務, 用來判斷是否搶到了當前任務
func (client *RedisClient) DelJob(value string) int {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
ret,err := redis.Int(conn.Do("zrem",value))
if err != nil {
panic(err)
}
return ret
}
複製程式碼
程式碼大抵如此。最後跑起來之後,大概每3-4秒鐘能夠處理掉1w個任務,速度上確實是...