1. 程式人生 > >玩轉redis-簡單訊息佇列

玩轉redis-簡單訊息佇列

使用go語言基於redis寫了一個簡單的訊息佇列
原始碼地址
使用demo

redis的 list 非常的靈活,可以從左邊或者右邊新增元素,當然也以從任意一頭讀取資料

新增資料和獲取資料的操作也是非常簡單的
LPUSH 從左邊插入資料
RPUSH 大右邊插入資料
LPOP 從左邊取出一個數據
RPOP 從右邊取出一個數據

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 來讀取資料,不同之處是取資料時,如果沒有資料會等待指定的時間,
如果這期間有資料寫入,則會讀取並返回,沒有資料則會返回空
在一個視窗1讀取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一個視窗2寫入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再開一個視窗3讀取,第二次讀取時,list是空的,所以等待1秒後返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

簡單訊息佇列的實現

如果我們只從一邊新增元素,向另一邊取出元素,這就不是一個訊息佇列麼。但我估計你會有一個疑問,在消費資料時,同一個訊息會不會同時被多個consumer消費掉?

當然不會,因為redis是單執行緒的,在從list取資料時天然不會出現併發問題。但是這是一個簡單的訊息佇列,消費不成功怎麼處理還是需要我們自己寫程式碼來實現的

下面我說一下使用list實現一個簡單的訊息佇列的整體思路

comsumer的實現

consumer 主要做的就是從list裡讀取資料,使用LPOP或者BLPOP都可以,
這裡做了一個開關 optionsUseBLopp如果為true時會使用BLPOP

type consumer struct {
	once            sync.Once
	redisCmd        redis.Cmdable
	ctx             context.Context
	topicName       string
	handler         Handler
	rateLimitPeriod time.Duration
	options         ConsumerOptions
	_               struct{}
}

type ConsumerOptions struct {
	RateLimitPeriod time.Duration
	UseBLPop        bool
}

看一下建立consumer的程式碼,最後面的opts引數是可選的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
	consumer := &consumer{
		redisCmd:  redisCmd,
		ctx:       ctx,
		topicName: topicName,
	}
	for _, o := range opts {
		o(&consumer.options)
	}
	if consumer.options.RateLimitPeriod == 0 {
		consumer.options.RateLimitPeriod = time.Microsecond * 200
	}
	return consumer
}

讀取資料後具體怎麼進行處理呼叫者可以根據自己的業務邏輯進行相應處理
有一個小的interface呼叫者根據自己的邏輯去實現

type Handler interface {
	HandleMessage(msg *Message)
}

讀取資料的邏輯使用一個gorouting實現

func (s *consumer) startGetMessage() {
	go func() {
		ticker := time.NewTicker(s.options.RateLimitPeriod)
		defer func() {
			log.Println("stop get message.")
			ticker.Stop()
		}()
		for {
			select {
			case <-s.ctx.Done():
				log.Printf("context Done msg: %#v \n", s.ctx.Err())
				return
			case <-ticker.C:
				var revBody []byte
				var err error
				if !s.options.UseBLPop {
					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
				} else {
					revs := s.redisCmd.BLPop(time.Second, s.topicName)
					err = revs.Err()
					revValues := revs.Val()
					if len(revValues) >= 2 {
						revBody = []byte(revValues[1])
					}
				}
				if err == redis.Nil {
					continue
				}
				if err != nil {
					log.Printf("LPOP error: %#v \n", err)
					continue
				}

				if len(revBody) == 0 {
					continue
				}
				msg := &Message{}
				json.Unmarshal(revBody, msg)
				if s.handler != nil {
					s.handler.HandleMessage(msg)
				}
			}
		}
	}()
}

Producer 的實現

Producer還是很簡單的就是把資料推送到 reids

type Producer struct {
	redisCmd redis.Cmdable
	_        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
	return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
	msg := NewMessage("", body)
	sendData, _ := json.Marshal(msg)
	return p.redisCmd.RPush(topicName, string(sendData)).Err()
}