ActiveMQ與Spring整合:(3)訊息監聽器
阿新 • • 發佈:2019-02-13
Spring JMS提供三種訊息監聽方式:MessageListener、SessionAwareMessageListener、MessageListenerAdapter。訊息接收者只需要實現前兩個介面之一,或者用MessageListenerAdapter包裝一個普通的Java類,就可以非同步接收訊息。
1、實現MessageListener介面:實現這個介面必須重寫onMessage方法。
訊息監聽器需要註冊到訊息監聽容器中,示例配置:
package com.hua.spring.jms.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listener that logs the text payload of every {@link TextMessage} it receives.
 *
 * <p>Messages of any other type are ignored without logging.
 *
 * <p>Created: 2016-08-25 22:01:07
 *
 * @author Administrator
 * @version v1.0
 * @since v1.0
 */
public class ResponseQueueMessageListener implements MessageListener {

    /** One logger shared by all instances instead of one per listener object. */
    private static final Logger logger = LoggerFactory.getLogger(ResponseQueueMessageListener.class);

    /**
     * Logs the text of the received message at INFO level.
     *
     * @param message the received message; only {@link TextMessage} instances are processed
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            // Parameterized form yields the same output as the former concatenation.
            logger.info("訊息生產者接收到響應:{}", text);
        } catch (JMSException e) {
            // onMessage cannot throw checked exceptions, so the failure is logged with its cause.
            logger.error("訊息生產者接收訊息時發生異常:", e);
        }
    }
}
2、實現SessionAwareMessageListener介面,實現這個介面需要實現void onMessage(Message message, Session session)方法。這個介面與MessageListener介面不同之處在於,引數裡面有Session引數,訊息接受者接收訊息之後可通過session進行回覆訊息傳送者。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.2.xsd">

    <!-- Connection pool wrapping the ActiveMQ connection factory -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- Connection factory pointing at the local broker -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
        <property name="maxConnections" value="10" />
    </bean>

    <!-- JMS template; sends to queueDestination by default -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="queueDestination" />
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
        <!-- Alternative: a custom message converter -->
        <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
    </bean>

    <!-- Message destination -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name; created in the ActiveMQ admin console at http://localhost:8161/admin/queues.jsp -->
        <constructor-arg index="0" value="helloQueue" />
    </bean>

    <!-- Queue used to test message replies -->
    <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>responseQueue</value>
        </constructor-arg>
    </bean>

    <!-- Message listener -->
    <bean id="responseMessageListener" class="com.hua.spring.jms.listener.ResponseQueueMessageListener" />

    <!-- Listener container: delivers messages from responseQueue to responseMessageListener.
         The class attribute no longer carries a stray leading space. -->
    <bean id="responseQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="destination" ref="responseQueue" />
        <property name="messageListener" ref="responseMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>
</beans>
同時把訊息監聽器註冊到監聽器容器中:
package com.hua.spring.jms.listener;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;

/**
 * Session-aware listener that logs each received {@link TextMessage} and sends a
 * fixed text reply to the configured {@link #setDestination(Destination) destination}
 * using the session the message arrived on.
 *
 * <p>Messages of any other type are ignored.
 *
 * <p>Created: 2016-08-28 09:49:32
 *
 * @author Administrator
 * @version v1.0
 * @since v1.0
 */
public class QueueDestinationMessageListener implements SessionAwareMessageListener<Message> {

    /** One logger shared by all instances instead of one per listener object. */
    private static final Logger logger = LoggerFactory.getLogger(QueueDestinationMessageListener.class);

    /** Destination the reply is sent to; injected through {@link #setDestination(Destination)}. */
    private Destination destination;

    /**
     * Logs the incoming text and replies on the same session.
     *
     * <p>A {@link JMSException} raised while reading, replying or closing the
     * producer is logged and not rethrown.
     *
     * @param message the received message; only {@link TextMessage} instances are processed
     * @param session the session the message was received on, used to create the reply
     * @throws JMSException declared by the interface contract
     */
    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            logger.info("接受者:我收到訊息-->{}", text);

            MessageProducer producer = session.createProducer(destination);
            try {
                Message reply = session.createTextMessage("Hello sender! I have received your news.");
                producer.send(reply);
            } finally {
                // A producer is created per message, so it must be released here;
                // otherwise producers pile up on the session.
                producer.close();
            }
        } catch (JMSException e) {
            logger.error("監聽訊息發生異常", e);
        }
    }

    /**
     * @return the destination replies are sent to
     */
    public Destination getDestination() {
        return destination;
    }

    /**
     * @param destination the destination replies are sent to
     */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.2.xsd">

    <!-- Connection pool wrapping the ActiveMQ connection factory -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- Connection factory pointing at the local broker -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
        <property name="maxConnections" value="10" />
    </bean>

    <!-- JMS template; sends to queueDestination by default -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="queueDestination" />
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
        <!-- Alternative: a custom message converter -->
        <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
    </bean>

    <!-- Message destination -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name; created in the ActiveMQ admin console at http://localhost:8161/admin/queues.jsp -->
        <constructor-arg index="0" value="helloQueue" />
    </bean>

    <!-- Queue used to test message replies -->
    <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>responseQueue</value>
        </constructor-arg>
    </bean>

    <!-- MessageListener that also receives the JMS session; replies go to responseQueue -->
    <bean id="queueDestinationMessageListener" class="com.hua.spring.jms.listener.QueueDestinationMessageListener">
        <property name="destination" ref="responseQueue" />
    </bean>

    <!-- Listener container: despite its id, it consumes from queueDestination.
         The class attribute no longer carries a stray leading space. -->
    <bean id="responseQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="queueDestinationMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>
</beans>
3、使用MessageListenerAdapter。MessageListenerAdapter不是介面,而是一個同時實現了MessageListener與SessionAwareMessageListener介面的介面卡類,它通過反射把收到的訊息委派給一個普通的Java類。訊息監聽類只需要提供過載的handleMessage方法,介面卡會先把JMS訊息轉換成對應的Java型別,再依照引數型別動態匹配要呼叫的方法。
package com.hua.spring.jms.listener;
import java.util.Map;
/**
 * Plain receiver class whose overloaded {@code handleMessage} methods accept the
 * different payload types a converted JMS message can carry.
 *
 * <p>Only the {@code String} overload does any work (it prints the text to
 * standard output); the remaining overloads are empty placeholders.
 *
 * <p>Created: 2016-08-29 21:19:25
 *
 * @author Administrator
 * @version v1.0
 * @since v1.0
 */
public class JMSReceiver {

    /**
     * Receives the payload of a converted TextMessage and prints it to standard output.
     *
     * @param message the text payload
     */
    public void handleMessage(String message) {
        System.out.println(message);
    }

    /**
     * Receives the payload of a converted BytesMessage. Currently a no-op.
     *
     * @param message the raw bytes
     */
    public void handleMessage(byte[] message) {
        // Intentionally empty placeholder.
    }

    /**
     * Receives the payload of a converted MapMessage. Currently a no-op.
     *
     * <p>The parameter is an unbounded wildcard rather than a raw type: the
     * erased signature is still {@code handleMessage(Map)}, so reflective
     * lookup by parameter class is unaffected.
     *
     * @param message the map payload
     */
    public void handleMessage(Map<?, ?> message) {
        // Intentionally empty placeholder.
    }

    /**
     * Receives the payload of a converted ObjectMessage. Currently a no-op.
     *
     * @param message the object payload
     */
    public void handleMessage(Object message) {
        // Intentionally empty placeholder.
    }
}
applicationContext.xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.2.xsd">

    <!-- Connection pool wrapping the ActiveMQ connection factory -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- Connection factory pointing at the local broker -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
        <property name="maxConnections" value="10" />
    </bean>

    <!-- JMS template; sends to queueDestination by default -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="queueDestination" />
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
        <!-- Alternative: a custom message converter -->
        <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
    </bean>

    <!-- Message destination -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name; created in the ActiveMQ admin console at http://localhost:8161/admin/queues.jsp -->
        <constructor-arg index="0" value="helloQueue" />
    </bean>

    <!-- Queue used to test message replies -->
    <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>responseQueue</value>
        </constructor-arg>
    </bean>

    <!-- Message listening through MessageListenerAdapter (an adapter class, not an
         interface), constructed with the JMSReceiver bean as its delegate -->
    <bean id="jMSReceiver" class="com.hua.spring.jms.listener.JMSReceiver"></bean>
    <bean id="queueDestinationMessageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="jMSReceiver"></constructor-arg>
    </bean>

    <!-- Listener container consuming from queueDestination.
         The class attribute no longer carries a stray leading space.
         NOTE(review): 100 concurrent consumers looks high next to maxConnections=10;
         confirm this is intended. -->
    <bean id="jMSReceiverQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="queueDestinationMessageListenerAdapter" />
        <property name="concurrentConsumers" value="100" />
        <property name="sessionTransacted" value="false" />
    </bean>
</beans>