ActiveMQ整合Spring(多消費者)
阿新 • • 發佈:2019-02-09
專案目錄:
src/test/java目錄下有main方法測試傳送 接受訊息
com.ryx.amp包下有傳送和監聽的兩個類 可以使用main方法測試
配置檔案中已配置:多個消費者(多執行緒消費佇列),activemq訊息異常重發策略
整合過程
maven依賴
<!--activemq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.1</version> </dependency>
applicationContext-amq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd"> <!-- 載入配置檔案 --> <context:property-placeholder location="classpath:properties/*.properties" /> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- ActiveMQ服務地址 --> <property name="brokerURL" value="${mq.brokerURL}" /> <property name="userName" value="${mq.userName}"></property> <property name="password" value="${mq.password}"></property> <!-- 這裡定義重試策略,注意:只有持久化的才會重試--> <property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/> </bean> <!--這裡設定各個訊息佇列的重發機制--> <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap"> <property name="redeliveryPolicyEntries"> <list> <ref bean="bizRedeliveryPolicy"/> <!--這裡可以設定多個 --> <!-- <ref bean="bizRedeliveryPolicy2"/> --> </list> </property> </bean> <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重發次數 延時、延時係數、延時指數開關、目標(重發等待時間1s, 2s, 4s, 8s)--> <property name="maximumRedeliveries" value="3"/> <property name="redeliveryDelay" value="1000"/> <property name="backOffMultiplier" value="2"/> <property name="useExponentialBackOff" value="true"/> <property name="destination" ref="destination"/> </bean> <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往裡面注入一個ActiveMQConnectionFactory 可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。 要依賴於 activemq-pool包 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="${mq.pool.maxConnections}" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> <property name="reconnectOnException" value="true"/> </bean> <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> <!-- 佇列模板 --> <bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="destination"/> <!-- 使 deliveryMode, priority, timeToLive設定生效--> <property name="explicitQosEnabled" value="true" /> <!-- 持久化 如果設定為非持久化MQ伺服器重啟後MQ中的資料會丟失--> <property name="deliveryPersistent" value="true"/> <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的--> <property name="sessionTransacted" value="true"/> </bean> <bean id="producerMessageSender" class="com.ryx.amq.ProducerMessageSender"/> <!--目的地,就是要監聽的佇列 --> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${queueName}" /> </bean> <!--這個是sessionAwareQueue目的地 --> <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>${queueName}</value> </constructor-arg> </bean> <!-- 可以獲取session的MessageListener --> <bean id="consumerSessionAwareMessageListener" class="com.ryx.amq.ConsumerSessionAwareMessageListener"/> <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="sessionAwareQueue" /> <property name="messageListener" ref="consumerSessionAwareMessageListener" /> <!-- 這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的 --> <property name="sessionTransacted" value="true"></property> <!-- 設定固定的執行緒數 --> <!-- <property name="concurrentConsumers" value="3"></property> --> <!-- 設定動態的執行緒數 --> <property name="concurrency" value="2-3"></property> <!-- 設定最大的執行緒數 --> <!-- <property name="maxConcurrentConsumers" value="3"></property> --> </bean> </beans>
Web.xml
<!-- 載入spring容器 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring/applicationContext-*.xml</param-value>
</context-param>
ProducerMessageSender.java
package com.ryx.amq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class ProducerMessageSender { @Autowired private JmsTemplate jmsTemplate; public void SendMessage(final Integer count){ jmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("這是一首簡單的小情歌"+"---->"+count); } }); } }
ConsumerSessionAwareMessageListener.java
package com.ryx.amq;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.SessionAwareMessageListener;
/**
* 佇列監聽器
* @author broadthinking
*
*/
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<Message> {
private static final Log log = LogFactory.getLog(ConsumerSessionAwareMessageListener.class);
@Autowired
private JmsTemplate activeMqJmsTemplate;
@Autowired
private Destination sessionAwareQueue;
public synchronized void onMessage(Message message, Session session) throws JMSException {
ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
final String ms = msg.getText();
System.out.println(Thread.currentThread().getName()+"------>"+ms);
//這裡是測試異常重發
//throw new RuntimeException();
}
}