1. 程式人生 > 實用技巧 >中介軟體 - 訊息佇列 - RabbitMQ - 訊息的可靠性投遞

中介軟體 - 訊息佇列 - RabbitMQ - 訊息的可靠性投遞

訊息的投遞有4個環節, 如下圖
在這裡插入圖片描述

.

環節1: 生產者Producer把訊息傳送給伺服器Broker

Producer怎麼知道Broker有沒有接收到訊息

服務端確認-Transaction模式在這裡插入圖片描述

在這裡插入圖片描述
只要channel.txCommit()方法返回, 服務端就一定接收到了訊息
缺點: 同步模式, 等Broker返回成功之後 , Producer才會繼續傳送下一條訊息, 大大降低了效率

服務端確認-Confirm模式

在這裡插入圖片描述
在這裡插入圖片描述
訊息傳送一條, 確認一條, 一樣很慢
下面的程式碼是批量的傳送一批訊息, 確認一批訊息
在這裡插入圖片描述
只要waitForConfirmOrDie方法沒有拋異常, 那麼伺服器就批量接收到了一批訊息, 這個時候, 效率是有提升的

但是批次的數量很有講究, 如果每批數量少了,那提升的效率很有限. 如果一批的數量過多, 那麼只要有一條訊息出錯, 就會導致這批裡的所有訊息都要重發. 所以也是不夠完美

服務端確認 - 非同步確認模式

這才是完美的解決方案
在這裡插入圖片描述
先定義一個執行緒安全的confirmSet, 每次Producer發訊息的時候, 會順帶往這個集合裡扔訊息id進去
在這裡插入圖片描述
在handleNack方法中, 是Broker未確認的訊息, 可以存起來, 在後面重新發送
在這裡插入圖片描述
在handleAck方法中, 是Broker確認的訊息, 確認的訊息, 就從confirmSet中刪除

在這裡插入圖片描述
控制檯輸出:
在這裡插入圖片描述

.

環節2: 訊息到達Exchange之後, Exchange會查詢路由規則列表, 將訊息投遞到一個或多個Queue上

問題: 某條訊息發到Exchange之後, 根據路由規則找不到Queue

解決方案: 路由保證

  1. mandatory = true + ReturnListener
  2. 指定交換機的備份交換機
    在這裡插入圖片描述
    s1引數是用來做路由的, 如果根據Exchange的路由規則, 沒有對應的Queue, 那麼會去看第三個引數mandatory. 如果mandatory是false, 那麼訊息會被直接丟棄, 如果是true, 那麼就會把這條訊息轉給另一個指定的Exchange, 下圖是教你如何指定這個ReturnExchange的
    .
    在這裡插入圖片描述
    在控制檯介面裡配置Arguments, 新增一行alternate-exchange
    在這裡插入圖片描述

.

環節3: 訊息在Queue上儲存, 還沒有Consumer來消費, 會一直存在Queue中

問題: 訊息預設存在Broker記憶體中, Broker重啟會導致訊息丟失

解決方案是要確保如下幾條同時滿足

  1. Exchange要持久化
  2. Queue要持久化
  3. 訊息本身也要持久化
  4. Broker叢集 ( 如果只有單臺Broker, 那麼即使是開啟了持久化, 一旦這個Broker硬碟掛了, 訊息還是會全部丟失 )

環節4:Queue裡的訊息一條一條投遞到Consumer

Queue如何知道Consumer成功消費了訊息

消費者確認機制

  1. Consumer自動ack: 只要接收到訊息, 就傳送ack給Broker, 所以會導致漏處理 ( 不推薦 )
  2. Consumer手動ack: 可以在接收到訊息, 並且處理完業務邏輯之後, 才發ack給Broker

下面是Consumer端的推薦寫法: 開啟手工應答, 並根據處理各種不同情況分別做 reject / nack / ack 的響應給Broker
在這裡插入圖片描述
在這裡插入圖片描述

整合Spring的時候, AcknowledgeMode的列舉型別起名容易引起歧義
在這裡插入圖片描述

上面所有的方案分別解決了Producer可靠投遞Exchange, Exchange可靠投遞到Queue, Queue能知道Consumer的最終消費情況是ack還是nack或reject, 但是在某些場景下, Producer需要知道Consumer的消費情況, 來決定是否需要重發訊息