1. 程式人生 > >RabbitMQ(五)訊息傳送失敗後的處理

RabbitMQ(五)訊息傳送失敗後的處理

前一篇文章,寫了訊息傳送確認的一些內容.

就是訊息傳送成功或失敗的時候,都會呼叫confirmListener 或者returnListener.

如果訊息傳送成功,就不考慮了.當訊息傳送失敗時,怎麼處理這個訊息呢.

1.自動重發

2.系統預警人工處理等

以上操作,都需要知道是哪條訊息,具體什麼內容傳送失敗了,才能進行後續處理.

在returnListener中,引數是有訊息內容,exchange,routingKey 這些內容的.

但是在confirmListener中,卻是什麼都沒有,只有個correlationData,而我們打印出的correlationData都是null

看來問題的關鍵就在這個correlationData上了, CorrelationData類只有一個屬性ID, 很明顯,我們在傳送訊息時,將訊息和correlationData的ID做一個繫結,就可以根據id拿到訊息. 然後進行重發,報警等操作了.

到傳送訊息的類裡面, 發現AmqpTemplate 裡面竟然沒有和CorrelationData相關的方法,沒辦法把CorrelationData.id和訊息進行繫結..

原來需要使用RabbitTemplate 而不是 AmqpTemplate 類偷笑


找到方法後,下面就非常簡單了.

@Service("rabbitTemplatePublishService")
public class RabbitTemplatePublishService {
	@Autowired
	private RabbitTemplate rmqpTemplate;

	public void send(String exchange, String routingKey, Object obj) {
		String msgId = UUID.randomUUID().toString();
		Message message = MessageBuilder.withBody(obj.toString().getBytes())
				.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
				.setCorrelationId(msgId.getBytes()).build();
		CorrelationData date = new CorrelationData(msgId);
		// TODO 將 msgId 與 message 的關係儲存起來,例如放到快取中
		rmqpTemplate.send(exchange, routingKey, message, date);
	}
}


上面的程式碼,message中,也可以setCorrelationId為自己指定的id,這樣方便confirm和return一起處理.


上面將msgId和message關係儲存後, 在confirmListener或者returnListener中,就可以根據msgId獲取message進行對於操作.

上面的思路看起來解決了失敗後訊息處理的問題.但是實際上卻是不可靠的尷尬

3.如果進行confirm的時候,或者return的時候, 失敗了怎麼辦?  

4.msgId和message的關係要儲存多久?

但是上面的思路已經抓到了問題的重點,即msgId和message做繫結.

針對3,4兩個問題,只需要變通下即可解決

方案如下:

傳送訊息前,繫結並儲存msgId和message的關係

當confirm或return回撥時,根據ack類別等,分別處理. 例如return或者ack=false則說明有問題,報警, 如果ack=true則刪除關係

(因為return在confirm前,所以一條訊息在return後又ack=true的情況也是按return處理)

定時檢查這個繫結關係列表,如果發現一些已經超時(自己設定的超時時間)未被處理(即未return和confirm),則手動處理這些訊息.

至此,傳送訊息基本已經沒問題了.

需要注意如果是自動重發的話,消費端需要做冪等或去重處理.

相關推薦

RabbitMQ()訊息傳送失敗處理

前一篇文章,寫了訊息傳送確認的一些內容. 就是訊息傳送成功或失敗的時候,都會呼叫confirmListener 或者returnListener. 如果訊息傳送成功,就不考慮了.當訊息傳送失敗時,怎麼處理這個訊息呢. 1.自動重發 2.系統預警人工處理等 以上操作,都需要

Spring ActiveMQ 整合(二): 重發機制(訊息傳送失敗的重新發送)

       之前已經寫了一個spring   activemq整合的demo   ,今天繼續完善一下這個demo,讓功能更強大。        假如現在我手裡有一個很重要的訊息的,想要發給一個人,

微信模版訊息傳送失敗

原因分析: 1.同一個微信使用者,登陸iOS系統,獲取form元件submit後的formId是這樣的格式:472be418fa10c861fea8faadba4caf8b; 在android下,formId格式是:1527130096088 這樣的

Java實現rabbitmq簡單訊息傳送與消費

1.搭建並配置好rabbitmq服務 2.生產訊息併發送 public class SendMessage { private final static String QUEUE_NA

RabbitMQ實戰-在服務中配置RabbitMQ實現訊息傳送

在這裡以訂單服務為訊息生產者,程式碼中都有詳細的註釋和說明,以下是示例: 新增依賴 首先,建立一個web工程(在這裡我使用springboot2.0.2)。加入rabbitMQ所需要的依賴:         <!-- rabbitmq依賴 -->       

使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制

> 前言:實際專案中經常遇到訊息消費失敗了,要進行訊息的重發。比如支付訊息消費失敗後,要分不同時間段進行N次的訊息重發提醒。 # 本文模擬場景 1. 當金額少於100時,訊息消費成功 1. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。 1. 當金額大於200時

訊息中介軟體系列RabbitMQ的使用場景(非同步處理、應用解耦)

一、非同步處理 場景: 使用者註冊,寫入資料庫成功以後,傳送郵件和簡訊。 準備工作: 1)安裝RabbitMQ,參考前面的文章 2)新建一個名為RabbitMQAsyncProc的maven web工程,在pom.xml檔案裡面引入如下依賴 <project xmlns="http://maven.a

Spring Cloud Stream消費失敗處理策略(三):使用DLQ佇列(RabbitMQ

應用場景 前兩天我們已經介紹了兩種Spring Cloud Stream對訊息失敗的處理策略: 自動重試:對於一些因環境原因(如:網路抖動等不穩定因素)引發的問題可以起到比較好的作用,提高訊息處理的成功率。 自定義錯誤處理邏輯:如果業務上,訊息處理失敗之後有明確的降級邏輯可以彌補的,可以採用這種

Spring Cloud Stream消費失敗處理策略(四):重新入隊(RabbitMQ

應用場景 之前我們已經通過《Spring Cloud Stream消費失敗後的處理策略(一):自動重試》一文介紹了Spring Cloud Stream預設的訊息重試功能。本文將介紹RabbitMQ的binder提供的另外一種重試功能:重新入隊。 動手試試 準備一個會消費失敗的例子,可以直接沿用前文的工

Spring Cloud Stream消費失敗處理策略(三):使用DLQ隊列(RabbitMQ

messages tap ica hello 方式 應用 manage oot 輸入 應用場景 前兩天我們已經介紹了兩種Spring Cloud Stream對消息失敗的處理策略: 自動重試:對於一些因環境原因(如:網絡抖動等不穩定因素)引發的問題可以起到比較好的作用,提

spring-cloud-starter-hystrix(斷路器)服務不通或者調用失敗的錯誤處理和回調

系統 comm cli 處理 參考 quest 微服務架構 ron 100% 雪崩效應 在微服務架構中通常會有多個服務層調用,大量的微服務通過網絡進行通信,從而支撐起整個系統。各個微服務之間也難免存在大量的依賴關系。然而任何服務都不是100%可用的,網絡往往也是脆弱的,所

rabbitmq訊息傳送確認和消費訊息手動刪除訊息

0.application.properties新增如下配置 # 訊息傳送至exchange callback spring.rabbitmq.publisher-confirms=true # 訊息傳送至queue 失敗才callback spring.rabbitmq.publi

websocket(二)訊息傳送回撥

基於 websocket使用 需求 例:傳送登入請求後及時獲取返回值,然後根據返回值處理接下來的事件,類似http請求,不過需求是要用websocket做到這一點。 思考 我們在websocket 介紹上能夠充分認知到websocket的所有回撥函式都是非同步執行的,也就

Android--判斷髮送簡訊的狀態/傳送成功Or傳送失敗

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

SpringBoot整合RabbitMQ,實現訊息傳送和消費

下載安裝Erlang和RabbitMQ Erlang和RabbitMQ:https://www.cnblogs.com/theRhyme/p/10069611.html   專案建立和依賴 推薦SpringCloud專案線上建立:https://start.spring.io/ 不用上面這

RabbitMQ(四)訊息確認(傳送確認,接收確認)

consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=

用Spring的郵件封裝類JavaMailSenderImpl傳送郵件:注意埠號不需要設定(設定了反而linux上釋出傳送失敗)

===》###注意埠號不需要設定:》加不加埠:window都行,linux加了埠反而發不出去! 是的。配置了埠,Windows沒問題。linux就有問題了。仔細檢視錯誤資訊: 發現是 JavaMail API 程式碼問題。排除了是我的程式碼邏輯問題。 而且是埠超時問題。 看到這

Kafka、RabbitMQ、RocketMQ等訊息中介軟體的對比 —— 訊息傳送效能和區別

分散式系統中,我們廣泛運用訊息中介軟體進行系統間的資料交換,便於非同步解耦。現在開源的訊息中介軟體有很多,前段時間我們自家的產品 RocketMQ (MetaQ的核心) 也順利開源,得到大家的關注。 那麼,訊息中介軟體效能究竟哪家強? 帶著這個疑問,我們中介軟體測

Spring Cloud Stream消費失敗處理策略(一):自動重試

之前寫了幾篇關於Spring Cloud Stream使用中的常見問題,比如: 如何處理訊息重複消費 如何消費自己生產的訊息 下面幾天就集中來詳細聊聊,當訊息消費失敗之後該如何處理的幾種方式。不過不論哪種方式,都需要與具體業務結合,解決不同業務場景可能出現的問題。 今天第一節,介紹一下Spring Clo

Spring Cloud Stream消費失敗處理策略(二):自定義錯誤處理邏輯

應用場景 上一篇《Spring Cloud Stream消費失敗後的處理策略(一):自動重試》介紹了預設就會生效的訊息重試功能。對於一些因環境原因、網路抖動等不穩定因素引發的問題可以起到比較好的作用。但是對於諸如程式碼本身存在的邏輯錯誤等,無論重試多少次都不可能成功的問題,是無法修復的。對於這樣的情況,前文