玩轉redis-簡單訊息佇列
阿新 • • 發佈:2020-04-08
使用go
語言基於redis
寫了一個簡單的訊息佇列
原始碼地址
使用demo
redis的 list
非常的靈活,可以從左邊或者右邊新增元素,當然也以從任意一頭讀取資料
新增資料和獲取資料的操作也是非常簡單的
LPUSH
從左邊插入資料
RPUSH
大右邊插入資料
LPOP
從左邊取出一個數據
RPOP
從右邊取出一個數據
127.0.0.1:6379> LPUSH list1 a (integer) 1 127.0.0.1:6379> RPUSH list1 b (integer) 2 127.0.0.1:6379> LPOP list1 "a" 127.0.0.1:6379> RPOP list1 "b"
或者使用 BLPOP
BRPOP
來讀取資料,不同之處是取資料時,如果沒有資料會等待指定的時間,
如果這期間有資料寫入,則會讀取並返回,沒有資料則會返回空
在一個視窗1
讀取
127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"
在另一個視窗2
寫入
127.0.0.1:6379> RPUSH list1 a b c
(integer) 3
再開一個視窗3
讀取,第二次讀取時,list
是空的,所以等待1秒後返回空。
127.0.0.1:6379> BRPOP list1 1 1) "list1" 2) "c" 127.0.0.1:6379> BRPOP list1 1 (nil) (1.04s)
簡單訊息佇列的實現
如果我們只從一邊新增元素,向另一邊取出元素,這就不是一個訊息佇列麼。但我估計你會有一個疑問,在消費資料時,同一個訊息會不會同時被多個consumer
消費掉?
當然不會,因為redis是單執行緒的,在從list
取資料時天然不會出現併發問題。但是這是一個簡單的訊息佇列,消費不成功怎麼處理還是需要我們自己寫程式碼來實現的
下面我說一下使用list
實現一個簡單的訊息佇列的整體思路
comsumer的實現
consumer
主要做的就是從list裡讀取資料,使用LPOP
或者BLPOP
都可以,
這裡做了一個開關 options
的UseBLopp
如果為true
時會使用BLPOP
。
type consumer struct {
once sync.Once
redisCmd redis.Cmdable
ctx context.Context
topicName string
handler Handler
rateLimitPeriod time.Duration
options ConsumerOptions
_ struct{}
}
type ConsumerOptions struct {
RateLimitPeriod time.Duration
UseBLPop bool
}
看一下建立consumer
的程式碼,最後面的opts
引數是可選的配置
type Consumer = *consumer
func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
consumer := &consumer{
redisCmd: redisCmd,
ctx: ctx,
topicName: topicName,
}
for _, o := range opts {
o(&consumer.options)
}
if consumer.options.RateLimitPeriod == 0 {
consumer.options.RateLimitPeriod = time.Microsecond * 200
}
return consumer
}
讀取資料後具體怎麼進行處理呼叫者可以根據自己的業務邏輯進行相應處理
有一個小的interface
呼叫者根據自己的邏輯去實現
type Handler interface {
HandleMessage(msg *Message)
}
讀取資料的邏輯使用一個gorouting實現
func (s *consumer) startGetMessage() {
go func() {
ticker := time.NewTicker(s.options.RateLimitPeriod)
defer func() {
log.Println("stop get message.")
ticker.Stop()
}()
for {
select {
case <-s.ctx.Done():
log.Printf("context Done msg: %#v \n", s.ctx.Err())
return
case <-ticker.C:
var revBody []byte
var err error
if !s.options.UseBLPop {
revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
} else {
revs := s.redisCmd.BLPop(time.Second, s.topicName)
err = revs.Err()
revValues := revs.Val()
if len(revValues) >= 2 {
revBody = []byte(revValues[1])
}
}
if err == redis.Nil {
continue
}
if err != nil {
log.Printf("LPOP error: %#v \n", err)
continue
}
if len(revBody) == 0 {
continue
}
msg := &Message{}
json.Unmarshal(revBody, msg)
if s.handler != nil {
s.handler.HandleMessage(msg)
}
}
}
}()
}
Producer 的實現
Producer
還是很簡單的就是把資料推送到 reids
type Producer struct {
redisCmd redis.Cmdable
_ struct{}
}
func NewProducer(cmd redis.Cmdable) *Producer {
return &Producer{redisCmd: cmd}
}
func (p *Producer) Publish(topicName string, body []byte) error {
msg := NewMessage("", body)
sendData, _ := json.Marshal(msg)
return p.redisCmd.RPush(topicName, string(sendData)).Err()
}