1. 程式人生 > >spring整合activemq傳遞物件注意事項

spring整合activemq傳遞物件注意事項

初衷:兩個專案分別部署在兩臺伺服器上,專案之間以activemq傳遞object訊息

拓樸結構:

伺服器A:tomcat,專案A--生產者
伺服器B:tomcat、activemq、專案B--消費者

專案A主要程式碼

src/main/resources/ActiveMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- ActiveMQ 連線工廠 -->
     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
    <!-- 如果連線網路:tcp://ip:61616;未連線網路:tcp://localhost:61616 以及使用者名稱,密碼-->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://172.16.2.36:61616" userName="admin" password="admin"  />

    <!--  訊息傳送   -->
    <bean id="queueSender" class="com.abc.queue.QueueSender"></bean>
    
    <!-- Spring Caching連線工廠 -->
     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
          <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
          <!-- 同上,同理 -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Session快取數量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
    
    <!-- Spring JmsTemplate 的訊息生產者 start-->
    
    <!-- 定義JmsTemplate的Queue型別 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(釋出/訂閱),即佇列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
    
    <!-- 定義JmsTemplate的Topic型別 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(釋出/訂閱) -->
        <property name="pubSubDomain" value="true" />
    </bean>
    
    <!--Spring JmsTemplate 的訊息生產者 end-->
   
</beans>  
com.abc.queue.QueueSender
package com.abc.queue;

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * 
 * ClassName: QueueSender <br/>  
 * Function:  佇列訊息生產者,傳送訊息到佇列<br/>  
 * Created on: 2017年12月5日 下午2:59:44 <br/>  
 *  
 * @author: Wender(FengHaiBing)  
 * @version:   
 * @since: JDK 1.8
 */
@Component("queueSender")
public class QueueSender {
	
	Logger LOG = LoggerFactory.getLogger(QueueSender.class);
	
	/**
	 * 因為還有一個jmsTopicTemplate,
	 * 所以這裡要用到@Qualifier,明確此例項變數對應的是哪一個JmsTemplate
	 */
	@Autowired
	@Qualifier("jmsQueueTemplate")
	private JmsTemplate jmsTemplate;//通過@Qualifier修飾符來注入對應的bean
	
	/**
	 * 傳送一條訊息到指定的佇列(目標)
	 * @param queueName 佇列名稱
	 * @param message 訊息內容
	 */
	public void send(String queueName,final String message){
		try {
			jmsTemplate.send(queueName, new MessageCreator() {
			@Override
				public Message createMessage(Session session) throws JMSException {
					return session.createObjectMessage((Serializable)message);
				}
			});
			
		} catch (Exception e) {
			LOG.error("QueueSender.send() exception occured, sending msg " + message + " to " + queueName,e);
		}
	}
	
	
}

專案B主要程式碼

src/main/resources/ActiveMQ.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

	<!-- ActiveMQ 連線工廠 -->
 	<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
	<!-- 如果連線網路:tcp://ip:61616;未連線網路:tcp://localhost:61616 以及使用者名稱,密碼-->
	<amq:connectionFactory id="amqConnectionFactory"
		brokerURL="tcp://172.16.2.36:61616" userName="admin" password="admin"  />

	<!--  訊息接收   -->
    <bean id="queueVfReceiver" class="com.abc.queue.QueueVfReceiver"></bean>
	
	<!-- Spring Caching連線工廠 -->
 	<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
  		<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  		<!-- 同上,同理 -->
		<!-- <constructor-arg ref="amqConnectionFactory" /> -->
		<!-- Session快取數量 -->
		<property name="sessionCacheSize" value="100" />
	</bean>
	
	<!-- Spring JmsTemplate 的訊息生產者 start-->
	
	<!-- 定義JmsTemplate的Queue型別 -->
	<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
		<constructor-arg ref="connectionFactory" />
		<!-- 非pub/sub模型(釋出/訂閱),即佇列模式 -->
		<property name="pubSubDomain" value="false" />
	</bean>
	
	<!-- 定義JmsTemplate的Topic型別 -->
	<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
		 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
		<constructor-arg ref="connectionFactory" />
		<!-- pub/sub模型(釋出/訂閱) -->
		<property name="pubSubDomain" value="true" />
	</bean>
	
	<!--Spring JmsTemplate 的訊息生產者 end-->

	
	<!-- 訊息消費者 start-->
	
	<!-- 定義Queue監聽器 -->
	<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
		<jms:listener destination="queue.lv.img" ref="queueLvReceiver"/>
	</jms:listener-container>
	
	<!-- 訊息消費者 end -->
</beans>  

com.abc.queue.QueueVfReceiver

接收到物件後,需要用ActiveMQObjectMessage,做一次型別轉換,將message的object轉換為需要用的型別

package com.abc.queue;


import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.druid.util.StringUtils;
import com.abc.facade.entity.VfInfo;

/**
 * 
 * ClassName: QueueReceiver <br/>  
 * Function:  ADD FUNCTION. <br/>  
 * Created on: 2017年12月5日 下午1:56:46 <br/>  
 *  
 * @author: Wender(FengHaiBing)  
 * @version:   
 * @since: JDK 1.8
 */
@Component
public class QueueVfReceiver implements MessageListener {
    
//    @Override
    public void onMessage(Message message) {
        try {
            if (null == message) {
                return ;
            }
            ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage)message;
            VfInfo vfInfo = (VfInfo)aMsg.getObject();
            if(StringUtils.isEmpty(vfInfo.getId())) {
                //不處理無效資料
                return ;
            }            
            ....
 } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

activemq配置

將需要傳遞的物件所在的包,寫入到變數ACTIVEMQ_OPTS

if [ -z "$ACTIVEMQ_OPTS" ] ; then
#    ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS_MEMORY -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=$ACTIVEMQ_CONF/login.config"
    ACTIVEMQ_OPTS="$ACTIVEMQ_OPTS_MEMORY -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=$ACTIVEMQ_CONF/login.config -Dorg.apache.activemq.SERIALIZABLE_PACKAGES=java.lang,javax.security,java.util,org.apache.activemq,com.abc.facade.entity"
fi