以SpringMVC框架為中心瘋狂擴充套件-06、MessageListener實時監聽ActiveMQ中的訊息
阿新 • • 發佈:2018-12-26
1、在spring-activemq.xml中新加入listenerContainer和syxTopicDest等配置,實現訊息監聽容器,在connectionFactory中加入clientId。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" /> <!-- 配置connectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目標ConnectionFactory 引用至 amqConnectionFactory--> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- Session快取數量 --> <property name="sessionCacheSize" value="100" /> <property name="clientId" value="syxConsumer1"/> </bean> <!-- Spring JmsTemplate 的訊息生產者 --> <!-- 定義JmsTemplate的Queue型別 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 引用自定義的 ConnectionFactory物件 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(釋出/訂閱),即佇列模式 --> <property name="pubSubDomain" value="false" /> <!-- 預設目標名稱(佇列名) --> <property name="defaultDestinationName" value="syxQueue" /> </bean> <!-- 定義JmsTemplate的Topic型別 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 引用自定義的 ConnectionFactory物件 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(釋出/訂閱) --> <property name="pubSubDomain" value="true" /> <!-- 預設目標名稱(訂閱名) --> <property name="defaultDestinationName" value="syxTopic" /> </bean> <bean id="messageReceiver" class="com.syx.customer.receiver.MessageReceiver"> <property name="jmsTopicTemplate" ref="jmsTopicTemplate"></property> </bean> <!-- 訊息監聽容器 --> <bean id="syxTopicDest" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 設定訊息佇列的名字 --> <constructor-arg index="0" value="syxTopic" /> </bean> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="pubSubDomain" value="true"/><!-- default is false --> <property name="destination" ref="syxTopicDest" /> <!-- listen topic: syxTopic --> <property name="subscriptionDurable" value="true"/> <property name="messageListener" ref="messageReceiverListener" /><!-- 訊息監聽類 --> <!-- 監聽並行區間範圍 最小10個 最大15個 --> <property name="concurrency" value="10-15"></property> </bean> </beans>
2、新建一個MessageReceiverListener類,來實現訊息監聽
採用@Component註解方式實現spring的配置
3、重啟專案就可以實現訊息監聽了,發多少個都能實時接收到。package com.syx.customer.receiver; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import org.springframework.stereotype.Component; /** * 接收訊息監聽類 * * @author sunyx * @since JDK 1.8 */ @Component("messageReceiverListener") public class MessageReceiverListener implements MessageListener { public void onMessage(Message m) { System.out.println("[receive message]"); try { final MapMessage mapmessage=(MapMessage) m; final String info = mapmessage.getString("info"); System.out.println("info:"+info); System.out.println("model:"+m.getJMSDeliveryMode()); System.out.println("destination:"+m.getJMSDestination()); System.out.println("type:"+m.getJMSType()); System.out.println("messageId:"+m.getJMSMessageID()); System.out.println("time:"+m.getJMSTimestamp()); System.out.println("expiredTime:"+m.getJMSExpiration()); System.out.println("priority:"+m.getJMSPriority()); } catch (final JMSException e) { e.printStackTrace(); } } }