1. 程式人生 > 程式設計 >mq如何通過非同步命令元件做到順序消費

mq如何通過非同步命令元件做到順序消費

三端可靠

  1. 傳送方和mq保證訊息送達到mq
  2. mq保證儲存的訊息不丟失
  3. 消費方和mq一起保證訊息被成功消費

傳送方和mq保證訊息送達到mq

方案一、rabbitmq如果是用spring boot提供的模版介面傳送 需要呼叫rabbitTemplate.convertSendAndReceive()方法傳送 這個是當訊息成功到隊列了才會返回結果 如果失敗則會拋異常 不過這就會導致等待時間比較長 適合高可靠場景
不過一般在業務開發都是完成業務以後再發訊息 比如插入訂單表一筆訂單 傳送訂單建立的訊息 這兩步是需要保證原子性的 要麼都成功要麼都失敗 rabbitmq支援事務訊息 不過如果出現下面情況

  • 開啟事務
  • 插入訂單表
  • 傳送mq訊息
  • 提交資料庫事務成功
  • 提交mq事務失敗
  • 訊息丟失

所以如果是用rabbitmq的事務訊息來做 其實在極端情況是會丟失訊息的 在這裡可以採用一個非同步命令元件提供的方案https://github.com/bojiw/asyncmd

  • 開啟事務
  • 插入訂單表
  • 插入非同步命令表
  • 提交資料庫事務
  • 執行緒掃描非同步命令表撈取訊息
  • 通過rabbitTemplate.convertSendAndReceive()方法傳送
  • 如果失敗 則重試 並且報警

方案二、如採用rabbitTemplate.convertAndSend和confirms(消費回撥)加Return(錯誤回撥)模式

  • convertAndSend 傳送到mq 立刻返回 不管交換機是否成功處理 所以併發會高
  • confirms(消費回撥) 實現介面ConfirmCallback 訊息成功傳送到rabbitmq交換機上則會回撥介面 入參ack為true代表成功傳送到交換機 false代表異常
  • Return(錯誤回撥) 實現介面ReturnCallback 訊息從交換機到佇列 成功不會回撥 如果傳送到佇列失敗 則會呼叫回撥

上面這種方式如果在回撥中處理訊息傳送失敗的邏輯時出現異常或者應用伺服器掛了 則會導致訊息丟失 因為只會回撥一次 這種情況可以採用加一張訊息表 先插入訊息表 然後掃表傳送訊息 confirms回撥成功 則更新表狀態 如果回撥的時候異常 則訊息表會重新傳送 這種就會出現訊息重發的情況 不過一般訊息消費者都要保證冪等 所以這個問題不大 不過如果出現以下情況

  • 資料庫有兩個欄位 confirms預設0 和 return 預設0
  • 回撥成功confirms=1 回撥失敗confirms=2 錯誤回撥return=2
  • 當回撥成功 confirms=1 錯誤回撥處理失敗沒有成功更新表 則return還是0
  • 這個時候你掃表就不確定需不需要重發訊息 因為如果訊息成功到佇列 表的狀態也是confirms=1 return=0
  • 無法對傳送佇列成功和傳送佇列失敗可在回撥異常這兩種情況做區分

這裡邏輯就會出問題 所以只能處理訊息成功到交換機 是否到佇列則不管 因為一般都是成功的 除了極端情況 比如佇列被人誤刪除
方案二和方案一其實從整個流程來講 傳送訊息速度其實差不多的 不過可靠性還是方案一高一點

mq保證儲存的訊息不丟失

訊息、交換機、佇列都需要設定持久化

消費方和mq一起保證訊息被成功消費

消費者開啟手動確認

acknowledge="manual"
複製程式碼

在業務程式碼裡 成功處理業務 才返回給rabbitmq消費成功的確認

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
複製程式碼

如果業務處理失敗則重新放到佇列重新消費

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
複製程式碼

由消費者 只有業務成功處理才進行ack 記得需要做好冪等 不過這裡rabbitmq在重試這塊沒有做好 如果不確定會一直重試 如果因為依賴的一個系統掛了 要一個小時以後才會啟動成功 在這一個小時裡會一直重試 這就會對rabbitmq和消費者帶來一定的壓力 這塊也可以採用非同步命令元件提供的方案https://github.com/bojiw/asyncmd

  • 接收訊息
  • 把訊息插入非同步命令
  • 返回rabbitmq成功
  • 非同步元件執行業務邏輯
  • 呼叫介面失敗重試
  • 重試一定次數則不重試 由人工進行處理 也可以把重試間隔設定的長一點 比如前三次每隔1s重試 第四次隔一個小時重試