1. 程式人生 > >Redis(8)——釋出/訂閱與Stream

Redis(8)——釋出/訂閱與Stream

![](https://upload-images.jianshu.io/upload_images/7896890-31406a824536c54a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) # 一、Redis 中的釋出/訂閱功能 **釋出/ 訂閱系統** 是 Web 系統中比較常用的一個功能。簡單點說就是 **釋出者釋出訊息,訂閱者接受訊息**,這有點類似於我們的報紙/ 雜誌社之類的: *(借用前邊的一張圖)* ![](https://upload-images.jianshu.io/upload_images/7896890-13aa5cb2668368fe.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) - 圖片引用自:「訊息佇列」看過來! - [https://www.wmyskxz.com/2019/07/16/xiao-xi-dui-lie-kan-guo-lai/](https://www.wmyskxz.com/2019/07/16/xiao-xi-dui-lie-kan-guo-lai/) 從我們 *前面(下方相關閱讀)* 學習的知識來看,我們雖然可以使用一個 `list` 列表結構結合 `lpush` 和 `rpop` 來實現訊息佇列的功能,但是似乎很難實現實現 **訊息多播** 的功能: ![](https://upload-images.jianshu.io/upload_images/7896890-526a5b110a7c4ea2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 為了支援訊息多播,**Redis** 不能再依賴於那 5 種基礎的資料結構了,它單獨使用了一個模組來支援訊息多播,這個模組就是 **PubSub**,也就是 **PublisherSubscriber** *(釋出者/ 訂閱者模式)*。 ## PubSub 簡介 我們從 *上面的圖* 中可以看到,基於 `list` 結構的訊息佇列,是一種 `Publisher` 與 `Consumer` 點對點的強關聯關係,**Redis** 為了消除這樣的強關聯,引入了另一種概念:**頻道** *(channel)*: ![](https://upload-images.jianshu.io/upload_images/7896890-cc3bb012eeca9fca.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 當 `Publisher` 往 `channel` 中釋出訊息時,關注了指定 `channel` 的 `Consumer` 就能夠同時受到訊息。但這裡的 **問題** 是,消費者訂閱一個頻道是必須 **明確指定頻道名稱** 的,這意味著,如果我們想要 **訂閱多個** 頻道,那麼就必須 **顯式地關注多個** 名稱。 為了簡化訂閱的繁瑣操作,**Redis** 提供了 **模式訂閱** 的功能 **Pattern Subscribe**,這樣就可以 **一次性關注多個頻道** 了,即使生產者新增了同模式的頻道,消費者也可以立即受到訊息: ![](https://upload-images.jianshu.io/upload_images/7896890-18ac258e4e9387da.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 例如上圖中,**所有** 位於圖片下方的 **`Consumer` 都能夠受到訊息**。 `Publisher` 往 `wmyskxz.chat` 這個 `channel` 中傳送了一條訊息,不僅僅關注了這個頻道的 `Consumer 1` 和 `Consumer 2` 能夠受到訊息,圖片中的兩個 `channel` 都和模式 `wmyskxz.*` 匹配,所以 **Redis** 此時會同樣傳送訊息給訂閱了 `wmyskxz.*` 這個模式的 `Consumer 3` 和關注了在這個模式下的另一個頻道 `wmyskxz.log` 下的 `Consumer 4` 和 `Consumer 5`。 另一方面,如果接收訊息的頻道是 `wmyskxz.chat`,那麼 `Consumer 3` 也會受到訊息。 ## 快速體驗 在 **Redis** 中,**PubSub** 模組的使用非常簡單,常用的命令也就下面這麼幾條: ```bash # 訂閱頻道: SUBSCRIBE channel [channel ....] # 訂閱給定的一個或多個頻道的資訊 PSUBSCRIBE pattern [pattern ....] # 訂閱一個或多個符合給定模式的頻道 # 釋出頻道: PUBLISH channel message # 將訊息傳送到指定的頻道 # 退訂頻道: UNSUBSCRIBE [channel [channel ....]] # 退訂指定的頻道 PUNSUBSCRIBE [pattern [pattern ....]] #退訂所有給定模式的頻道 ``` 我們可以在本地快速地來體驗一下 **PubSub**: ![](https://upload-images.jianshu.io/upload_images/7896890-518e0d1e93135775.gif?imageMogr2/auto-orient/strip) 具體步驟如下: 1. 開啟本地 Redis 服務,新建兩個控制檯視窗; 2. 在其中一個視窗輸入 `SUBSCRIBE wmyskxz.chat` 關注 `wmyskxz.chat` 頻道,讓這個視窗成為 **消費者**。 3. 在另一個視窗輸入 `PUBLISH wmyskxz.chat 'message'` 往這個頻道傳送訊息,這個時候就會看到 **另一個視窗實時地出現** 了傳送的測試訊息。 ## 實現原理 可以看到,我們通過很簡單的兩條命令,幾乎就可以簡單使用這樣的一個 **釋出/ 訂閱系統** 了,但是具體是怎麼樣實現的呢? **每個 Redis 伺服器程序維持著一個標識伺服器狀態** 的 `redis.h/redisServer` 結構,其中就 **儲存著有訂閱的頻道** 以及 **訂閱模式** 的資訊: ```c struct redisServer { // ... dict *pubsub_channels; // 訂閱頻道 list *pubsub_patterns; // 訂閱模式 // ... }; ``` ### 訂閱頻道原理 當客戶端訂閱某一個頻道之後,Redis 就會往 `pubsub_channels` 這個字典中新新增一條資料,實際上這個 `dict` 字典維護的是一張連結串列,比如,下圖展示的 `pubsub_channels` 示例中,`client 1`、`client 2` 就訂閱了 `channel 1`,而其他頻道也分別被其他客戶端訂閱: ![](https://upload-images.jianshu.io/upload_images/7896890-218fc15f7c368eee.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) #### SUBSCRIBE 命令 `SUBSCRIBE` 命令的行為可以用下列的偽程式碼表示: ```python def SUBSCRIBE(client, channels): # 遍歷所有輸入頻道 for channel in channels: # 將客戶端新增到連結串列的末尾 redisServer.pubsub_channels[channel].append(client) ``` 通過 `pubsub_channels` 字典,程式只要檢查某個頻道是否為字典的鍵,就可以知道該頻道是否正在被客戶端訂閱;只要取出某個鍵的值,就可以得到所有訂閱該頻道的客戶端的資訊。 #### PUBLISH 命令 瞭解 `SUBSCRIBE`,那麼 `PUBLISH` 命令的實現也變得十分簡單了,只需要通過上述字典定位到具體的客戶端,再把訊息傳送給它們就好了:*(虛擬碼實現如下)* ```python def PUBLISH(channel, message): # 遍歷所有訂閱頻道 channel 的客戶端 for client in server.pubsub_channels[channel]: # 將資訊傳送給它們 send_message(client, message) ``` #### UNSUBSCRIBE 命令 使用 `UNSUBSCRIBE` 命令可以退訂指定的頻道,這個命令執行的是訂閱的反操作:它從 `pubsub_channels` 字典的給定頻道(鍵)中,刪除關於當前客戶端的資訊,這樣被退訂頻道的資訊就不會再發送給這個客戶端。 ### 訂閱模式原理 ![](https://upload-images.jianshu.io/upload_images/7896890-18ac258e4e9387da.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 正如我們上面說到了,當傳送一條訊息到 `wmyskxz.chat` 這個頻道時,Redis 不僅僅會發送到當前的頻道,還會發送到匹配於當前模式的所有頻道,實際上,`pubsub_patterns` 背後還維護了一個 `redis.h/pubsubPattern` 結構: ```c typedef struct pubsubPattern { redisClient *client; // 訂閱模式的客戶端 robj *pattern; // 訂閱的模式 } pubsubPattern; ``` 每當呼叫 `PSUBSCRIBE` 命令訂閱一個模式時,程式就建立一個包含客戶端資訊和被訂閱模式的 `pubsubPattern` 結構,並將該結構新增到 `redisServer.pubsub_patterns` 連結串列中。 我們來看一個 `pusub_patterns` 連結串列的示例: ![](https://upload-images.jianshu.io/upload_images/7896890-d0d3b1849fdb6162.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 這個時候客戶端 `client 3` 執行 `PSUBSCRIBE wmyskxz.java.*`,那麼 `pubsub_patterns` 連結串列就會被更新成這樣: ![](https://upload-images.jianshu.io/upload_images/7896890-edbf11995590de50.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 通過遍歷整個 `pubsub_patterns` 連結串列,程式可以檢查所有正在被訂閱的模式,以及訂閱這些模式的客戶端。 #### PUBLISH 命令 上面給出的虛擬碼並沒有 **完整描述** `PUBLISH` 命令的行為,因為 `PUBLISH` 除了將 `message` 傳送到 **所有訂閱 `channel` 的客戶端** 之外,它還會將 `channel` 和 `pubsub_patterns` 中的 **模式** 進行對比,如果 `channel` 和某個模式匹配的話,那麼也將 `message` 傳送到 **訂閱那個模式的客戶端**。 完整描述 `PUBLISH` 功能的虛擬碼定於如下: ```python def PUBLISH(channel, message): # 遍歷所有訂閱頻道 channel 的客戶端 for client in server.pubsub_channels[channel]: # 將資訊傳送給它們 send_message(client, message) # 取出所有模式,以及訂閱模式的客戶端 for pattern, client in server.pubsub_patterns: # 如果 channel 和模式匹配 if match(channel, pattern): # 那麼也將資訊發給訂閱這個模式的客戶端 send_message(client, message) ``` #### PUNSUBSCRIBE 命令 使用 `PUNSUBSCRIBE` 命令可以退訂指定的模式,這個命令執行的是訂閱模式的反操作:序會刪除 `redisServer.pubsub_patterns` 連結串列中,所有和被退訂模式相關聯的 `pubsubPattern` 結構,這樣客戶端就不會再收到和模式相匹配的頻道發來的資訊。 ## PubSub 的缺點 儘管 **Redis** 實現了 **PubSub** 模式來達到了 **多播訊息佇列** 的目的,但在實際的訊息佇列的領域,幾乎 **找不到特別合適的場景**,因為它的缺點十分明顯: - **沒有 Ack 機制,也不保證資料的連續:** PubSub 的生產者傳遞過來一個訊息,Redis 會直接找到相應的消費者傳遞過去。如果沒有一個消費者,那麼訊息會被直接丟棄。如果開始有三個消費者,其中一個突然掛掉了,過了一會兒等它再重連時,那麼重連期間的訊息對於這個消費者來說就徹底丟失了。 - **不持久化訊息:** 如果 Redis 停機重啟,PubSub 的訊息是不會持久化的,畢竟 Redis 宕機就相當於一個消費者都沒有,所有的訊息都會被直接丟棄。 基於上述缺點,Redis 的作者甚至單獨開啟了一個 Disque 的專案來專門用來做多播訊息佇列,不過該專案目前好像都沒有成熟。不過後來在 2018 年 6 月,**Redis 5.0** 新增了 `Stream` 資料結構,這個功能給 Redis 帶來了 **持久化訊息佇列**,從此 PubSub 作為訊息佇列的功能可以說是就消失了.. ![](http://ww1.sinaimg.cn/bmiddle/006APoFYjw1fbkgv6dh18g303r041744.gif) # 二、更為強大的 Stream | 持久化的釋出/訂閱系統 **Redis Stream** 從概念上來說,就像是一個 **僅追加內容** 的 **訊息連結串列**,把所有加入的訊息都一個一個串起來,每個訊息都有一個唯一的 ID 和內容,這很簡單,讓它複雜的是從 Kafka 借鑑的另一種概念:**消費者組(Consumer Group)** *(思路一致,實現不同)*: ![](https://upload-images.jianshu.io/upload_images/7896890-b9d8afde068a165f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 上圖就展示了一個典型的 **Stream** 結構。每個 Stream 都有唯一的名稱,它就是 Redis 的 `key`,在我們首次使用 `xadd` 指令追加訊息時自動建立。我們對圖中的一些概念做一下解釋: - **Consumer Group**:消費者組,可以簡單看成記錄流狀態的一種資料結構。消費者既可以選擇使用 `XREAD` 命令進行 **獨立消費**,也可以多個消費者同時加入一個消費者組進行 **組內消費**。同一個消費者組內的消費者共享所有的 Stream 資訊,**同一條訊息只會有一個消費者消費到**,這樣就可以應用在分散式的應用場景中來保證訊息的唯一性。 - **last_delivered_id**:用來表示消費者組消費在 Stream 上 **消費位置** 的遊標資訊。每個消費者組都有一個 Stream 內 **唯一的名稱**,消費者組不會自動建立,需要使用 `XGROUP CREATE` 指令來顯式建立,並且需要指定從哪一個訊息 ID 開始消費,用來初始化 `last_delivered_id` 這個變數。 - **pending_ids**:每個消費者內部都有的一個狀態變數,用來表示 **已經** 被客戶端 **獲取**,但是 **還沒有 ack** 的訊息。記錄的目的是為了 **保證客戶端至少消費了訊息一次**,而不會在網路傳輸的中途丟失而沒有對訊息進行處理。如果客戶端沒有 ack,那麼這個變數裡面的訊息 ID 就會越來越多,一旦某個訊息被 ack,它就會對應開始減少。這個變數也被 Redis 官方稱為 **PEL** *(Pending Entries List)*。 ## 訊息 ID 和訊息內容 #### 訊息 ID 訊息 ID 如果是由 `XADD` 命令返回自動建立的話,那麼它的格式會像這樣:`timestampInMillis-sequence` *(毫秒時間戳-序列號)*,例如 `1527846880585-5`,它表示當前的訊息是在毫秒時間戳 `1527846880585` 時產生的,並且是該毫秒內產生的第 5 條訊息。 這些 ID 的格式看起來有一些奇怪,**為什麼要使用時間來當做 ID 的一部分呢?** 一方面,我們要 **滿足 ID 自增** 的屬性,另一方面,也是為了 **支援範圍查詢** 的功能。由於 ID 和生成訊息的時間有關,這樣就使得在根據時間範圍內查詢時基本上是沒有額外損耗的。 當然訊息 ID 也可以由客戶端自定義,但是形式必須是 **"整數-整數"**,而且後面加入的訊息的 ID 必須要大於前面的訊息 ID。 #### 訊息內容 訊息內容就是普通的鍵值對,形如 hash 結構的鍵值對。 ## 增刪改查示例 增刪改查命令很簡單,詳情如下: 1. `xadd`:追加訊息 2. `xdel`:刪除訊息,這裡的刪除僅僅是設定了標誌位,不影響訊息總長度 3. `xrange`:獲取訊息列表,會自動過濾已經刪除的訊息 4. `xlen`:訊息長度 5. `del`:刪除Stream 使用示例: ```bash # *號表示伺服器自動生成ID,後面順序跟著一堆key/value 127.0.0.1:6379> xadd codehole * name laoqian age 30 # 名字叫laoqian,年齡30歲 1527849609889-0 # 生成的訊息ID 127.0.0.1:6379> xadd codehole * name xiaoyu age 29 1527849629172-0 127.0.0.1:6379> xadd codehole * name xiaoqian age 1 1527849637634-0 127.0.0.1:6379> xlen codehole (integer) 3 127.0.0.1:6379> xrange codehole - + # -表示最小值, +表示最大值 1) 1) 1527849609889-0 2) 1) "name" 2) "laoqian" 3) "age" 4) "30" 2) 1) 1527849629172-0 2) 1) "name" 2) "xiaoyu" 3) "age" 4) "29" 3) 1) 1527849637634-0 2) 1) "name" 2) "xiaoqian" 3) "age" 4) "1" 127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小訊息ID的列表 1) 1) 1527849629172-0 2) 1) "name" 2) "xiaoyu" 3) "age" 4) "29" 2) 1) 1527849637634-0 2) 1) "name" 2) "xiaoqian" 3) "age" 4) "1" 127.0.0.1:6379> xrange codehole - 1527849629172-0 # 指定最大訊息ID的列表 1) 1) 1527849609889-0 2) 1) "name" 2) "laoqian" 3) "age" 4) "30" 2) 1) 1527849629172-0 2) 1) "name" 2) "xiaoyu" 3) "age" 4) "29" 127.0.0.1:6379> xdel codehole 1527849609889-0 (integer) 1 127.0.0.1:6379> xlen codehole # 長度不受影響 (integer) 3 127.0.0.1:6379> xrange codehole - + # 被刪除的訊息沒了 1) 1) 1527849629172-0 2) 1) "name" 2) "xiaoyu" 3) "age" 4) "29" 2) 1) 1527849637634-0 2) 1) "name" 2) "xiaoqian" 3) "age" 4) "1" 127.0.0.1:6379> del codehole # 刪除整個Stream (integer) 1 ``` ## 獨立消費示例 我們可以在不定義消費組的情況下進行 Stream 訊息的 **獨立消費**,當 Stream 沒有新訊息時,甚至可以阻塞等待。Redis 設計了一個單獨的消費指令 `xread`,可以將 Stream 當成普通的訊息佇列(list)來使用。使用 `xread` 時,我們可以完全忽略 **消費組(Consumer Group)** 的存在,就好比 Stream 就是一個普通的列表(list): ```bash # 從Stream頭部讀取兩條訊息 127.0.0.1:6379> xread count 2 streams codehole 0-0 1) 1) "codehole" 2) 1) 1) 1527851486781-0 2) 1) "name" 2) "laoqian" 3) "age" 4) "30" 2) 1) 1527851493405-0 2) 1) "name" 2) "yurui" 3) "age" 4) "29" # 從Stream尾部讀取一條訊息,毫無疑問,這裡不會返回任何訊息 127.0.0.1:6379> xread count 1 streams codehole $ (nil) # 從尾部阻塞等待新訊息到來,下面的指令會堵住,直到新訊息到來 127.0.0.1:6379> xread block 0 count 1 streams codehole $ # 我們從新開啟一個視窗,在這個視窗往Stream裡塞訊息 127.0.0.1:6379> xadd codehole * name youming age 60 1527852774092-0 # 再切換到前面的視窗,我們可以看到阻塞解除了,返回了新的訊息內容 # 而且還顯示了一個等待時間,這裡我們等待了93s 127.0.0.1:6379> xread block 0 count 1 streams codehole $ 1) 1) "codehole" 2) 1) 1) 1527852774092-0 2) 1) "name" 2) "youming" 3) "age" 4) "60" (93.11s) ``` 客戶端如果想要使用 `xread` 進行 **順序消費**,一定要 **記住當前消費** 到哪裡了,也就是返回的訊息 ID。下次繼續呼叫 `xread` 時,將上次返回的最後一個訊息 ID 作為引數傳遞進去,就可以繼續消費後續的訊息。 `block 0` 表示永遠阻塞,直到訊息到來,`block 1000` 表示阻塞 `1s`,如果 `1s` 內沒有任何訊息到來,就返回 `nil`: ```bash 127.0.0.1:6379> xread block 1000 count 1 streams codehole $ (nil) (1.07s) ``` ## 建立消費者示例 Stream 通過 `xgroup create` 指令建立消費組(Consumer Group),需要傳遞起始訊息 ID 引數用來初始化 `last_delivered_id` 變數: ```bash 127.0.0.1:6379> xgroup create codehole cg1 0-0 # 表示從頭開始消費 OK # $表示從尾部開始消費,只接受新訊息,當前Stream訊息會全部忽略 127.0.0.1:6379> xgroup create codehole cg2 $ OK 127.0.0.1:6379> xinfo codehole # 獲取Stream資訊 1) length 2) (integer) 3 # 共3個訊息 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 # 兩個消費組 9) first-entry # 第一個訊息 10) 1) 1527851486781-0 2) 1) "name" 2) "laoqian" 3) "age" 4) "30" 11) last-entry # 最後一個訊息 12) 1) 1527851498956-0 2) 1) "name" 2) "xiaoqian" 3) "age" 4) "1" 127.0.0.1:6379> xinfo groups codehole # 獲取Stream的消費組資訊 1) 1) name 2) "cg1" 3) consumers 4) (integer) 0 # 該消費組還沒有消費者 5) pending 6) (integer) 0 # 該消費組沒有正在處理的訊息 2) 1) name 2) "cg2" 3) consumers # 該消費組還沒有消費者 4) (integer) 0 5) pending 6) (integer) 0 # 該消費組沒有正在處理的訊息 ``` ## 組內消費示例 Stream 提供了 `xreadgroup` 指令可以進行消費組的組內消費,需要提供 **消費組名稱、消費者名稱和起始訊息 ID**。它同 `xread` 一樣,也可以阻塞等待新訊息。讀到新訊息後,對應的訊息 ID 就會進入消費者的 **PEL** *(正在處理的訊息)* 結構裡,客戶端處理完畢後使用 `xack` 指令 **通知伺服器**,本條訊息已經處理完畢,該訊息 ID 就會從 **PEL** 中移除,下面是示例: ```bash # >號表示從當前消費組的last_delivered_id後面開始讀 # 每當消費者讀取一條訊息,last_delivered_id變數就會前進 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole" 2) 1) 1) 1527851486781-0 2) 1) "name" 2) "laoqian" 3) "age" 4) "30" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole" 2) 1) 1) 1527851493405-0 2) 1) "name" 2) "yurui" 3) "age" 4) "29" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole > 1) 1) "codehole" 2) 1) 1) 1527851498956-0 2) 1) "name" 2) "xiaoqian" 3) "age" 4) "1" 2) 1) 1527852774092-0 2) 1) "name" 2) "youming" 3) "age" 4) "60" # 再繼續讀取,就沒有新訊息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > (nil) # 那就阻塞等待吧 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole > # 開啟另一個視窗,往裡塞訊息 127.0.0.1:6379> xadd codehole * name lanying age 61 1527854062442-0 # 回到前一個視窗,發現阻塞解除,收到新訊息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole > 1) 1) "codehole" 2) 1) 1) 1527854062442-0 2) 1) "name" 2) "lanying" 3) "age" 4) "61" (36.54s) 127.0.0.1:6379> xinfo groups codehole # 觀察消費組資訊 1) 1) name 2) "cg1" 3) consumers 4) (integer) 1 # 一個消費者 5) pending 6) (integer) 5 # 共5條正在處理的資訊還有沒有ack 2) 1) name 2) "cg2" 3) consumers 4) (integer) 0 # 消費組cg2沒有任何變化,因為前面我們一直在操縱cg1 5) pending 6) (integer) 0 # 如果同一個消費組有多個消費者,我們可以通過xinfo consumers指令觀察每個消費者的狀態 127.0.0.1:6379> xinfo consumers codehole cg1 # 目前還有1個消費者 1) 1) name 2) "c1" 3) pending 4) (integer) 5 # 共5條待處理訊息 5) idle 6) (integer) 418715 # 空閒了多長時間ms沒有讀取訊息了 # 接下來我們ack一條訊息 127.0.0.1:6379> xack codehole cg1 1527851486781-0 (integer) 1 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name 2) "c1" 3) pending 4) (integer) 4 # 變成了5條 5) idle 6) (integer) 668504 # 下面ack所有訊息 127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0 (integer) 4 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name 2) "c1" 3) pending 4) (integer) 0 # pel空了 5) idle 6) (integer) 745505 ``` ## QA 1:Stream 訊息太多怎麼辦? | Stream 的上限 很容易想到,要是訊息積累太多,Stream 的連結串列豈不是很長,內容會不會爆掉就是個問題了。`xdel` 指令又不會刪除訊息,它只是給訊息做了個標誌位。 Redis 自然考慮到了這一點,所以它提供了一個定長 Stream 功能。在 `xadd` 的指令提供一個定長長度 `maxlen`,就可以將老的訊息幹掉,確保最多不超過指定長度,使用起來也很簡單: ```bash > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" ``` 如果使用 `MAXLEN` 選項,當 Stream 的達到指定長度後,老的訊息會自動被淘汰掉,因此 Stream 的大小是恆定的。目前還沒有選項讓 Stream 只保留給定數量的條目,因為為了一致地執行,這樣的命令必須在很長一段時間內阻塞以淘汰訊息。*(例如在新增資料的高峰期間,你不得不長暫停來淘汰舊訊息和新增新的訊息)* 另外使用 `MAXLEN` 選項的花銷是很大的,Stream 為了節省記憶體空間,採用了一種特殊的結構表示,而這種結構的調整是需要額外的花銷的。所以我們可以使用一種帶有 `~` 的特殊命令: ```bash XADD mystream MAXLEN ~ 1000 * ... entry fields here ... ``` 它會基於當前的結構合理地對節點執行裁剪,來保證至少會有 `1000` 條資料,可能是 `1010` 也可能是 `1030`。 ## QA 2:PEL 是如何避免訊息丟失的? 在客戶端消費者讀取 Stream 訊息時,Redis 伺服器將訊息回覆給客戶端的過程中,客戶端突然斷開了連線,訊息就丟失了。但是 PEL 裡已經儲存了發出去的訊息 ID,待客戶端重新連上之後,可以再次收到 PEL 中的訊息 ID 列表。不過此時 `xreadgroup` 的起始訊息 ID 不能為引數 `>` ,而必須是任意有效的訊息 ID,一般將引數設為 `0-0`,表示讀取所有的 PEL 訊息以及自 `last_delivered_id` 之後的新訊息。 ## Redis Stream Vs Kafka Redis 基於記憶體儲存,這意味著它會比基於磁碟的 Kafka 快上一些,也意味著使用 Redis 我們 **不能長時間儲存大量資料**。不過如果您想以 **最小延遲** 實時處理訊息的話,您可以考慮 Redis,但是如果 **訊息很大並且應該重用資料** 的話,則應該首先考慮使用 Kafka。 另外從某些角度來說,`Redis Stream` 也更適用於小型、廉價的應用程式,因為 `Kafka` 相對來說更難配置一些。 # 相關閱讀 1. Redis(1)——5種基本資料結構 - [https://www.wmyskxz.com/2020/02/28/redis-1-5-chong-ji-ben-shu-ju-jie-gou/](https://www.wmyskxz.com/2020/02/28/redis-1-5-chong-ji-ben-shu-ju-jie-gou/) 2. Redis(2)——跳躍表 - [https://www.wmyskxz.com/2020/02/29/redis-2-tiao-yue-biao/](https://www.wmyskxz.com/2020/02/29/redis-2-tiao-yue-biao/) 3. Redis(3)——分散式鎖深入探究 - [https://www.wmyskxz.com/2020/03/01/redis-3/](https://www.wmyskxz.com/2020/03/01/redis-3/) 4. Reids(4)——神奇的HyperLoglog解決統計問題 - [https://www.wmyskxz.com/2020/03/02/reids-4-shen-qi-de-hyperloglog-jie-jue-tong-ji-wen-ti/](https://www.wmyskxz.com/2020/03/02/reids-4-shen-qi-de-hyperloglog-jie-jue-tong-ji-wen-ti/) 5. Redis(5)——億級資料過濾和布隆過濾器 - [https://www.wmyskxz.com/2020/03/11/redis-5-yi-ji-shu-ju-guo-lu-he-bu-long-guo-lu-qi/](https://www.wmyskxz.com/2020/03/11/redis-5-yi-ji-shu-ju-guo-lu-he-bu-long-guo-lu-qi/) 6. Redis(6)——GeoHash查詢附近的人[https://www.wmyskxz.com/2020/03/12/redis-6-geohash-cha-zhao-fu-jin-de-ren/](https://www.wmyskxz.com/2020/03/12/redis-6-geohash-cha-zhao-fu-jin-de-ren/) 7. Redis(7)——持久化【一文了解】 - [https://www.wmyskxz.com/2020/03/13/redis-7-chi-jiu-hua-yi-wen-liao-jie/](https://www.wmyskxz.com/2020/03/13/redis-7-chi-jiu-hua-yi-wen-liao-jie/) # 參考資料 1. 訂閱與釋出——Redis 設計與實現 - [https://redisbook.readthedocs.io/en/latest/feature/pubsub.html](https://redisbook.readthedocs.io/en/latest/feature/pubsub.html) 2. 《Redis 深度歷險》 - 錢文品/ 著 - [https://book.douban.com/subject/30386804/](https://book.douban.com/subject/30386804/) 3. Introduction to Redis Streams【官方文件】 - [https://redis.io/topics/streams-intro](https://redis.io/topics/streams-intro) 4. Kafka vs. Redis: Log Aggregation Capabilities and Performance - [https://logz.io/blog/kafka-vs-redis/](https://logz.io/blog/kafka-vs-redis/) > - 本文已收錄至我的 Github 程式設計師成長系列 **【More Than Java】,學習,不止 Code,歡迎 star:[https://github.com/wmyskxz/MoreThanJava](https://github.com/wmyskxz/MoreThanJava)** > - **個人公眾號** :wmyskxz,**個人獨立域名部落格**:wmyskxz.com,堅持原創輸出,下方掃碼關注,2020,與您共同成長! ![](https://upload-images.jianshu.io/upload_images/7896890-fca34cfd601e7449.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 非常感謝各位人才能 **看到這裡**,如果覺得本篇文章寫得不錯,覺得 **「我沒有三顆心臟」有點東西** 的話,**求點贊,求關注,求分享,求留言!** 創作不易,各位的支援和認可,就是我創作的最大動力,我們下篇文