Redis(8)——釋出/訂閱與Stream
阿新 • • 發佈:2020-03-15
![](https://upload-images.jianshu.io/upload_images/7896890-31406a824536c54a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
# 一、Redis 中的釋出/訂閱功能
**釋出/ 訂閱系統** 是 Web 系統中比較常用的一個功能。簡單點說就是 **釋出者釋出訊息,訂閱者接受訊息**,這有點類似於我們的報紙/ 雜誌社之類的: *(借用前邊的一張圖)*
![](https://upload-images.jianshu.io/upload_images/7896890-13aa5cb2668368fe.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
- 圖片引用自:「訊息佇列」看過來! - [https://www.wmyskxz.com/2019/07/16/xiao-xi-dui-lie-kan-guo-lai/](https://www.wmyskxz.com/2019/07/16/xiao-xi-dui-lie-kan-guo-lai/)
從我們 *前面(下方相關閱讀)* 學習的知識來看,我們雖然可以使用一個 `list` 列表結構結合 `lpush` 和 `rpop` 來實現訊息佇列的功能,但是似乎很難實現實現 **訊息多播** 的功能:
![](https://upload-images.jianshu.io/upload_images/7896890-526a5b110a7c4ea2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
為了支援訊息多播,**Redis** 不能再依賴於那 5 種基礎的資料結構了,它單獨使用了一個模組來支援訊息多播,這個模組就是 **PubSub**,也就是 **PublisherSubscriber** *(釋出者/ 訂閱者模式)*。
## PubSub 簡介
我們從 *上面的圖* 中可以看到,基於 `list` 結構的訊息佇列,是一種 `Publisher` 與 `Consumer` 點對點的強關聯關係,**Redis** 為了消除這樣的強關聯,引入了另一種概念:**頻道** *(channel)*:
![](https://upload-images.jianshu.io/upload_images/7896890-cc3bb012eeca9fca.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
當 `Publisher` 往 `channel` 中釋出訊息時,關注了指定 `channel` 的 `Consumer` 就能夠同時受到訊息。但這裡的 **問題** 是,消費者訂閱一個頻道是必須 **明確指定頻道名稱** 的,這意味著,如果我們想要 **訂閱多個** 頻道,那麼就必須 **顯式地關注多個** 名稱。
為了簡化訂閱的繁瑣操作,**Redis** 提供了 **模式訂閱** 的功能 **Pattern Subscribe**,這樣就可以 **一次性關注多個頻道** 了,即使生產者新增了同模式的頻道,消費者也可以立即受到訊息:
![](https://upload-images.jianshu.io/upload_images/7896890-18ac258e4e9387da.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
例如上圖中,**所有** 位於圖片下方的 **`Consumer` 都能夠受到訊息**。
`Publisher` 往 `wmyskxz.chat` 這個 `channel` 中傳送了一條訊息,不僅僅關注了這個頻道的 `Consumer 1` 和 `Consumer 2` 能夠受到訊息,圖片中的兩個 `channel` 都和模式 `wmyskxz.*` 匹配,所以 **Redis** 此時會同樣傳送訊息給訂閱了 `wmyskxz.*` 這個模式的 `Consumer 3` 和關注了在這個模式下的另一個頻道 `wmyskxz.log` 下的 `Consumer 4` 和 `Consumer 5`。
另一方面,如果接收訊息的頻道是 `wmyskxz.chat`,那麼 `Consumer 3` 也會受到訊息。
## 快速體驗
在 **Redis** 中,**PubSub** 模組的使用非常簡單,常用的命令也就下面這麼幾條:
```bash
# 訂閱頻道:
SUBSCRIBE channel [channel ....] # 訂閱給定的一個或多個頻道的資訊
PSUBSCRIBE pattern [pattern ....] # 訂閱一個或多個符合給定模式的頻道
# 釋出頻道:
PUBLISH channel message # 將訊息傳送到指定的頻道
# 退訂頻道:
UNSUBSCRIBE [channel [channel ....]] # 退訂指定的頻道
PUNSUBSCRIBE [pattern [pattern ....]] #退訂所有給定模式的頻道
```
我們可以在本地快速地來體驗一下 **PubSub**:
![](https://upload-images.jianshu.io/upload_images/7896890-518e0d1e93135775.gif?imageMogr2/auto-orient/strip)
具體步驟如下:
1. 開啟本地 Redis 服務,新建兩個控制檯視窗;
2. 在其中一個視窗輸入 `SUBSCRIBE wmyskxz.chat` 關注 `wmyskxz.chat` 頻道,讓這個視窗成為 **消費者**。
3. 在另一個視窗輸入 `PUBLISH wmyskxz.chat 'message'` 往這個頻道傳送訊息,這個時候就會看到 **另一個視窗實時地出現** 了傳送的測試訊息。
## 實現原理
可以看到,我們通過很簡單的兩條命令,幾乎就可以簡單使用這樣的一個 **釋出/ 訂閱系統** 了,但是具體是怎麼樣實現的呢?
**每個 Redis 伺服器程序維持著一個標識伺服器狀態** 的 `redis.h/redisServer` 結構,其中就 **儲存著有訂閱的頻道** 以及 **訂閱模式** 的資訊:
```c
struct redisServer {
// ...
dict *pubsub_channels; // 訂閱頻道
list *pubsub_patterns; // 訂閱模式
// ...
};
```
### 訂閱頻道原理
當客戶端訂閱某一個頻道之後,Redis 就會往 `pubsub_channels` 這個字典中新新增一條資料,實際上這個 `dict` 字典維護的是一張連結串列,比如,下圖展示的 `pubsub_channels` 示例中,`client 1`、`client 2` 就訂閱了 `channel 1`,而其他頻道也分別被其他客戶端訂閱:
![](https://upload-images.jianshu.io/upload_images/7896890-218fc15f7c368eee.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
#### SUBSCRIBE 命令
`SUBSCRIBE` 命令的行為可以用下列的偽程式碼表示:
```python
def SUBSCRIBE(client, channels):
# 遍歷所有輸入頻道
for channel in channels:
# 將客戶端新增到連結串列的末尾
redisServer.pubsub_channels[channel].append(client)
```
通過 `pubsub_channels` 字典,程式只要檢查某個頻道是否為字典的鍵,就可以知道該頻道是否正在被客戶端訂閱;只要取出某個鍵的值,就可以得到所有訂閱該頻道的客戶端的資訊。
#### PUBLISH 命令
瞭解 `SUBSCRIBE`,那麼 `PUBLISH` 命令的實現也變得十分簡單了,只需要通過上述字典定位到具體的客戶端,再把訊息傳送給它們就好了:*(虛擬碼實現如下)*
```python
def PUBLISH(channel, message):
# 遍歷所有訂閱頻道 channel 的客戶端
for client in server.pubsub_channels[channel]:
# 將資訊傳送給它們
send_message(client, message)
```
#### UNSUBSCRIBE 命令
使用 `UNSUBSCRIBE` 命令可以退訂指定的頻道,這個命令執行的是訂閱的反操作:它從 `pubsub_channels` 字典的給定頻道(鍵)中,刪除關於當前客戶端的資訊,這樣被退訂頻道的資訊就不會再發送給這個客戶端。
### 訂閱模式原理
![](https://upload-images.jianshu.io/upload_images/7896890-18ac258e4e9387da.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
正如我們上面說到了,當傳送一條訊息到 `wmyskxz.chat` 這個頻道時,Redis 不僅僅會發送到當前的頻道,還會發送到匹配於當前模式的所有頻道,實際上,`pubsub_patterns` 背後還維護了一個 `redis.h/pubsubPattern` 結構:
```c
typedef struct pubsubPattern {
redisClient *client; // 訂閱模式的客戶端
robj *pattern; // 訂閱的模式
} pubsubPattern;
```
每當呼叫 `PSUBSCRIBE` 命令訂閱一個模式時,程式就建立一個包含客戶端資訊和被訂閱模式的 `pubsubPattern` 結構,並將該結構新增到 `redisServer.pubsub_patterns` 連結串列中。
我們來看一個 `pusub_patterns` 連結串列的示例:
![](https://upload-images.jianshu.io/upload_images/7896890-d0d3b1849fdb6162.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
這個時候客戶端 `client 3` 執行 `PSUBSCRIBE wmyskxz.java.*`,那麼 `pubsub_patterns` 連結串列就會被更新成這樣:
![](https://upload-images.jianshu.io/upload_images/7896890-edbf11995590de50.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
通過遍歷整個 `pubsub_patterns` 連結串列,程式可以檢查所有正在被訂閱的模式,以及訂閱這些模式的客戶端。
#### PUBLISH 命令
上面給出的虛擬碼並沒有 **完整描述** `PUBLISH` 命令的行為,因為 `PUBLISH` 除了將 `message` 傳送到 **所有訂閱 `channel` 的客戶端** 之外,它還會將 `channel` 和 `pubsub_patterns` 中的 **模式** 進行對比,如果 `channel` 和某個模式匹配的話,那麼也將 `message` 傳送到 **訂閱那個模式的客戶端**。
完整描述 `PUBLISH` 功能的虛擬碼定於如下:
```python
def PUBLISH(channel, message):
# 遍歷所有訂閱頻道 channel 的客戶端
for client in server.pubsub_channels[channel]:
# 將資訊傳送給它們
send_message(client, message)
# 取出所有模式,以及訂閱模式的客戶端
for pattern, client in server.pubsub_patterns:
# 如果 channel 和模式匹配
if match(channel, pattern):
# 那麼也將資訊發給訂閱這個模式的客戶端
send_message(client, message)
```
#### PUNSUBSCRIBE 命令
使用 `PUNSUBSCRIBE` 命令可以退訂指定的模式,這個命令執行的是訂閱模式的反操作:序會刪除 `redisServer.pubsub_patterns` 連結串列中,所有和被退訂模式相關聯的 `pubsubPattern` 結構,這樣客戶端就不會再收到和模式相匹配的頻道發來的資訊。
## PubSub 的缺點
儘管 **Redis** 實現了 **PubSub** 模式來達到了 **多播訊息佇列** 的目的,但在實際的訊息佇列的領域,幾乎 **找不到特別合適的場景**,因為它的缺點十分明顯:
- **沒有 Ack 機制,也不保證資料的連續:** PubSub 的生產者傳遞過來一個訊息,Redis 會直接找到相應的消費者傳遞過去。如果沒有一個消費者,那麼訊息會被直接丟棄。如果開始有三個消費者,其中一個突然掛掉了,過了一會兒等它再重連時,那麼重連期間的訊息對於這個消費者來說就徹底丟失了。
- **不持久化訊息:** 如果 Redis 停機重啟,PubSub 的訊息是不會持久化的,畢竟 Redis 宕機就相當於一個消費者都沒有,所有的訊息都會被直接丟棄。
基於上述缺點,Redis 的作者甚至單獨開啟了一個 Disque 的專案來專門用來做多播訊息佇列,不過該專案目前好像都沒有成熟。不過後來在 2018 年 6 月,**Redis 5.0** 新增了 `Stream` 資料結構,這個功能給 Redis 帶來了 **持久化訊息佇列**,從此 PubSub 作為訊息佇列的功能可以說是就消失了..
![](http://ww1.sinaimg.cn/bmiddle/006APoFYjw1fbkgv6dh18g303r041744.gif)
# 二、更為強大的 Stream | 持久化的釋出/訂閱系統
**Redis Stream** 從概念上來說,就像是一個 **僅追加內容** 的 **訊息連結串列**,把所有加入的訊息都一個一個串起來,每個訊息都有一個唯一的 ID 和內容,這很簡單,讓它複雜的是從 Kafka 借鑑的另一種概念:**消費者組(Consumer Group)** *(思路一致,實現不同)*:
![](https://upload-images.jianshu.io/upload_images/7896890-b9d8afde068a165f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
上圖就展示了一個典型的 **Stream** 結構。每個 Stream 都有唯一的名稱,它就是 Redis 的 `key`,在我們首次使用 `xadd` 指令追加訊息時自動建立。我們對圖中的一些概念做一下解釋:
- **Consumer Group**:消費者組,可以簡單看成記錄流狀態的一種資料結構。消費者既可以選擇使用 `XREAD` 命令進行 **獨立消費**,也可以多個消費者同時加入一個消費者組進行 **組內消費**。同一個消費者組內的消費者共享所有的 Stream 資訊,**同一條訊息只會有一個消費者消費到**,這樣就可以應用在分散式的應用場景中來保證訊息的唯一性。
- **last_delivered_id**:用來表示消費者組消費在 Stream 上 **消費位置** 的遊標資訊。每個消費者組都有一個 Stream 內 **唯一的名稱**,消費者組不會自動建立,需要使用 `XGROUP CREATE` 指令來顯式建立,並且需要指定從哪一個訊息 ID 開始消費,用來初始化 `last_delivered_id` 這個變數。
- **pending_ids**:每個消費者內部都有的一個狀態變數,用來表示 **已經** 被客戶端 **獲取**,但是 **還沒有 ack** 的訊息。記錄的目的是為了 **保證客戶端至少消費了訊息一次**,而不會在網路傳輸的中途丟失而沒有對訊息進行處理。如果客戶端沒有 ack,那麼這個變數裡面的訊息 ID 就會越來越多,一旦某個訊息被 ack,它就會對應開始減少。這個變數也被 Redis 官方稱為 **PEL** *(Pending Entries List)*。
## 訊息 ID 和訊息內容
#### 訊息 ID
訊息 ID 如果是由 `XADD` 命令返回自動建立的話,那麼它的格式會像這樣:`timestampInMillis-sequence` *(毫秒時間戳-序列號)*,例如 `1527846880585-5`,它表示當前的訊息是在毫秒時間戳 `1527846880585` 時產生的,並且是該毫秒內產生的第 5 條訊息。
這些 ID 的格式看起來有一些奇怪,**為什麼要使用時間來當做 ID 的一部分呢?** 一方面,我們要 **滿足 ID 自增** 的屬性,另一方面,也是為了 **支援範圍查詢** 的功能。由於 ID 和生成訊息的時間有關,這樣就使得在根據時間範圍內查詢時基本上是沒有額外損耗的。
當然訊息 ID 也可以由客戶端自定義,但是形式必須是 **"整數-整數"**,而且後面加入的訊息的 ID 必須要大於前面的訊息 ID。
#### 訊息內容
訊息內容就是普通的鍵值對,形如 hash 結構的鍵值對。
## 增刪改查示例
增刪改查命令很簡單,詳情如下:
1. `xadd`:追加訊息
2. `xdel`:刪除訊息,這裡的刪除僅僅是設定了標誌位,不影響訊息總長度
3. `xrange`:獲取訊息列表,會自動過濾已經刪除的訊息
4. `xlen`:訊息長度
5. `del`:刪除Stream
使用示例:
```bash
# *號表示伺服器自動生成ID,後面順序跟著一堆key/value
127.0.0.1:6379> xadd codehole * name laoqian age 30 # 名字叫laoqian,年齡30歲
1527849609889-0 # 生成的訊息ID
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
127.0.0.1:6379> xrange codehole - + # -表示最小值, +表示最大值
1) 1) 1527849609889-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
3) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小訊息ID的列表
1) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
2) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> xrange codehole - 1527849629172-0 # 指定最大訊息ID的列表
1) 1) 1527849609889-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
127.0.0.1:6379> xlen codehole # 長度不受影響
(integer) 3
127.0.0.1:6379> xrange codehole - + # 被刪除的訊息沒了
1) 1) 1527849629172-0
2) 1) "name"
2) "xiaoyu"
3) "age"
4) "29"
2) 1) 1527849637634-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> del codehole # 刪除整個Stream
(integer) 1
```
## 獨立消費示例
我們可以在不定義消費組的情況下進行 Stream 訊息的 **獨立消費**,當 Stream 沒有新訊息時,甚至可以阻塞等待。Redis 設計了一個單獨的消費指令 `xread`,可以將 Stream 當成普通的訊息佇列(list)來使用。使用 `xread` 時,我們可以完全忽略 **消費組(Consumer Group)** 的存在,就好比 Stream 就是一個普通的列表(list):
```bash
# 從Stream頭部讀取兩條訊息
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
2) 1) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
2) 1) 1527851493405-0
2) 1) "name"
2) "yurui"
3) "age"
4) "29"
# 從Stream尾部讀取一條訊息,毫無疑問,這裡不會返回任何訊息
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# 從尾部阻塞等待新訊息到來,下面的指令會堵住,直到新訊息到來
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# 我們從新開啟一個視窗,在這個視窗往Stream裡塞訊息
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# 再切換到前面的視窗,我們可以看到阻塞解除了,返回了新的訊息內容
# 而且還顯示了一個等待時間,這裡我們等待了93s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
2) 1) 1) 1527852774092-0
2) 1) "name"
2) "youming"
3) "age"
4) "60"
(93.11s)
```
客戶端如果想要使用 `xread` 進行 **順序消費**,一定要 **記住當前消費** 到哪裡了,也就是返回的訊息 ID。下次繼續呼叫 `xread` 時,將上次返回的最後一個訊息 ID 作為引數傳遞進去,就可以繼續消費後續的訊息。
`block 0` 表示永遠阻塞,直到訊息到來,`block 1000` 表示阻塞 `1s`,如果 `1s` 內沒有任何訊息到來,就返回 `nil`:
```bash
127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)
```
## 建立消費者示例
Stream 通過 `xgroup create` 指令建立消費組(Consumer Group),需要傳遞起始訊息 ID 引數用來初始化 `last_delivered_id` 變數:
```bash
127.0.0.1:6379> xgroup create codehole cg1 0-0 # 表示從頭開始消費
OK
# $表示從尾部開始消費,只接受新訊息,當前Stream訊息會全部忽略
127.0.0.1:6379> xgroup create codehole cg2 $
OK
127.0.0.1:6379> xinfo codehole # 獲取Stream資訊
1) length
2) (integer) 3 # 共3個訊息
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2 # 兩個消費組
9) first-entry # 第一個訊息
10) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
11) last-entry # 最後一個訊息
12) 1) 1527851498956-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
127.0.0.1:6379> xinfo groups codehole # 獲取Stream的消費組資訊
1) 1) name
2) "cg1"
3) consumers
4) (integer) 0 # 該消費組還沒有消費者
5) pending
6) (integer) 0 # 該消費組沒有正在處理的訊息
2) 1) name
2) "cg2"
3) consumers # 該消費組還沒有消費者
4) (integer) 0
5) pending
6) (integer) 0 # 該消費組沒有正在處理的訊息
```
## 組內消費示例
Stream 提供了 `xreadgroup` 指令可以進行消費組的組內消費,需要提供 **消費組名稱、消費者名稱和起始訊息 ID**。它同 `xread` 一樣,也可以阻塞等待新訊息。讀到新訊息後,對應的訊息 ID 就會進入消費者的 **PEL** *(正在處理的訊息)* 結構裡,客戶端處理完畢後使用 `xack` 指令 **通知伺服器**,本條訊息已經處理完畢,該訊息 ID 就會從 **PEL** 中移除,下面是示例:
```bash
# >號表示從當前消費組的last_delivered_id後面開始讀
# 每當消費者讀取一條訊息,last_delivered_id變數就會前進
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851486781-0
2) 1) "name"
2) "laoqian"
3) "age"
4) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851493405-0
2) 1) "name"
2) "yurui"
3) "age"
4) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527851498956-0
2) 1) "name"
2) "xiaoqian"
3) "age"
4) "1"
2) 1) 1527852774092-0
2) 1) "name"
2) "youming"
3) "age"
4) "60"
# 再繼續讀取,就沒有新訊息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# 那就阻塞等待吧
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# 開啟另一個視窗,往裡塞訊息
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# 回到前一個視窗,發現阻塞解除,收到新訊息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"
2) 1) 1) 1527854062442-0
2) 1) "name"
2) "lanying"
3) "age"
4) "61"
(36.54s)
127.0.0.1:6379> xinfo groups codehole # 觀察消費組資訊
1) 1) name
2) "cg1"
3) consumers
4) (integer) 1 # 一個消費者
5) pending
6) (integer) 5 # 共5條正在處理的資訊還有沒有ack
2) 1) name
2) "cg2"
3) consumers
4) (integer) 0 # 消費組cg2沒有任何變化,因為前面我們一直在操縱cg1
5) pending
6) (integer) 0
# 如果同一個消費組有多個消費者,我們可以通過xinfo consumers指令觀察每個消費者的狀態
127.0.0.1:6379> xinfo consumers codehole cg1 # 目前還有1個消費者
1) 1) name
2) "c1"
3) pending
4) (integer) 5 # 共5條待處理訊息
5) idle
6) (integer) 418715 # 空閒了多長時間ms沒有讀取訊息了
# 接下來我們ack一條訊息
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
2) "c1"
3) pending
4) (integer) 4 # 變成了5條
5) idle
6) (integer) 668504
# 下面ack所有訊息
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
2) "c1"
3) pending
4) (integer) 0 # pel空了
5) idle
6) (integer) 745505
```
## QA 1:Stream 訊息太多怎麼辦? | Stream 的上限
很容易想到,要是訊息積累太多,Stream 的連結串列豈不是很長,內容會不會爆掉就是個問題了。`xdel` 指令又不會刪除訊息,它只是給訊息做了個標誌位。
Redis 自然考慮到了這一點,所以它提供了一個定長 Stream 功能。在 `xadd` 的指令提供一個定長長度 `maxlen`,就可以將老的訊息幹掉,確保最多不超過指定長度,使用起來也很簡單:
```bash
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
```
如果使用 `MAXLEN` 選項,當 Stream 的達到指定長度後,老的訊息會自動被淘汰掉,因此 Stream 的大小是恆定的。目前還沒有選項讓 Stream 只保留給定數量的條目,因為為了一致地執行,這樣的命令必須在很長一段時間內阻塞以淘汰訊息。*(例如在新增資料的高峰期間,你不得不長暫停來淘汰舊訊息和新增新的訊息)*
另外使用 `MAXLEN` 選項的花銷是很大的,Stream 為了節省記憶體空間,採用了一種特殊的結構表示,而這種結構的調整是需要額外的花銷的。所以我們可以使用一種帶有 `~` 的特殊命令:
```bash
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
```
它會基於當前的結構合理地對節點執行裁剪,來保證至少會有 `1000` 條資料,可能是 `1010` 也可能是 `1030`。
## QA 2:PEL 是如何避免訊息丟失的?
在客戶端消費者讀取 Stream 訊息時,Redis 伺服器將訊息回覆給客戶端的過程中,客戶端突然斷開了連線,訊息就丟失了。但是 PEL 裡已經儲存了發出去的訊息 ID,待客戶端重新連上之後,可以再次收到 PEL 中的訊息 ID 列表。不過此時 `xreadgroup` 的起始訊息 ID 不能為引數 `>` ,而必須是任意有效的訊息 ID,一般將引數設為 `0-0`,表示讀取所有的 PEL 訊息以及自 `last_delivered_id` 之後的新訊息。
## Redis Stream Vs Kafka
Redis 基於記憶體儲存,這意味著它會比基於磁碟的 Kafka 快上一些,也意味著使用 Redis 我們 **不能長時間儲存大量資料**。不過如果您想以 **最小延遲** 實時處理訊息的話,您可以考慮 Redis,但是如果 **訊息很大並且應該重用資料** 的話,則應該首先考慮使用 Kafka。
另外從某些角度來說,`Redis Stream` 也更適用於小型、廉價的應用程式,因為 `Kafka` 相對來說更難配置一些。
# 相關閱讀
1. Redis(1)——5種基本資料結構 - [https://www.wmyskxz.com/2020/02/28/redis-1-5-chong-ji-ben-shu-ju-jie-gou/](https://www.wmyskxz.com/2020/02/28/redis-1-5-chong-ji-ben-shu-ju-jie-gou/)
2. Redis(2)——跳躍表 - [https://www.wmyskxz.com/2020/02/29/redis-2-tiao-yue-biao/](https://www.wmyskxz.com/2020/02/29/redis-2-tiao-yue-biao/)
3. Redis(3)——分散式鎖深入探究 - [https://www.wmyskxz.com/2020/03/01/redis-3/](https://www.wmyskxz.com/2020/03/01/redis-3/)
4. Reids(4)——神奇的HyperLoglog解決統計問題 - [https://www.wmyskxz.com/2020/03/02/reids-4-shen-qi-de-hyperloglog-jie-jue-tong-ji-wen-ti/](https://www.wmyskxz.com/2020/03/02/reids-4-shen-qi-de-hyperloglog-jie-jue-tong-ji-wen-ti/)
5. Redis(5)——億級資料過濾和布隆過濾器 - [https://www.wmyskxz.com/2020/03/11/redis-5-yi-ji-shu-ju-guo-lu-he-bu-long-guo-lu-qi/](https://www.wmyskxz.com/2020/03/11/redis-5-yi-ji-shu-ju-guo-lu-he-bu-long-guo-lu-qi/)
6. Redis(6)——GeoHash查詢附近的人[https://www.wmyskxz.com/2020/03/12/redis-6-geohash-cha-zhao-fu-jin-de-ren/](https://www.wmyskxz.com/2020/03/12/redis-6-geohash-cha-zhao-fu-jin-de-ren/)
7. Redis(7)——持久化【一文了解】 - [https://www.wmyskxz.com/2020/03/13/redis-7-chi-jiu-hua-yi-wen-liao-jie/](https://www.wmyskxz.com/2020/03/13/redis-7-chi-jiu-hua-yi-wen-liao-jie/)
# 參考資料
1. 訂閱與釋出——Redis 設計與實現 - [https://redisbook.readthedocs.io/en/latest/feature/pubsub.html](https://redisbook.readthedocs.io/en/latest/feature/pubsub.html)
2. 《Redis 深度歷險》 - 錢文品/ 著 - [https://book.douban.com/subject/30386804/](https://book.douban.com/subject/30386804/)
3. Introduction to Redis Streams【官方文件】 - [https://redis.io/topics/streams-intro](https://redis.io/topics/streams-intro)
4. Kafka vs. Redis: Log Aggregation Capabilities and Performance - [https://logz.io/blog/kafka-vs-redis/](https://logz.io/blog/kafka-vs-redis/)
> - 本文已收錄至我的 Github 程式設計師成長系列 **【More Than Java】,學習,不止 Code,歡迎 star:[https://github.com/wmyskxz/MoreThanJava](https://github.com/wmyskxz/MoreThanJava)**
> - **個人公眾號** :wmyskxz,**個人獨立域名部落格**:wmyskxz.com,堅持原創輸出,下方掃碼關注,2020,與您共同成長!
![](https://upload-images.jianshu.io/upload_images/7896890-fca34cfd601e7449.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
非常感謝各位人才能 **看到這裡**,如果覺得本篇文章寫得不錯,覺得 **「我沒有三顆心臟」有點東西** 的話,**求點贊,求關注,求分享,求留言!**
創作不易,各位的支援和認可,就是我創作的最大動力,我們下篇文