1. 程式人生 > 實用技巧 >12. Redis中的訊息佇列

12. Redis中的訊息佇列

楔子

Redis雖然是一個快取,但是它也可以作為一個訊息佇列。所以redis還是比較有野心的,本來在快取方面就已經把memcached給幹掉了,但還想在訊息佇列的方向上闖一闖。不過雖說Redis支援訊息佇列,但是它還是作為快取更加的專業,大公司很少有將redis作為訊息佇列來使用的,因此訊息佇列的話一般還是使用rabbitmq、activemq之類的會比較好。

釋出訂閱模式

在 Redis 中提供了專門的型別:Publisher(釋出者)和 Subscriber(訂閱者)來實現訊息佇列。

不過在介紹訊息佇列之前,先丟擲幾個概念,這樣理解下文會更加輕鬆一些,當然都是老生常談的內容了。

  • 釋出訊息的叫做釋出方或釋出者,也就是訊息的生產者。
  • 接收訊息的叫做訊息的訂閱方或訂閱者,也就是消費者,用來處理生產者釋出的訊息。

除了釋出和和訂閱者,在訊息佇列中還有一個重要的概念:channel,指的是管道,可以理解為某個訊息佇列的名稱。首先消費者先要訂閱某個 channel,然後當生產者把訊息傳送到這個 channel 中時,消費者就可以正常接收到訊息了,如下圖所示:

普通訂閱與釋出

訊息佇列有兩個重要的角色,一個是傳送者,另一個就是訂閱者,對應的命令如下:

  • 釋出訊息:publish channel "message"
  • 訂閱訊息:subscribe channel

下面我們來看具體的命令實現。

訂閱訊息

127.0.0.1:6379> subscribe channel1 channel2  # 可以同時訂閱多個頻道
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2

注意:當我們訂閱某個頻道的時候,就阻塞在這裡了。

就類似於微信公眾號一樣,你關注了某個公眾號,那麼當公眾號上面發表文章的時候,你就可以收到。此時操作公眾號的人就是訊息釋出者,你就是訊息訂閱者,公眾號就是訊息佇列,往公眾號上面發表的文章就是訊息。

傳送訊息

我們上面的訂閱者在訂閱之後,就處於阻塞狀態,因此我們需要再開一個終端。

127.0.0.1:6379> publish channel1 "mea: please please money"
(integer) 1
127.0.0.1:6379> publish channel2 "mea: please please money"
(integer) 1
127.0.0.1:6379> 

返回值表示成功傳送給了幾個訂閱方,所以這裡的 1 就表示成功發給了一個訂閱者,這個數字可以是 0~n,這是由訂閱者的數量決定的。如果有兩個訂閱者,那麼返回值就是2。

然後我們來看看訂閱者:

127.0.0.1:6379> subscribe channel1 channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2
1) "message"
2) "channel1"  # channel1 接收到訊息
3) "mea: please please money"
1) "message"
2) "channel2"  # channel2接收到訊息
3) "mea: please please money"

主題訂閱

主題訂閱說白了,和模糊匹配是類似的。假設我們需要訂閱好幾個訊息佇列,但它們都是以log開頭的,那麼我們就可以通過psubscribe log*來自動訂閱所有以log開頭的佇列。

比如我們上面的channel1、channel2,我們就可以通過psubscribe channel*實現,至於訊息釋出者則不需要變。

當然主題訂閱也可以是多個,比如:psubscribe log* db*,訂閱所有以log開頭、db開頭的訊息佇列。

Python操作Redis的釋出訂閱

# 訂閱者
import redis

client = redis.Redis(host="47.94.174.89", decode_responses="utf-8")

# 呼叫pubsub方法返回一個訂閱者
sub = client.pubsub()
# 訂閱兩個佇列
sub.subscribe("ch1", "ch2")
# 監聽,此時處於阻塞狀態
for item in sub.listen():
    # 一旦釋出者釋出訊息,這裡就可以接收到
    # item["channel"]是頻道,item["data"]是接收到了內容
    print(item["channel"], item["data"])
# 釋出者
import redis

client = redis.Redis(host="47.94.174.89", decode_responses="utf-8")
# 釋出者很簡單,直接釋出訊息介面
client.publish("ch1", "屑女僕1")
client.publish("ch1", "屑女僕2")
client.publish("ch2", "屑女僕3")

當執行釋出者的時候,會發現訂閱者多了幾條輸出,至於內容顯然是釋出者釋出的內容。

Python操作Redis,訂閱者還有幾種方式。

import redis

client = redis.Redis(host="47.94.174.89", decode_responses="utf-8")

sub = client.pubsub()
sub.subscribe("ch1", "ch2")

while True:
    # 這種方式會瞬間返回,如果有訊息得到訊息,沒有訊息會返回None
    item = sub.get_message()
    if item:
        print(item["channel"], item["data"])

或者開啟一個新的執行緒去監聽。

import redis

client = redis.Redis(host="47.94.174.89", decode_responses="utf-8")

sub = client.pubsub()
sub.subscribe("ch1", "ch2")

def handler(item):
    print(item["channel"], item["data"])

# 給每一個頻道註冊一個處理函式,當頻道有訊息時,會自動將訊息傳遞給處理函式
# 注意:上面的pubsub中訂閱的頻道都要有對應的處理函式
# 假設我們只給ch1註冊了處理函式,那麼執行的時候就會報錯:Channel: 'ch2' has no handler registered
sub.channels.update({"ch1": handler, "ch2": handler})
# 開啟一個執行緒執行,會返回新開啟的執行緒物件,注意:因為是單獨開了一個執行緒,所以這裡不會阻塞的,會直接往下走
th = sub.run_in_thread()
print("xxx")
print("yyy")
print("zzz")

# 先啟動訂閱者,再啟動釋出者,程式輸出如下
"""
xxx
yyy
zzz
ch1 屑女僕1
ch1 屑女僕2
ch2 屑女僕3
"""

# 注意:這裡程式依舊會卡住,因為開啟的執行緒是非守護執行緒
# 所以即便主執行緒執行完畢,也依舊會等待子執行緒
# 解決的辦法有兩種:
# 一種是在run_in_thread中加上一個引數daemon=True,設定為守護執行緒,這樣主執行緒就不會等待了
# 另一種是手動停止,我們說sub.run_in_thread會返回新開啟的執行緒,然後呼叫其stop方法即可
th.stop()  # 通過這種方式,我們可以在任意時刻停止監聽。

對於主題訂閱,釋出者程式碼不用變,只需要將訂閱的sub.subscribe換成sub.psubscribe即可。

import redis

client = redis.Redis(host="47.94.174.89", decode_responses="utf-8")

sub = client.pubsub()
sub.psubscribe("ch*")

def handler(item):
    print(item["channel"], item["data"])

# 對於開啟新的執行緒去監聽,要將之前的self.channels換成self.patterns
sub.patterns.update({"ch*": handler})
sub.run_in_thread()

取消訂閱

既然有訂閱,那麼就要取消訂閱,就類似於取關(o(╥﹏╥)o)

使用unsubscribe channel1 channel2可以取消訂閱多個channel,同理對於psubscribe ch*,也有punsubscribe ch*取消訂閱指定模式的頻道。比較簡單,不再贅述。

注意事項

釋出訂閱模式存在以下兩個缺點:

  • 無法持久化儲存訊息,如果 Redis 伺服器宕機或重啟,那麼所有的訊息將會丟失;
  • 釋出訂閱模式是"發後既忘"的工作模式,如果有訂閱者離線重連之後不能消費之前的歷史訊息。

然而這些缺點在 Redis 5.0 添加了 Stream 型別之後會被徹底的解決。

除了以上缺點外,釋出訂閱模式還有另一個需要注意問題:當消費端有一定的訊息積壓時,也就是生產者傳送的訊息,消費者消費不過來時,如果超過 32M 或者是 60s 內持續保持在 8M 以上,消費端會被強行斷開,這個引數是在配置檔案中設定的,預設值是 client-output-buffer-limit pubsub 32mb 8mb 60

小結

這一節介紹了訊息佇列的幾個名詞,生產者、消費者對應的就是訊息的傳送者和接收者,也介紹了釋出訂閱模式的幾個命令:

  • subscribe channel:普通訂閱
  • publish channel message:訊息推送
  • psubscribe pattern:主題訂閱
  • unsubscribe channel:取消普通訂閱
  • punsubscribe pattern:取消主題訂閱

使用它們之後就可以完成單個頻道和多個頻道的訊息收發,但傳送與訂閱模式也有一些缺點,比如“發後既忘”和不能持久化等問題,然而這些問題會等到 Stream 型別的出現而得到解決,關於更多 Stream 的內容後面文章會詳細介紹。

實現訊息佇列的其它方式

在 Redis 5.0 之前訊息佇列的實現方式有很多種,比較常見的除了我們上文介紹的釋出訂閱模式,還有兩種:List 和 ZSet 的實現方式。

List 和 ZSet 的方式解決了釋出訂閱模式不能持久化的問題,但這兩種方式也有自己的缺點,接下來我們一起來了解一下,先從 List 實現訊息佇列的方式說起。

List版訊息佇列

List 方式是實現訊息佇列最簡單和最直接的方式,它主要是通過 lpush 和 rpop 存入和讀取實現訊息佇列的,如下圖所示:

List 使用命令的方式實現訊息佇列:

127.0.0.1:6379> lpush channel message1
(integer) 1
127.0.0.1:6379> lpush channel message2
(integer) 2
127.0.0.1:6379> rpop channel
"message1"
127.0.0.1:6379> rpop channel
"message2"
127.0.0.1:6379> 

lpush用於生產訊息,rpop用於消費訊息。

然後我們使用Python來操作List模擬訊息佇列。

import threading
import redis

client = redis.Redis(host="47.94.174.89", decode_responses="utf-8")


def producer(messages: list):

    for message in messages:
        client.lpush("mq", message)
        print("生產者往佇列mq裡放入訊息:", message)


def consumer():
    while True:
        # brpop裡面可以傳入一個timeout,表示設定超時時間,預設為0,會一直阻塞
        print("消費者從佇列mq中消費了訊息:", client.brpop("mq"))


t1 = threading.Thread(target=producer, args=([f"message{_}" for _ in range(10)],))
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

for t in threading.enumerate():
    if t is not threading.main_thread():
        t.join()

"""
消費者從佇列mq中消費了訊息: ('mq', 'message0')
生產者往佇列mq裡放入訊息: message0
生產者往佇列mq裡放入訊息: message1
消費者從佇列mq中消費了訊息: ('mq', 'message1')
生產者往佇列mq裡放入訊息: message2
消費者從佇列mq中消費了訊息: ('mq', 'message2')
生產者往佇列mq裡放入訊息: message3
消費者從佇列mq中消費了訊息: ('mq', 'message3')
生產者往佇列mq裡放入訊息: message4
消費者從佇列mq中消費了訊息: ('mq', 'message4')
生產者往佇列mq裡放入訊息: message5
消費者從佇列mq中消費了訊息: ('mq', 'message5')
消費者從佇列mq中消費了訊息: ('mq', 'message6')
生產者往佇列mq裡放入訊息: message6
消費者從佇列mq中消費了訊息: ('mq', 'message7')
生產者往佇列mq裡放入訊息: message7
消費者從佇列mq中消費了訊息: ('mq', 'message8')
生產者往佇列mq裡放入訊息: message8
消費者從佇列mq中消費了訊息: ('mq', 'message9')
生產者往佇列mq裡放入訊息: message9
"""

我們看到使用List實現了一個類似於佇列的方式,但這顯然也是有其優缺點的

優點

  • 訊息可以被持久化,藉助 Redis 本身的持久化(AOF、RDB 或者是混合持久化),可以有效的儲存資料;
  • 消費者可以積壓訊息,不會因為客戶端的訊息過多而被強行斷開。

缺點

  • 訊息不能被重複消費,一個訊息消費完就會被刪除;
  • 沒有主題訂閱的功能。

ZSet 版訊息佇列

相比於之前的List 和釋出訂閱方式,ZSet 版訊息佇列在實現上要複雜一些,但 ZSet 因為多了一個 score(分值)屬性,從而使它具備更多的功能,例如我們可以用它來儲存時間戳,以此來實現延遲訊息佇列等。

它的實現思路和 List 相同也是利用 zadd 和 zrangebyscore 來實現存入和讀取,這裡就不重複敘述了,可以根據 List 的實現方式來實踐一下,看能不能實現相應的功能。如果寫不出來也沒關係,我們會在後面學習延遲佇列,到時候會用 ZSet 來實現。

優點

  • 支援訊息持久化;
  • 相比於 List 查詢更方便,ZSet 可以利用 score 屬性很方便的完成檢索,而 List 則需要遍歷整個元素才能檢索到某個值。

缺點

  • ZSet 不能儲存相同元素的值,也就是如果有訊息是重複的,那麼只能插入一條資訊在有序集合中;
  • ZSet 是根據 score 值排序的,不能像 List 一樣,按照插入順序來排序;
  • ZSet 沒有向 List 的 brpop 那樣的阻塞彈出的功能。

小結

這一節我們介紹了訊息佇列的另外兩種實現方式 List 和 ZSet,它們都是利用自身方法,先把資料放到佇列(自身的資料結構)裡,再使用無限迴圈讀取佇列中的訊息,以實現訊息佇列的功能,相比釋出訂閱模式,這兩種方式的優勢是支援持久化,當然它們各自都存在一些問題。

訊息佇列的終極解決方案--stream(上)

在 Redis 5.0 Stream 沒出來之前,訊息佇列的實現方式都有著各自的缺陷,例如:

  • 釋出訂閱模式 PubSub,不能持久化也就無法可靠的儲存訊息,並且對於離線重連的客戶端不能讀取歷史訊息的缺陷;
  • 列表實現訊息佇列的方式不能重複消費,一個訊息消費完就會被刪除;
  • 有序集合訊息佇列的實現方式不能儲存相同 value 的訊息,並且不能阻塞讀取訊息。

基礎使用

Stream 既然是一個數據型別,那麼和其他資料型別相似,它也有一些自己的操作方法,例如:

  • xadd:新增訊息;
  • xlen:查詢訊息的長度;
  • xdel:根據訊息ID刪除訊息;
  • del:刪除整個stream,當然del可以刪除任意的key;
  • xrange:讀取區間訊息;
  • xread:讀取某個訊息之後的訊息;

我們看一下具體如何操作。

新增訊息

語法:xadd key ID field1 string1 field2 string2······

127.0.0.1:6379> xadd my_stream * name mea age 19
"1594952987816-0"
127.0.0.1:6379> 

其中*表示Redis使用的規則:時間戳+序號的方式自動生成ID,當然你也可以指定自己的ID

查詢訊息的長度

語法:xlen key

127.0.0.1:6379> xlen my_stream
(integer) 1
127.0.0.1:6379> xadd my_stream * name hanser age 28  # 再新增一條
"1594953077142-0"
127.0.0.1:6379> xlen my_stream  # 長度變為2
(integer) 2
127.0.0.1:6379>

刪除訊息

語法:xdel key 訊息ID·····,可以同時刪除多個

127.0.0.1:6379> xlen my_stream
(integer) 2
127.0.0.1:6379> xdel my_stream 1594953077142-0
(integer) 1
127.0.0.1:6379> xlen my_stream
(integer) 1
127.0.0.1:6379>

刪除整個stream

直接使用del,它可以刪除任意多個任意的key

127.0.0.1:6379> del my_stream
(integer) 1
127.0.0.1:6379> 

查詢區間訊息

xrange key start end count n,這裡的start和end指的是訊息ID。

127.0.0.1:6379> # 新增幾條訊息
127.0.0.1:6379> xadd mq * name satori age 17
"1594953403230-0"
127.0.0.1:6379> xadd mq * name koishi age 16
"1594953410148-0"
127.0.0.1:6379> xadd mq * name scarlet age 400
"1594953422249-0"
127.0.0.1:6379> xadd mq * name morisa age unknow
"1594953438554-0"
127.0.0.1:6379> # 查詢
127.0.0.1:6379> xrange mq 1594953410148-0 1594953438554-0
1) 1) "1594953410148-0"
   2) 1) "name"
      2) "koishi"
      3) "age"
      4) "16"
2) 1) "1594953422249-0"
   2) 1) "name"
      2) "scarlet"
      3) "age"
      4) "400"
3) 1) "1594953438554-0"
   2) 1) "name"
      2) "morisa"
      3) "age"
      4) "unknow"
127.0.0.1:6379>
127.0.0.1:6379> # -表示第一條訊息、+表示最後一條訊息
127.0.0.1:6379> xrange mq - +
1) 1) "1594953403230-0"
   2) 1) "name"
      2) "satori"
      3) "age"
      4) "17"
2) 1) "1594953410148-0"
   2) 1) "name"
      2) "koishi"
      3) "age"
      4) "16"
3) 1) "1594953422249-0"
   2) 1) "name"
      2) "scarlet"
      3) "age"
      4) "400"
4) 1) "1594953438554-0"
   2) 1) "name"
      2) "morisa"
      3) "age"
      4) "unknow" 
127.0.0.1:6379> 
127.0.0.1:6379> # count n表示限定數量,這裡是返回兩條
127.0.0.1:6379> xrange mq - + count 2
1) 1) "1594953403230-0"
   2) 1) "name"
      2) "satori"
      3) "age"
      4) "17"
2) 1) "1594953410148-0"
   2) 1) "name"
      2) "koishi"
      3) "age"
      4) "16"
127.0.0.1:6379>       

雖然這裡查詢用的是訊息ID,但是也要像索引一樣注意先後關係。start對應的訊息要在end對應的訊息之前,類似於索引。

查詢某個訊息之後的訊息

語法:xread count n streams xxx MESSAGE_ID

從名為xxx的stream中,讀取訊息ID為MESSAGE_ID的後n條訊息

127.0.0.1:6379> xread count 2 streams mq 1594953410148-0
1) 1) "mq"
   2) 1) 1) "1594953422249-0"
         2) 1) "name"
            2) "scarlet"
            3) "age"
            4) "400"
      2) 1) "1594953438554-0"
         2) 1) "name"
            2) "morisa"
            3) "age"
            4) "unknow"
127.0.0.1:6379>
127.0.0.1:6379> # 該訊息後面只剩一條訊息了,所以即便count為2,所以也只返回了一條
127.0.0.1:6379> xread count 2 streams mq 1594953422249-0
1) 1) "mq"
   2) 1) 1) "1594953438554-0"
         2) 1) "name"
            2) "morisa"
            3) "age"
            4) "unknow"
127.0.0.1:6379>

並且該命令還提供了一個可以阻塞讀取的引數block,我們可以使用它讀取某條資料之後的新增資料。

比如:xread count 1 block streams mq $

$表示最後一條,此時程式會阻塞,會一直讀取最後一條資料之後的新增資料,既然阻塞,那麼肯定要開啟兩個終端才會看得到現象。

127.0.0.1:6379> xread count 1 block 0 streams mq $  # 程式就卡在了這裡

127.0.0.1:6379> xadd mq * name mea age 19  # 新開一個視窗,新增資料
"1594969025661-0"
127.0.0.1:6379> 
127.0.0.1:6379> xread count 1 block 0 streams mq $
1) 1) "mq"
   2) 1) 1) "1594969025661-0"
         2) 1) "name"
            2) "mea"
            3) "age"
            4) "19"
(44.51s)
127.0.0.1:6379>  # 此時接收到了新新增的資料,另外此時監聽也就結束了。

Python實現stream

from pprint import pprint
import redis

client = redis.Redis(host="47.94.174.89", decode_responses="utf-8")


def producer():
    id_lst = []
    for _ in [
        {"name": "mashiro", "age": 17},
        {"name": "satori", "age": 17},
        {"name": "koishi", "age": 17}]:
        id_lst.append(client.xadd("ch", _))
    return id_lst


def consumer():
    id_lst = producer()
    # 第二個引數和第三個引數預設是"-"和"+",也就是全部讀取
    # 當然我們也可以使用xread,具體引數可以看註釋
    msg = client.xrange("ch", id_lst[0], id_lst[-1])
    pprint(msg)


consumer()
"""
[('1594956361922-0', {'age': '17', 'name': 'mashiro'}),
 ('1594956361929-0', {'age': '17', 'name': 'satori'}),
 ('1594956361934-0', {'age': '17', 'name': 'koishi'})]
"""

訊息佇列的終極解決方案--stream(下)

下面我們使用訊息分組,不過在開始使用訊息分組之前,我們必須手動建立分組才行,以下是幾個和 Stream 分組有關的命令,我們先來學習一下它的使用。

訊息分組命令

建立消費者群組

語法:xgroup create <stream_key> <group_key> <ID>

127.0.0.1:6379> xgroup create mq group1 0-0
OK
127.0.0.1:6379>
  • mq:stream的key
  • group1:分組的名稱
  • 0-0:表示從第一條訊息開始讀取

如果從當前最後一條訊息向後讀取的話,那麼使用$即可。

127.0.0.1:6379> xgroup create mq group2 $
OK
127.0.0.1:6379> 

讀取訊息

語法:xreadgroup group <group_key> <consumer_key> [count n] streams <stream_key>

  • group_key:建立的分組名
  • consumer_key:消費者名,隨便指定即可
  • count n:每次讀取的數量,可選,不指定全部返回
  • stream_key:佇列名稱
127.0.0.1:6379> xreadgroup group group1 c1 count 1 streams mq >  # 結尾應該還有個>,表示讀取下一條訊息
1) 1) "mq"
   2) 1) 1) "1594953403230-0"
         2) 1) "name"
            2) "satori"
            3) "age"
            4) "17"
127.0.0.1:6379> 
127.0.0.1:6379> xreadgroup group group1 古明地覺 count 1 streams mq >  # 消費者名字隨便起
1) 1) "mq"
   2) 1) 1) "1594953410148-0"
         2) 1) "name"
            2) "koishi"
            3) "age"
            4) "16"
127.0.0.1:6379> 

這個引數類似於xread,也可以設定阻塞讀取。

127.0.0.1:6379> xreadgroup group group1 c2  streams mq >  # 不指定count,將訊息全部消費完
1) 1) "mq"
   2) 1) 1) "1594953422249-0"
         2) 1) "name"
            2) "scarlet"
            3) "age"
            4) "400"
      2) 1) "1594953438554-0"
         2) 1) "name"
            2) "morisa"
            3) "age"
            4) "unknow"
      3) 1) "1594969025661-0"
         2) 1) "name"
            2) "mea"
            3) "age"
            4) "19"
127.0.0.1:6379> xreadgroup group group1 c2 streams mq >  # 此時已經獲取不到訊息了
(nil)# 另外,我們這裡消費者數量是不受限制的,它們消費的都是同一個佇列裡面的資料
127.0.0.1:6379> xreadgroup group group1 c2 block 0  streams mq > # 開啟阻塞監聽狀態

127.0.0.1:6379> xadd mq * name nagisa age 21  # 在另一個終端中向mq中傳送一條資料
"1594970144178-0"
127.0.0.1:6379> 
127.0.0.1:6379> xreadgroup group group1 c2 block 0 streams mq >
1) 1) "mq"
   2) 1) 1) "1594970144178-0"
         2) 1) "name"
            2) "nagisa"
            3) "age"
            4) "21"
(119.02s)
127.0.0.1:6379>  # 我們看到這裡收到了資料,並且提示我們等待了119.02秒

訊息消費確認

一般訊息接收完了,我們會回覆一個確認資訊,告知我們消費完畢,命令:xack key group-key ID······

127.0.0.1:6379> xack mq group1 1594970144178-0
(integer) 1
127.0.0.1:6379> 

消費確認增加了訊息的可靠性,一般在業務處理完成之後,需要執行 ack 確認訊息已經被消費完成,整個流程的執行如下圖所示:

查詢未確認的消費佇列

127.0.0.1:6379> xpending mq group1
1) (integer) 5  # 未確認(ack)的訊息數量為 1 條
2) "1594953403230-0"
3) "1594969025661-0"
4) 1) 1) "c1"
      2) "1"
   2) 1) "c2"
      2) "3"
   3) 1) "\xe5\x8f\xa4\xe6\x98\x8e\xe5\x9c\xb0\xe8\xa7\x89"
      2) "1"
127.0.0.1:6379> 
127.0.0.1:6379> xack mq group1 1594953403230-0 1594969025661-0  # 確認兩條
(integer) 2
127.0.0.1:6379> xpending mq group1  # 還剩下三條
1) (integer) 3
2) "1594953410148-0"
3) "1594953438554-0"
4) 1) 1) "c2"
      2) "2"
   2) 1) "\xe5\x8f\xa4\xe6\x98\x8e\xe5\x9c\xb0\xe8\xa7\x89"
      2) "1"
127.0.0.1:6379>

xinfo 查詢相關命令

  • 1. 查詢流資訊:xinfo stream stream_key(佇列)
127.0.0.1:6379> xinfo stream mq
 1) "length"
 2) (integer) 6  # 佇列中有6個訊息
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1594970144178-0"
 9) "groups"  # 2個消費分組,我們上面的group1 group2
10) (integer) 2
11) "first-entry"
12) 1) "1594953403230-0"
    2) 1) "name"
       2) "satori"
       3) "age"
       4) "17"
13) "last-entry"
14) 1) "1594970144178-0"
    2) 1) "name"
       2) "nagisa"
       3) "age"
       4) "21"
127.0.0.1:6379> 
  • 查詢消費組訊息:xinfo groups stream_key
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
   2) "group1"  # 訊息分組名稱
   3) "consumers"
   4) (integer) 3  # 3個消費者
   5) "pending"
   6) (integer) 3  # 三個未確認的訊息
   7) "last-delivered-id"
   8) "1594970144178-0"
2) 1) "name"
   2) "group2"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1594953438554-0"
127.0.0.1:6379> 
  • 查詢消費組成員資訊:xinfo consumers stream_key group_key
127.0.0.1:6379> xinfo consumers mq group1
1) 1) "name"
   2) "c1"  # 消費者名稱
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 25214247
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 24613903
3) 1) "name"
   2) "\xe5\x8f\xa4\xe6\x98\x8e\xe5\x9c\xb0\xe8\xa7\x89"  # 中文名的消費者
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 25200911
127.0.0.1:6379> 
  • 刪除消費者:xgroup delconsumer stream-key group-key consumer-key
127.0.0.1:6379> xgroup delconsumer mq group1 c2
  • 刪除消費組:xgroup destroy stream-key group-key
127.0.0.1:6379> xgroup destroy mq group1
(integer) 1
127.0.0.1:6379> 

小結

感覺訊息佇列的話,釋出訂閱和主題訂閱不是很難,但是stream的命令有點頭疼,這裡介紹的不是很詳細。建議自己理解一下,並實際動手操作。