中介軟體 - 訊息佇列 - RabbitMQ - 訊息的可靠性投遞
阿新 • • 發佈:2020-10-11
訊息的投遞有4個環節, 如下圖
.
環節1: 生產者Producer把訊息傳送給伺服器Broker
Producer怎麼知道Broker有沒有接收到訊息
服務端確認-Transaction模式
只要channel.txCommit()方法返回, 服務端就一定接收到了訊息
缺點: 同步模式, 等Broker返回成功之後 , Producer才會繼續傳送下一條訊息, 大大降低了效率
服務端確認-Confirm模式
訊息傳送一條, 確認一條, 一樣很慢
下面的程式碼是批量的傳送一批訊息, 確認一批訊息
只要waitForConfirmOrDie方法沒有拋異常, 那麼伺服器就批量接收到了一批訊息, 這個時候, 效率是有提升的
服務端確認 - 非同步確認模式
這才是完美的解決方案
先定義一個執行緒安全的confirmSet, 每次Producer發訊息的時候, 會順帶往這個集合裡扔訊息id進去
在handleNack方法中, 是Broker未確認的訊息, 可以存起來, 在後面重新發送
在handleAck方法中, 是Broker確認的訊息, 確認的訊息, 就從confirmSet中刪除
控制檯輸出:
.
環節2: 訊息到達Exchange之後, Exchange會查詢路由規則列表, 將訊息投遞到一個或多個Queue上
問題: 某條訊息發到Exchange之後, 根據路由規則找不到Queue
解決方案: 路由保證
- mandatory = true + ReturnListener
- 指定交換機的備份交換機
s1引數是用來做路由的, 如果根據Exchange的路由規則, 沒有對應的Queue, 那麼會去看第三個引數mandatory. 如果mandatory是false, 那麼訊息會被直接丟棄, 如果是true, 那麼就會把這條訊息轉給另一個指定的Exchange, 下圖是教你如何指定這個ReturnExchange的
.
在控制檯介面裡配置Arguments, 新增一行alternate-exchange
.
環節3: 訊息在Queue上儲存, 還沒有Consumer來消費, 會一直存在Queue中
問題: 訊息預設存在Broker記憶體中, Broker重啟會導致訊息丟失
解決方案是要確保如下幾條同時滿足
- Exchange要持久化
- Queue要持久化
- 訊息本身也要持久化
- Broker叢集 ( 如果只有單臺Broker, 那麼即使是開啟了持久化, 一旦這個Broker硬碟掛了, 訊息還是會全部丟失 )
環節4:Queue裡的訊息一條一條投遞到Consumer
Queue如何知道Consumer成功消費了訊息
消費者確認機制
- Consumer自動ack: 只要接收到訊息, 就傳送ack給Broker, 所以會導致漏處理 ( 不推薦 )
- Consumer手動ack: 可以在接收到訊息, 並且處理完業務邏輯之後, 才發ack給Broker
下面是Consumer端的推薦寫法: 開啟手工應答, 並根據處理各種不同情況分別做 reject / nack / ack 的響應給Broker
整合Spring的時候, AcknowledgeMode的列舉型別起名容易引起歧義