RabbitMQ(五)訊息傳送失敗後的處理
前一篇文章,寫了訊息傳送確認的一些內容.
就是訊息傳送成功或失敗的時候,都會呼叫confirmListener 或者returnListener.
如果訊息傳送成功,就不考慮了.當訊息傳送失敗時,怎麼處理這個訊息呢.
1.自動重發
2.系統預警人工處理等
以上操作,都需要知道是哪條訊息,具體什麼內容傳送失敗了,才能進行後續處理.
在returnListener中,引數是有訊息內容,exchange,routingKey 這些內容的.
但是在confirmListener中,卻是什麼都沒有,只有個correlationData,而我們打印出的correlationData都是null
看來問題的關鍵就在這個correlationData上了, CorrelationData類只有一個屬性ID, 很明顯,我們在傳送訊息時,將訊息和correlationData的ID做一個繫結,就可以根據id拿到訊息. 然後進行重發,報警等操作了.
到傳送訊息的類裡面, 發現AmqpTemplate 裡面竟然沒有和CorrelationData相關的方法,沒辦法把CorrelationData.id和訊息進行繫結..
原來需要使用RabbitTemplate 而不是 AmqpTemplate 類
找到方法後,下面就非常簡單了.
@Service("rabbitTemplatePublishService") public class RabbitTemplatePublishService { @Autowired private RabbitTemplate rmqpTemplate; public void send(String exchange, String routingKey, Object obj) { String msgId = UUID.randomUUID().toString(); Message message = MessageBuilder.withBody(obj.toString().getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setCorrelationId(msgId.getBytes()).build(); CorrelationData date = new CorrelationData(msgId); // TODO 將 msgId 與 message 的關係儲存起來,例如放到快取中 rmqpTemplate.send(exchange, routingKey, message, date); } }
上面的程式碼,message中,也可以setCorrelationId為自己指定的id,這樣方便confirm和return一起處理.
上面將msgId和message關係儲存後, 在confirmListener或者returnListener中,就可以根據msgId獲取message進行對於操作.
上面的思路看起來解決了失敗後訊息處理的問題.但是實際上卻是不可靠的
3.如果進行confirm的時候,或者return的時候, 失敗了怎麼辦?
4.msgId和message的關係要儲存多久?
但是上面的思路已經抓到了問題的重點,即msgId和message做繫結.
針對3,4兩個問題,只需要變通下即可解決
方案如下:
傳送訊息前,繫結並儲存msgId和message的關係
當confirm或return回撥時,根據ack類別等,分別處理. 例如return或者ack=false則說明有問題,報警, 如果ack=true則刪除關係
(因為return在confirm前,所以一條訊息在return後又ack=true的情況也是按return處理)
定時檢查這個繫結關係列表,如果發現一些已經超時(自己設定的超時時間)未被處理(即未return和confirm),則手動處理這些訊息.
至此,傳送訊息基本已經沒問題了.
需要注意如果是自動重發的話,消費端需要做冪等或去重處理.
相關推薦
RabbitMQ(五)訊息傳送失敗後的處理
前一篇文章,寫了訊息傳送確認的一些內容. 就是訊息傳送成功或失敗的時候,都會呼叫confirmListener 或者returnListener. 如果訊息傳送成功,就不考慮了.當訊息傳送失敗時,怎麼處理這個訊息呢. 1.自動重發 2.系統預警人工處理等 以上操作,都需要
Spring ActiveMQ 整合(二): 重發機制(訊息傳送失敗後的重新發送)
之前已經寫了一個spring activemq整合的demo ,今天繼續完善一下這個demo,讓功能更強大。 假如現在我手裡有一個很重要的訊息的,想要發給一個人,
微信模版訊息傳送失敗
原因分析: 1.同一個微信使用者,登陸iOS系統,獲取form元件submit後的formId是這樣的格式:472be418fa10c861fea8faadba4caf8b; 在android下,formId格式是:1527130096088 這樣的
Java實現rabbitmq簡單訊息傳送與消費
1.搭建並配置好rabbitmq服務 2.生產訊息併發送 public class SendMessage { private final static String QUEUE_NA
RabbitMQ實戰-在服務中配置RabbitMQ實現訊息傳送
在這裡以訂單服務為訊息生產者,程式碼中都有詳細的註釋和說明,以下是示例: 新增依賴 首先,建立一個web工程(在這裡我使用springboot2.0.2)。加入rabbitMQ所需要的依賴: <!-- rabbitmq依賴 -->
使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制
> 前言:實際專案中經常遇到訊息消費失敗了,要進行訊息的重發。比如支付訊息消費失敗後,要分不同時間段進行N次的訊息重發提醒。 # 本文模擬場景 1. 當金額少於100時,訊息消費成功 1. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。 1. 當金額大於200時
訊息中介軟體系列五:RabbitMQ的使用場景(非同步處理、應用解耦)
一、非同步處理 場景: 使用者註冊,寫入資料庫成功以後,傳送郵件和簡訊。 準備工作: 1)安裝RabbitMQ,參考前面的文章 2)新建一個名為RabbitMQAsyncProc的maven web工程,在pom.xml檔案裡面引入如下依賴 <project xmlns="http://maven.a
Spring Cloud Stream消費失敗後的處理策略(三):使用DLQ佇列(RabbitMQ)
應用場景 前兩天我們已經介紹了兩種Spring Cloud Stream對訊息失敗的處理策略: 自動重試:對於一些因環境原因(如:網路抖動等不穩定因素)引發的問題可以起到比較好的作用,提高訊息處理的成功率。 自定義錯誤處理邏輯:如果業務上,訊息處理失敗之後有明確的降級邏輯可以彌補的,可以採用這種
Spring Cloud Stream消費失敗後的處理策略(四):重新入隊(RabbitMQ)
應用場景 之前我們已經通過《Spring Cloud Stream消費失敗後的處理策略(一):自動重試》一文介紹了Spring Cloud Stream預設的訊息重試功能。本文將介紹RabbitMQ的binder提供的另外一種重試功能:重新入隊。 動手試試 準備一個會消費失敗的例子,可以直接沿用前文的工
Spring Cloud Stream消費失敗後的處理策略(三):使用DLQ隊列(RabbitMQ)
messages tap ica hello 方式 應用 manage oot 輸入 應用場景 前兩天我們已經介紹了兩種Spring Cloud Stream對消息失敗的處理策略: 自動重試:對於一些因環境原因(如:網絡抖動等不穩定因素)引發的問題可以起到比較好的作用,提
spring-cloud-starter-hystrix(斷路器)服務不通或者調用失敗後的錯誤處理和回調
系統 comm cli 處理 參考 quest 微服務架構 ron 100% 雪崩效應 在微服務架構中通常會有多個服務層調用,大量的微服務通過網絡進行通信,從而支撐起整個系統。各個微服務之間也難免存在大量的依賴關系。然而任何服務都不是100%可用的,網絡往往也是脆弱的,所
rabbitmq訊息傳送確認和消費訊息手動刪除訊息
0.application.properties新增如下配置 # 訊息傳送至exchange callback spring.rabbitmq.publisher-confirms=true # 訊息傳送至queue 失敗才callback spring.rabbitmq.publi
websocket(二)訊息傳送後回撥
基於 websocket使用 需求 例:傳送登入請求後及時獲取返回值,然後根據返回值處理接下來的事件,類似http請求,不過需求是要用websocket做到這一點。 思考 我們在websocket 介紹上能夠充分認知到websocket的所有回撥函式都是非同步執行的,也就
Android--判斷髮送簡訊後的狀態/傳送成功Or傳送失敗
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!  
SpringBoot整合RabbitMQ,實現訊息傳送和消費
下載安裝Erlang和RabbitMQ Erlang和RabbitMQ:https://www.cnblogs.com/theRhyme/p/10069611.html 專案建立和依賴 推薦SpringCloud專案線上建立:https://start.spring.io/ 不用上面這
RabbitMQ(四)訊息確認(傳送確認,接收確認)
consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=
用Spring的郵件封裝類JavaMailSenderImpl傳送郵件:注意埠號不需要設定(設定了反而linux上釋出後傳送失敗)
===》###注意埠號不需要設定:》加不加埠:window都行,linux加了埠反而發不出去! 是的。配置了埠,Windows沒問題。linux就有問題了。仔細檢視錯誤資訊: 發現是 JavaMail API 程式碼問題。排除了是我的程式碼邏輯問題。 而且是埠超時問題。 看到這
Kafka、RabbitMQ、RocketMQ等訊息中介軟體的對比 —— 訊息傳送效能和區別
分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。 那麼,訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們中介軟體測
Spring Cloud Stream消費失敗後的處理策略(一):自動重試
之前寫了幾篇關於Spring Cloud Stream使用中的常見問題,比如: 如何處理訊息重複消費 如何消費自己生產的訊息 下面幾天就集中來詳細聊聊,當訊息消費失敗之後該如何處理的幾種方式。不過不論哪種方式,都需要與具體業務結合,解決不同業務場景可能出現的問題。 今天第一節,介紹一下Spring Clo
Spring Cloud Stream消費失敗後的處理策略(二):自定義錯誤處理邏輯
應用場景 上一篇《Spring Cloud Stream消費失敗後的處理策略(一):自動重試》介紹了預設就會生效的訊息重試功能。對於一些因環境原因、網路抖動等不穩定因素引發的問題可以起到比較好的作用。但是對於諸如程式碼本身存在的邏輯錯誤等,無論重試多少次都不可能成功的問題,是無法修復的。對於這樣的情況,前文