Spring整合訊息佇列RabbitMQ(訊息失敗處理)
1. RabbitMQ簡介
1.1. RabbitMQ
RabbitMQ是由Erlang(愛立信公司)語言開發,實現Advanced Message Queuing Protocol (AMQP高階訊息佇列協議)的訊息中介軟體。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。
1.2. 結構圖
• Broker:訊息佇列伺服器實體,例如RabbitMQ服務
• Vhost:虛擬主機,預設為“/”,一個broker裡可以有多個vhost,區分不同使用者許可權,類似java的命令空間
• Connection:應用程式與broker連線,可有多個連線
• Channel:訊息通道,connection中可建立多個channel,每個channel代表一個會話任務,所有操作都在channel中進行。
• Exchange:訊息交換機,channel中可有多個,用於投遞訊息。應用程式傳送訊息時先把訊息給交換機,由交換機投遞給佇列,不是直接給佇列。
型別有三種:fanout(廣播)、Direct(處理路由鍵,輪播實現)、Topic(支援訊息模糊匹配)
• Queue:佇列,用於存放訊息
• Message:訊息,應用程式需要傳送的資料
• Bind:根據routingKey繫結exchange與queue規則,決定訊息傳送的方向
1.3. 物件間關係
2. rabbitMQ與spring整合
2.1. 傳送訊息Producer
傳送介面
public interface SimpleMQProducer {
/**
* 傳送訊息至MQ
*/
public void sendDataToMQ(Object message);
/**
* 傳送訊息至MQ
*/
public void sendDataToMQ(Object message, String msgid);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
傳送介面實現
public class SmartMQProducer implements InitializingBean,SimpleMQProducer{
protected final Loggerx logger = Loggerx.getLogger("dao");
protected RabbitTemplate rabbitTemplate = new RabbitTemplate();
protected String queue;
protected String exchange;
protected String routingKey;
protected ConnectionFactory connectionFactory;
protected MessageConverter messageConverter;
protected RetryTemplate retryTemplate;
protected ConfirmCallback confirmCallback;
protected ReturnCallback failedCallback;
public RabbitTemplate getRabbitTemplate() {
return rabbitTemplate;
}
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void setQueue(String queue) {
this.queue = queue;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}
public void setConfirmCallback(ConfirmCallback confirmCallback) {
this.confirmCallback = confirmCallback;
}
public void setFailedCallback(ReturnCallback failedCallback) {
this.failedCallback = failedCallback;
}
@Override
public void sendDataToMQ(Object message) {
CorrelationData correlationId = null;
try {
correlationId = new CorrelationData(GUID.genTxNo(25));
} catch (Exception e) {
logger.error(LogType.EX, "產生訊息id失敗",e);
correlationId = new CorrelationData(UUID.randomUUID().toString());
}
this.rabbitTemplate.convertAndSend(this.routingKey, message, correlationId);
logger.info(LogType.EX, "傳送到MQ的訊息內容["+JsonUtil.toJSONString(message)+"],訊息ID["+correlationId.getId()+"]");
}
@Override
public void sendDataToMQ(Object message, String msgid) {
CorrelationData correlationId = new CorrelationData(msgid);
this.rabbitTemplate.convertAndSend(this.routingKey, message, correlationId);
logger.info(LogType.EX, "傳送到MQ的訊息內容["+JsonUtil.toJSONString(message)+"],訊息ID["+correlationId.getId()+"]");
}
@Override
public void afterPropertiesSet() throws Exception {
this.rabbitTemplate.setQueue(this.queue);
this.rabbitTemplate.setExchange(this.exchange);
this.rabbitTemplate.setRoutingKey(this.routingKey);
this.rabbitTemplate.setConnectionFactory(this.connectionFactory);
this.rabbitTemplate.setMessageConverter(this.messageConverter);
this.rabbitTemplate.setMandatory(true);
if (null == this.failedCallback) {
// 確認訊息是否到達broker伺服器,也就是隻確認是否正確到達exchange中即可,只要正確的到達exchange中,broker即可確認該訊息返回給客戶端ack。
this.rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//記錄本地日誌
Object object = StringUtil.ByteToObject(message.getBody());
logger.error(LogType.EX, "訊息傳送到MQ失敗,內容["+object+"]");
}
});
}else {
this.rabbitTemplate.setReturnCallback(this.failedCallback);
}
//設定回撥
this.rabbitTemplate.setConfirmCallback(this.confirmCallback);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
傳送到MQ失敗回撥處理
public abstract class SmartMQFailedCallBack implements ReturnCallback {
protected final Loggerx logger = Loggerx.getLogger("bo");
/**
* 確認訊息是否到達broker伺服器,也就是隻確認是否正確到達queue中即可,只要正確的到達queue中,broker即可確認該訊息返回給客戶端ack。
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
if (null != message) {
Object object = StringUtil.ByteToObject(message.getBody());
logger.error(LogType.EX, "訊息傳送到MQ失敗,內容["+object+"]");
executeFailedMessage(object);
}else {
logger.error(LogType.EX, "訊息傳送到MQ失敗,訊息內容為null");
}
}
public abstract void executeFailedMessage(Object message);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
傳送到MQ後回撥處理(不分成功或失敗)
public abstract class SmartMQConfirmCallBack implements ConfirmCallback{
protected final Logger logger = Logger.getLogger("bo");
/**
* 確認訊息是否到達broker伺服器,也就是隻確認是否正確到達exchange中即可,只要正確的到達exchange中,broker即可確認該訊息返回給客戶端ack。
*
*/
public void confirm(CorrelationData correlationData, boolean ack) {
if (ack) {
logger.info(LogType.INFO, "訊息成功消費,訊息ID["+correlationData.getId()+"]");
} else {
logger.error(LogType.EX, "訊息失敗消費,訊息ID["+correlationData.getId()+"]");
}
executeCallBack(correlationData.getId(),ack);
}
public abstract void executeCallBack(String msgID,boolean ack);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
傳送端spring
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 連線服務配置 -->
<bean id="mqConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="xxx.xxx.xx.xx"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="virtualHost" value="/"/>
<property name="channelCacheSize" value="50"/>
<property name="publisherConfirms" value="true"/>
<property name="publisherReturns" value="true"/>
</bean>
<rabbit:admin connection-factory="mqConnectionFactory" />
<!-- 宣告訊息轉換器為SimpleMessageConverter -->
<bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
<!-- 訊息傳送重試 ,可選項-->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
<!-- queue 佇列宣告 -->
<!-- durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反 -->
<!-- auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除 -->
<!-- 排他性,exclusive=true:首次申明的connection連線下可見; exclusive=false:所有connection連線下都可見-->
<rabbit:queue id="test" durable="true" auto-delete="false" exclusive="false" name="test" />
<!-- exchange queue binging key 繫結 -->
<!-- durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反 -->
<!-- auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除 -->
<rabbit:direct-exchange name="test" durable="true" auto-delete="false" id="test">
<rabbit:bindings>
<rabbit:binding queue="test" key="test_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 訊息傳送到mq回撥處理,需要處理錯誤訊息,可選項 -->
<bean id="testfailedCallback" class="xxx.TestMsgFailedCallBack"></bean>
<!-- 訊息傳送到mq回撥處理,接著業務處理 ,可選項-->
<bean id="testconfirmCallback" class="xxx.TestconfirmCallback"></bean>
<bean id="testProducer" class="XXXX.SmartMQProducer">
<property name="connectionFactory" ref="mqConnectionFactory" />
<property name="messageConverter" ref="msgConverter" />
<property name="retryTemplate" ref="retryTemplate" />
<property name="confirmCallback" ref="testconfirmCallback" />
<property name="failedCallback" ref="testfailedCallback" />
<property name="exchange" value="test" />
<property name="queue" value="test" />
<property name="routingKey" value="test" />
</bean>
</beans>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
2.2. 消費端Consumer
接收端spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<!-- 連線服務配置 -->
<bean id="mqConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="xxx.xxx.xxx.xxx" />
<property name="username" value="guest" />
<property name="password" value="guest" />
<property name="virtualHost" value="/" />
<property name="channelCacheSize" value="50" />
</bean>
<!-- 建立rabbitAdmin 代理類 -->
<rabbit:admin connection-factory="mqConnectionFactory" />
<!-- 宣告訊息轉換器為SimpleMessageConverter -->
<bean id="msgConverter"
class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
<!-- queue 佇列宣告 -->
<rabbit:queue id="test" name="test" durable="true" auto-delete="false" exclusive="false" />
<!-- exchange queue binging key 繫結 -->
<!-- durable=true,交換機持久化,rabbitmq服務重啟交換機依然存在,保證不丟失; durable=false,相反 -->
<!-- auto-delete=true:無消費者時,佇列自動刪除; auto-delete=false:無消費者時,佇列不會自動刪除 -->
<!-- 通過Binding來判定Queue、Exchange、routingKey -->
<rabbit:direct-exchange id="test" name="test"
durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="test" key="test_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="testMsgHandler" class="xxxx.testMsgHandler" />
<bean id="testMsgAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="testHandler" />
<property name="defaultListenerMethod" value="handleTxMsg"></property>
<property name="messageConverter" ref="msgConverter"></property>
</bean>
<
相關推薦
Spring整合訊息佇列RabbitMQ(訊息失敗處理)
1. RabbitMQ簡介
1.1. RabbitMQ
RabbitMQ是由Erlang(愛立信公司)語言開發,實現Advanced Message Queuing Protocol (AMQP高階訊息佇列協議)的訊息中介軟體。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,
(二)RabbitMQ訊息佇列-RabbitMQ訊息佇列架構與基本概念
沒錯我還是沒有講怎麼安裝和寫一個HelloWord,不過快了,這一章我們先了解下RabbitMQ的基本概念。
RabbitMQ架構
說是架構其實更像是應用場景下的架構(自己畫的有點醜,勿嫌棄)
從圖中可以看出RabbitMQ主要由Exchange和Qu
訊息佇列RabbitMQ與Spring整合
1.RabbitMQ簡介
RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高階訊息佇列協議)的標準實現。
官網:http://www.rabbitmq.com/
2.Spring整合RabbitM
訊息佇列 RabbitMQ 與 Spring 整合使用
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSc
MQ訊息佇列--RabbitMQ整合Spring理論及例項講解
今天Boss叫我去他的小黑屋分配任務,出門就記得倆詞“MQ”、“訊息佇列”。從來都沒聽說過這讓我怎麼搞?對於這種情況我慣有的方法論就是:先搞清楚它是什麼、有什麼用、有什麼工具可用、怎麼用,然後就是……擼起袖子使勁幹吧!
1、什麼是訊息佇列
訊息是指在兩個
SpringBoot(八) Spring和訊息佇列RabbitMQ
概述
1.大多數應用中,可以通過訊息服務中介軟體來提升系統非同步能力和拓展解耦能力。
2.訊息服務中的兩個重要概念:訊息代理(Message broker)和目的地(destination)
當訊息傳送者傳送訊息後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。
3.訊息佇列主要有兩種形式的目的
.NET 開源工作流: Slickflow流程引擎高階開發(七)--訊息佇列(RabbitMQ)的整合使用
前言:工作流流程過程中,除了正常的人工審批型別的節點外,事件型別的節點處理也尤為重要。比如比較常見的事件型別的節點有:Timer/Message/Signal等。本文重點闡述訊息型別的節點處理,以及實現訊息驅動流程過程中對訊息佇列(RabbitMQ)的整合使用方式。
1. 節點間訊息傳遞
1.1 Messag
訊息佇列 RabbitMQ
什麼叫訊息佇列
訊息(Message)是指在應用間傳送的資料。訊息可以非常簡單,比如只包含文字字串,也可以更復雜,可能包含嵌入物件。
訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息
訊息佇列RabbitMQ應答模式
為了確保訊息不會丟失,RabbitMQ支援訊息應答。消費者傳送一個訊息應答,告訴RabbitMQ這個訊息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。如果一個消費者掛掉卻沒有傳送應答,RabbitMQ會理解為這個訊息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也
訊息佇列rabbitmq在mac上的安裝
一、安裝rabbitMq.
在mac平臺上安裝rabbitMq,開啟終端,在終端上輸入以下命令:
brew install rabbitmq
安裝rabbitMq需要一些時間
Python 訊息佇列rabbitmq使用之工作佇列使用多個worker接收訊息
前面已經介紹過怎麼安裝rabbitmq以及要使用的三方庫
因此這裡直接進入例項
1、釋出端程式碼
# new_task.py
import pika # 匯入pika
import sys
Python 訊息佇列rabbitmq使用之 更加細緻的 有選擇的 釋出訊息/接收訊息
1、釋出端程式碼
# new_topic_p.py
import pika
import sys
connection = pika.BlockingConnection(pika.Connec
Python 訊息佇列rabbitmq使用之 實現一個RPC系統
1、服務端程式碼
# rpc_server.py
import pika
# 建立連線
connection = pika.BlockingConnection(pika.ConnectionP
使用訊息佇列RabbitMQ
RabbitMQ 即一個訊息佇列,主要是用來實現應用程式的非同步和解耦,同時也能起到訊息緩衝,訊息分發的作用。
RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,AMQP,即Advanced Message Queuing Protocol, 高階訊息
訊息佇列RabbitMQ入門與5種模式詳解
1.RabbitMQ概述
簡介:
MQ全稱為Message Queue,訊息佇列是應用程式和應用程式之間的通訊方法;
RabbitMQ是開源的,實現了AMQP協議的,採用Erlang(面向併發程式語言)編寫的,可複用的企業級訊息系統;
AMQP(高階訊息佇列協議)
訊息佇列-RabbitMq(PHP)
首先進行安裝:
將composer.json檔案放在你的專案中
composer.json
{"require":{"php-amqplib/php-amqplib":"2.5.*"}
在C#中使用訊息佇列RabbitMQ
http://www.cnblogs.com/qy1141/p/4054135.html
作用就是提高系統的併發性,將一些不需要及時響應客戶端且佔用較多資源的操作,放入佇列,再由另外一個執行緒,去非同步處理這些佇列,可極大的提高系統的併發能力。
2、安裝
訊息佇列RabbitMq的五種形式佇列
MQ全稱為Message
Queue,訊息佇列是系統之間的通訊方法;
RabbitMQ是開源的,實現了AMQP協議的,採用Erlang(面向併發程式語言)編寫的,可複用的企業級訊息系統;
AMQP(高階訊息佇列協議)是一個非同步訊息傳遞所使用應用層協議規範,為面向訊息中介
初步對訊息佇列RabbitMQ的瞭解
RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發,完整的實現了AMPQ(高階訊息佇列協議)。網站: http://www.rabbitmq.com/
erlang網站:http://www.erlang.org/ 中文站:http://www.erlang-cn.com/
首先,先安裝下R
OpenStack中訊息佇列(RabbitMQ)分析
可以說OpenStack使用這種MOM模式的訊息佇列機制無疑是一個聰明的選擇。其鬆耦合性以及動態可擴充套件性都非常符合開源雲的要求。無論是開發還是執行,都會帶了很多好處。唯一的缺點就是它是一個single point failure,如果RabbitMQ出錯了,那整個OpenStack也就無法運行了。雖然R