1. 程式人生 > >ActiveMQ整合Spring(多消費者)

ActiveMQ整合Spring(多消費者)


專案目錄:
src/test/java目錄下有main方法,可用來測試傳送、接收訊息
com.ryx.amq包下有傳送和監聽的兩個類  可以使用main方法測試
配置檔案中已配置:多個消費者(多執行緒消費佇列),activemq訊息異常重發策略

整合過程

maven依賴

<!-- ActiveMQ dependencies: the aggregate "activemq-all" artifact plus the
     "activemq-pool" module. The application context of this project uses
     org.apache.activemq.pool.PooledConnectionFactory, which the article states
     requires the activemq-pool package. Both artifacts are pinned to 5.11.1. -->
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.11.1</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.11.1</version>
</dependency>

applicationContext-amq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd
	http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd">

	<!-- Load the external property files that supply the ${...} placeholders used below -->
	<context:property-placeholder location="classpath:properties/*.properties" />

	<!--
		Enable processing of @Autowired on the beans declared in this file.
		producerMessageSender and consumerSessionAwareMessageListener both use
		@Autowired fields; property-placeholder alone does not register the
		annotation post-processors, so without this element those fields stay
		null unless another context file happens to enable annotation support.
		Declaring it again elsewhere is harmless.
	-->
	<context:annotation-config />

	<!-- The ConnectionFactory that actually creates connections; supplied by the JMS vendor (ActiveMQ) -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<!-- Broker address and credentials, taken from the property files -->
		<property name="brokerURL" value="${mq.brokerURL}" />
		<property name="userName" value="${mq.userName}"></property>
		<property name="password" value="${mq.password}"></property>
		<!-- Redelivery (retry) policy. The original author notes that only persistent messages are retried; verify against the broker setup -->
		<property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/>
	</bean>

	<!-- Per-destination redelivery policies -->
	<bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">
		<property name="redeliveryPolicyEntries">
			<list>
				<ref bean="bizRedeliveryPolicy"/>
				<!-- Further policies can be listed here -->
				<!-- <ref bean="bizRedeliveryPolicy2"/>   -->
			</list>
		</property>
	</bean>

	<bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
		<!--
			At most 3 redeliveries; base delay 1000 ms; exponential back-off
			enabled with multiplier 2, so the wait doubles between attempts
			(about 1 s, 2 s, 4 s for the three redeliveries). The policy is
			bound to the queue defined by the "destination" bean.
		-->
		<property name="maximumRedeliveries" value="3"/>
		<property name="redeliveryDelay" value="1000"/>
		<property name="backOffMultiplier" value="2"/>
		<property name="useExponentialBackOff" value="true"/>
		<property name="destination" ref="destination"/>
	</bean>

	<!--
		ActiveMQ's PooledConnectionFactory wraps an ActiveMQConnectionFactory and
		pools Connection, Session and MessageProducer objects to reduce resource
		usage. Requires the activemq-pool artifact.
	-->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory" ref="targetConnectionFactory" />
		<property name="maxConnections" value="${mq.pool.maxConnections}" />
	</bean>

	<!--
		Spring-side ConnectionFactory that manages the real one.
		NOTE(review): SingleConnectionFactory hands out one shared connection, so
		the pool's maxConnections setting may have little effect here; confirm
		whether the pooled factory should be used directly.
	-->
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
		<!-- The target is the factory that really produces JMS connections -->
		<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
		<property name="reconnectOnException" value="true"/>
	</bean>

	<!-- Spring's JMS helper class for sending and receiving messages -->

	<!-- Template for the queue -->
	<bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- Uses the Spring-managed connectionFactory bean defined above -->
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="defaultDestination" ref="destination"/>
		<!-- Required for the deliveryMode, priority and timeToLive settings to take effect -->
		<property name="explicitQosEnabled" value="true" />
		<!-- Persistent delivery: with non-persistent delivery, messages held by the broker are lost when it restarts -->
		<property name="deliveryPersistent" value="true"/>
		<!-- Author's note: without a transacted session, messages are not retried after an exception -->
		<property name="sessionTransacted" value="true"/>
	</bean>

	<bean id="producerMessageSender" class="com.ryx.amq.ProducerMessageSender"/>

	<!-- Destination: the queue to send to and listen on -->
	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="${queueName}" />
	</bean>

	<!-- Destination used by the session-aware listener container; same ${queueName} as above -->
	<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>${queueName}</value>
		</constructor-arg>
	</bean>

	<!-- MessageListener that also receives the JMS Session -->
	<bean id="consumerSessionAwareMessageListener" class="com.ryx.amq.ConsumerSessionAwareMessageListener"/>


	<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="sessionAwareQueue" />
		<property name="messageListener" ref="consumerSessionAwareMessageListener" />
		<!-- Author's note: without a transacted session, messages are not retried after an exception -->
		<property name="sessionTransacted" value="true"></property>
		<!-- Fixed number of consumer threads -->
		<!-- <property name="concurrentConsumers" value="3"></property> -->
		<!-- Dynamic number of consumer threads, given as "lower-upper": between 2 and 3 -->
		<property name="concurrency" value="2-3"></property>
		<!-- Upper limit of consumer threads -->
		<!-- <property name="maxConcurrentConsumers" value="3"></property> -->
	</bean>

</beans>

web.xml

<!-- Spring container bootstrap: declares every classpath resource matching
     spring/applicationContext-*.xml as a context configuration location.
     NOTE(review): this parameter is presumably read by a ContextLoaderListener
     declared elsewhere in web.xml; that listener is not shown here, so confirm. -->
<context-param>
	<param-name>contextConfigLocation</param-name>
	<param-value>classpath:spring/applicationContext-*.xml</param-value>
</context-param>

ProducerMessageSender.java

package com.ryx.amq;


import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;


/**
 * Sends demo text messages through the injected {@link JmsTemplate}.
 */
public class ProducerMessageSender {

	/** Template used for sending; injected by Spring. */
	@Autowired
	private JmsTemplate jmsTemplate;

	/**
	 * Sends a single text message made of a fixed demo phrase, the separator
	 * {@code ---->} and the given counter.
	 *
	 * <p>No destination is passed to the template, so the message goes to
	 * whatever default destination the template has been configured with.
	 *
	 * @param count sequence number appended to the message text; a
	 *              {@code null} value is rendered as the text "null"
	 */
	public void SendMessage(final Integer count){
		// Build the body up front; Integer is immutable, so this is equivalent
		// to building it lazily inside the callback.
		final String body = "這是一首簡單的小情歌"+"---->"+count;

		final MessageCreator textMessageCreator = new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(body);
			}
		};

		jmsTemplate.send(textMessageCreator);
	}
}

ConsumerSessionAwareMessageListener.java

package com.ryx.amq;


import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;


import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.SessionAwareMessageListener;


/**
 * 佇列監聽器
 * @author broadthinking
 *
 */
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<Message> {


	private static final Log log = LogFactory.getLog(ConsumerSessionAwareMessageListener.class);


	@Autowired
	private JmsTemplate activeMqJmsTemplate;
	@Autowired
	private Destination sessionAwareQueue;


	public synchronized void onMessage(Message message, Session session) throws JMSException {
			ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
			final String ms = msg.getText();
			System.out.println(Thread.currentThread().getName()+"------>"+ms);
			//這裡是測試異常重發
			//throw new RuntimeException();
	}
}