1. 程式人生 > >spring-消息

spring-消息

cto div () localhost 進行 實例 默認 消息隊列 發送

1、異步消息

  當一個消息發送時候,消息會被交給消息代理,消息代理可以確保消息被發送到指定的目的地,同時解放發送者,使其能夠繼續進行其它業務。消息代理通常有ActiveMQ、RabbitMQ...,目的地通常有隊列和主題,隊列采用點對點的模型,主題采用發布訂閱模型

  • 點對點模型:消息隊列可以有多個接受者,但每條消息只能被一個接收者取走

技術分享圖片

  • 發布訂閱模型:消息隊列可以有多個訂閱者,每條消息可以發送給多個主題訂閱者

技術分享圖片

2、JMS發送/接收消息

1)activemq配置,使用ActiveMQ,並使用了JMSTemplate。

  JMS模板為開發者提供了與消息代理進行交互發送和接收消息的標準API,幾乎每個消息代理都支持JMS。Jms模板和spring Date提供的jdbc模板一樣可以消除樣板代碼,讓開發更集中在業務處理上。

   <!-- ActiveMQ連接工廠 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"></property>
    </bean>
    <!-- 消息隊列 -->
    <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="userqueue"></constructor-arg>
    </bean>
    <!-- 消息主題 -->
    <bean id="topic" class
="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="usertopic"></constructor-arg> </bean> <!-- 定義模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="activeMQConnectionFactory"></property>   </bean>

2)發送消息

package com.cn.activemq;

import com.cn.pojo.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Component
public class SendMessageMQUtil {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("queue")
    private Destination queue;

    @Autowired
    @Qualifier("topic")
    private Destination topic;

    /**
     * 隊列--發送消息
     * @param user
     */
    public void sendUserQueue(final User user){

        jmsTemplate.send(queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(user);
            }
        });
    }

    /**
     * 主題--發送消息
     * @param user
     */
    public void sendUserTopic(final User user){

        jmsTemplate.send(topic, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(user);
            }
        });
    }
}

3)接收消息

package com.cn.activemq;

import com.cn.pojo.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;

@Component
public class ReceiveMessageMQUtil {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("queue")
    private Destination queue;

    @Autowired
    @Qualifier("topic")
    private Destination topic;

    /**
     * 隊列--接受消息
     * @return
     */
    public User receiveUserQueue(){
        try {
            ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(queue);
            return (User)objectMessage.getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 主題--接受消息
     * @return
     */
    public User receiveUserTopic(){
        try {
            ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive(topic);
            return (User)objectMessage.getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return null;
    }

}

4)測試

分別新建測試類

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"})//不可使用classpath:spring-*.xml,否則配置文件不起作用
public class SendMessageMQUtilTest {

    @Autowired
    private SendMessageMQUtil sendMessageMQUtil;

    @Test
    public void sendUserQueue() throws Exception {
        User user=new User("computer1", "111111");
        sendMessageMQUtil.sendUserQueue(user);
    }

    @Test
    public void sendUserTopic() throws Exception {
        User user=new User("computer2", "222222");
        sendMessageMQUtil.sendUserTopic(user);
    }
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springMvc.xml"})
public class ReceiveMessageMQUtilTest {

    @Autowired
    private ReceiveMessageMQUtil receiveMessageMQUtil;

    @Test
    public void receiveUserQueue() throws Exception {

        User user=receiveMessageMQUtil.receiveUserQueue();
        System.out.println("ActiveMQ接收到的數據:"+user);
    }

    @Test
    public void receiveUserTopic() throws Exception {

        User user=receiveMessageMQUtil.receiveUserTopic();
        System.out.println("ActiveMQ接收到的數據:"+user);
    }

    @Test
    public void receiveUserTopic2() throws Exception {

        User user=receiveMessageMQUtil.receiveUserTopic();
        System.out.println("ActiveMQ接收到的數據:"+user);
    }

}

分別運行測試方法,結合activeMQ的控制臺可以看出隊列、主題以及各自的消費者等情況

顯示隊列:

技術分享圖片

顯示主題:

技術分享圖片

3、其它

1)設置默認的目的地

  上述在發送消息和接收消息時,每次調用發送/接收消息的方法都傳入了一個目的地參數。然而,可以在JmsTemplate實例化的時候,指定默認的目的地,如下:

    <!-- 定義隊列 -->
    <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="userqueue"></constructor-arg>
    </bean>
    <!-- 定義模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
        <property name="defaultDestination" ref="queue"></property>//此處註入隊列,也可以註入主題
    </bean>

采用指定默認目的地的方式,則發送/接收消息調用的方法不用傳遞目的地了

 /**
     * 隊列--發送消息
     * @param user
     */
    public void sendUserQueue(final User user){

        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(user);
            }
        });

 /**
     * 隊列--接受消息
     * @return
     */
    public User receiveUserQueue(){
        try {
            ObjectMessage objectMessage=(ObjectMessage) jmsTemplate.receive();
            return (User)objectMessage.getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return null;
    }
   }

2)發送消息對消息進行轉換

  除了send(..)方法,JmsTemplate還提供了convertAndSend()方法,該方法不需要MessageCreator參數,而使用內置的消息轉換器創建消息並發送。在JmsTemplate實例化時未指定消息轉換器,在調用convertAndSend()方法則使用默認的SimpleMessageConverter消息轉換器;receiveAndConvert()方法則在接收時候使用消息轉換器

/**
     * 隊列--發送消息
     * @param user
     */
    public void sendUserQueue(final User user){

        jmsTemplate.convertAndSend(user);
    }
    /**
     * 隊列--接受消息
     * @return
     */
    public User receiveUserQueue(){
        return (User)jmsTemplate.receiveAndConvert();
    }

在JmsTemplate實例化指定消息轉換器,則會在使用convertAndSend()/receiveAndConvert()方法使用指定的消息轉換器

    <!-- 定義隊列 -->
    <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="userqueue"></constructor-arg>
    </bean>
    <!-- 消息轉換器  -->
    <bean id="mappingJackson2MessageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter"></bean>
    <!-- 定義模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
        <property name="defaultDestination" ref="queue"></property>
        <property name="messageConverter" ref="mappingJackson2MessageConverter"></property>
    </bean>

綜合1)2),使用convertAndSend()/receiveAndConvert()發送和接收消息更加簡單;在某些情況下,統一配置目的地也簡化的使用

spring-消息