程式人生 > ActiveMq-Spring整合

ActiveMq-Spring整合

 

1,引入jar

<!-- ActiveMQ: the activemq-all artifact, pinned to version 5.11.0 -->
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.11.0</version>
</dependency>
<!-- Spring JMS support; the version is taken from the ${springframework} property, which has to be defined elsewhere in the POM -->
<dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-jms</artifactId>  
        <version>${springframework}</version>  
 </dependency>

2,程式碼解析

配置檔案

<?xml version="1.0" encoding="UTF-8"?>
<!--
	Spring JMS wiring for ActiveMQ: an ActiveMQ connection factory wrapped in a
	CachingConnectionFactory, one JmsTemplate producer with a default queue, and two
	DefaultMessageListenerContainer consumers (preBank and tax).
-->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns="http://www.springframework.org/schema/beans" xmlns:jms="http://www.springframework.org/schema/jms"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

	<!--
		Connection factory. The broker URL uses the failover transport over three tcp endpoints.
		NOTE(review): the broker addresses and the admin/admin credentials are hard-coded here;
		consider moving them to external configuration.
	-->
	<bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="failover:(tcp://192.168.81.86:51511,tcp://192.168.81.86:51512,tcp://192.168.81.86:51513)"/>
		<property name="userName" value="admin"/>
		<property name="password" value="admin"/>
		<property name="useAsyncSend" value="true"/>
	</bean>
	<!-- Spring caching wrapper around the ActiveMQ factory; this is the factory every other bean references -->
	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>
		<property name="sessionCacheSize" value="100"/>
	</bean>



	<!-- Point-to-point queue, producer side: website order submission (bank) -->
	<bean id="bankReqQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="vz.queue.service.trading"/>
	</bean>
	<!-- Producer -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory"/>
        <!-- Default queue used when a send does not name a destination -->
		<property name="defaultDestination" ref="bankReqQueueDestination"/>
		<!-- Upper bound for blocking receive calls; presumably milliseconds, see JmsTemplate.setReceiveTimeout -->
		<property name="receiveTimeout" value="10000" />
		<!-- false selects queues (point-to-point) rather than topics -->
		<property name="pubSubDomain" value="false"/>
	</bean>




	<!-- Point-to-point queue, consumer side: bank front-end (preBank) -->
	<bean id="preBankRespQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="vz.queue.service.netweb.order.status"/>
	</bean>

	<!--
		Point-to-point queue, consumer side: tax bureau (tax).
		NOTE(review): this is the same physical queue name (vz.queue.service.trading) as
		bankReqQueueDestination, so the tax listener consumes what jmsTemplate sends to its
		default destination. Confirm that this is intended.
	-->
	<bean id="taxRespQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="vz.queue.service.trading"/>
	</bean>

	<!-- Consumer: listener and listener container for the bank front-end (preBank) queue -->
	<bean id="preBankQueueMessageListener" class="com.vzoom.bank.mq.finance.consumer.listener.PreBankQueueMessageListener"/>
	<bean id="jmsContainer"  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="destination" ref="preBankRespQueueDestination"/>
		<property name="messageListener" ref="preBankQueueMessageListener"/>
		<!-- Receive messages in a locally transacted JMS session -->
		<property name="sessionTransacted" value="true"/>
		<!--<property name="concurrency" value="4-10"/>-->
		<!--
			NOTE(review): concurrentConsumers, concurrency and maxConcurrentConsumers below all
			configure the container's consumer-count bounds, yet their values disagree
			(6, 2-9, 15). Per the DefaultMessageListenerContainer documentation a later setter
			overrides what an earlier one configured; confirm the intended range and keep a
			single setting.
		-->
		<!-- Fixed number of consumer threads -->
		<property name="concurrentConsumers" value="6"></property>
		<!-- Dynamic range of consumer threads -->
		<property name="concurrency" value="2-9"></property>
		<!-- Maximum number of consumer threads -->
		<property name="maxConcurrentConsumers" value="15"></property>
	</bean>

	<!-- Consumer: listener and listener container for the tax bureau (tax) queue -->
	<bean id="taxQueueMessageListener" class="com.vzoom.bank.mq.finance.consumer.listener.TaxQueueMessageListener"/>
	<bean id="jmsContainer1"  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory"/>
		<property name="destination" ref="taxRespQueueDestination"/>
		<property name="messageListener" ref="taxQueueMessageListener"/>
		<!-- Receive messages in a locally transacted JMS session -->
		<property name="sessionTransacted" value="true"/>
		<!--<property name="concurrency" value="4-10"/>-->
		<!--
			NOTE(review): the three properties below overlap and disagree (6, 2-9, 15) in the
			same way as on jmsContainer; confirm the intended range and keep a single setting.
		-->
		<!-- Fixed number of consumer threads -->
		<property name="concurrentConsumers" value="6"></property>
		<!-- Dynamic range of consumer threads -->
		<property name="concurrency" value="2-9"></property>
		<!-- Maximum number of consumer threads -->
		<property name="maxConcurrentConsumers" value="15"></property>

	</bean>

</beans>

生產者:可向指定(定製)佇列傳送訊息,也可向預設佇列傳送訊息

package com.vzoom.bank.mq.finance.producer.service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Service
public class ProducerService {

    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * 向指定佇列傳送訊息,定製佇列
     */
    public void sendMessage(Destination destination, final String msg) {
        System.out.println("向佇列" + destination.toString() + "傳送了訊息------------" + msg);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }

    /**
     * 向預設佇列傳送訊息,預設佇列
     */
    public void sendMessage(final String msg) {
        String destination =  jmsTemplate.getDefaultDestination().toString();
        System.out.println("向佇列" +destination+ "傳送了訊息------------" + msg);
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });

    }

}

消費者有兩種消費方式:主動消費(需要人為主動觸發消費)與監聽消費(長連線,訊息佇列一有訊息就立即自動消費)。其中監聽消費需要在配置檔案中配置相應的監聽器。

主動消費

package com.vzoom.bank.mq.finance.consumer.service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Pulls messages from a queue on demand through the injected {@code jmsTemplate} bean,
 * as opposed to receiving them through a registered listener.
 */
@Service
public class ConsumerService {

    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * Synchronously receives one message from the given destination.
     *
     * @param destination destination to receive from
     * @return the received message, or {@code null} when no message arrived before the
     *         template's receive timeout expired
     * @throws ClassCastException if the received message is not a {@link TextMessage}
     */
    public TextMessage receive(Destination destination) {
        TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
        if (tm == null) {
            // JmsTemplate.receive returns null on timeout; reading the text would throw a NullPointerException.
            System.out.println("從佇列" + destination.toString() + "未收到訊息");
            return null;
        }
        try {
            System.out.println("從佇列" + destination.toString() + "收到了訊息:\t"
                    + tm.getText());

        } catch (JMSException e) {
            // Only the log line failed; the message itself is still handed to the caller.
            e.printStackTrace();
        }

        return tm;

    }

}

監聽消費1

package com.vzoom.bank.mq.finance.consumer.listener;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.jms.*;


/**
 * Listener for the preBank queue: prints the text of every message it receives.
 */
public class PreBankQueueMessageListener implements MessageListener {
    /**
     * Invoked by the listener container for each delivered message.
     *
     * @param message the delivered message; must be a {@link TextMessage}
     * @throws IllegalArgumentException if the message is not a {@link TextMessage}
     * @throws IllegalStateException    if the message text cannot be read
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            // Fail with a descriptive error instead of an opaque ClassCastException.
            throw new IllegalArgumentException(
                    "PreBankQueueMessageListener only supports TextMessage but received: " + message);
        }
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("PreBankQueueMessageListener監聽到了文字訊息:" + tm.getText());

            //TODO
        } catch (JMSException e){
            // Propagate instead of swallowing: returning normally would tell the container
            // the message was handled even though it could not be read.
            throw new IllegalStateException("Failed to read text of JMS message", e);
        }
    }

    /**
     * Manual smoke test: loads {@code common/ActiveMQ.xml} and sends one text message
     * to the {@code bankReqQueueDestination} bean.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        // Create the Spring container.
        // NOTE(review): the context is never closed, so anything it starts keeps running;
        // presumably intentional so the configured listeners can consume the message - confirm.
        ApplicationContext context = new ClassPathXmlApplicationContext("common/ActiveMQ.xml");

        // Fetch the JmsTemplate used for sending.
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

        // Look up the destination to send to.
        Destination destination = (Destination)context.getBean("bankReqQueueDestination");

        // Send the message; it is created through the Session factory method rather than
        // by instantiating a provider-specific message class directly.
        jmsTemplate.send(destination, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("spring-queue的方式傳送");
            }
        });

        // Report only after send() has returned without throwing.
        System.out.println("已傳送訊息...");

    }


}

監聽消費2

package com.vzoom.bank.mq.finance.consumer.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Listener for the tax queue: prints the text of every message it receives.
 */
public class TaxQueueMessageListener implements MessageListener {
    /**
     * Invoked by the listener container for each delivered message.
     *
     * @param message the delivered message; must be a {@link TextMessage}
     * @throws IllegalArgumentException if the message is not a {@link TextMessage}
     * @throws IllegalStateException    if the message text cannot be read
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            // Fail with a descriptive error instead of an opaque ClassCastException.
            throw new IllegalArgumentException(
                    "TaxQueueMessageListener only supports TextMessage but received: " + message);
        }
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("TaxQueueMessageListener監聽到了文字訊息:" + tm.getText());

            //TODO
        } catch (JMSException e){
            // Propagate instead of swallowing: returning normally would tell the container
            // the message was handled even though it could not be read.
            throw new IllegalStateException("Failed to read text of JMS message", e);
        }
    }

}

測試

package com.vzoom.bank.controller;

import com.vzoom.bank.mq.finance.consumer.service.ConsumerService;
import com.vzoom.bank.mq.finance.producer.service.ProducerService;
import com.vzoom.domain.util.JsonBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;

/**
 * Test endpoints under {@code /mq} for sending a message through {@link ProducerService}
 * and pulling one through {@link ConsumerService}.
 *
 * Responses are built with the project's {@code JsonBean} helper, which is not visible here.
 */
@Controller
@RequestMapping(value = "/mq")
public class TestController {

    @Autowired
    ConsumerService consumerService;
    @Autowired
    ProducerService producerService;

    /** Queue that {@link #getMsg()} receives from. */
    @Resource(name="preBankRespQueueDestination")
    private Destination receiveQueue;


    /**
     * Sends {@code msg} to the producer's default queue.
     *
     * @param msg text to send
     * @return the success response, or the failure response if sending threw
     */
    @RequestMapping(value = "/sendMsg")
    @ResponseBody
    public String sendMsg(String msg){
        try {
            producerService.sendMessage(msg);
        }catch (Exception e){
            // Keep a trace of the cause; the response alone says nothing about what failed.
            e.printStackTrace();
            return JsonBean.toJsonString(JsonBean.EXCEPTION_ERROR,"失敗");
        }
        return JsonBean.toJsonString(JsonBean.SUCCESS,"成功");
    }

    /**
     * Receives one message from {@code receiveQueue}.
     *
     * @return the success response when a message was received; the failure response when
     *         receiving threw or no message was available
     */
    @RequestMapping(value = "/getMsg")
    @ResponseBody
    public Object getMsg(){
        try {
            if (consumerService.receive(receiveQueue) == null) {
                // No message was available: report failure rather than a misleading success.
                return JsonBean.toJsonString(JsonBean.EXCEPTION_ERROR,"失敗");
            }

        }catch (Exception e){
            // Keep a trace of the cause; the response alone says nothing about what failed.
            e.printStackTrace();
            return JsonBean.toJsonString(JsonBean.EXCEPTION_ERROR,"失敗");
        }
        return JsonBean.toJsonString(JsonBean.SUCCESS,"成功");
    }
}

結果

特別說明:以上點對點方式中,監聽器消費與主動消費只能擇一使用,即一旦配置了監聽器,主動消費就會失效,因為訊息在產生的瞬間就會被監聽器接收並消費。