Spring JMS---三種訊息監聽器
訊息監聽器MessageListener
在spring整合JMS的應用中我們在定義訊息監聽器的時候一共可以定義三種類型的訊息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分別來介紹一下這幾種型別的區別。
1. MessageListener
MessageListener是最原始的訊息監聽器,它是JMS規範中定義的一個介面。其中定義了一個用於處理接收到的訊息的onMessage方法,該方法只接收一個Message引數。我們前面在講配置消費者的時候用的訊息監聽器就是MessageListener,程式碼如下:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener {
public void onMessage(Message message) {
//這裡我們知道生產者傳送的就是一個純文字訊息,所以這裡可以直接進行強制轉換,或者直接把onMessage方法的引數改成Message的子類TextMessage
TextMessage textMsg = (TextMessage) message;
System.out.println("接收到一個純文字訊息。");
try {
System.out.println("訊息內容是:" + textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- SessionAwareMessageListener
SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收訊息的,假如我們在使用MessageListener處理接收到的訊息時我們需要傳送一個訊息通知對方我們已經收到這個訊息了,那麼這個時候我們就需要在程式碼裡面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到訊息後傳送一個回覆的訊息,它同樣為我們提供了一個處理接收到的訊息的onMessage方法,但是這個方法可以同時接收兩個引數,一個是表示當前接收到的訊息Message,另一個就是可以用來發送訊息的Session物件。先來看一段程式碼:
package com.tiantian.springintejms.listener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
public class ConsumerSessionAwareMessageListener implements
SessionAwareMessageListener<TextMessage> {
private Destination destination;
public void onMessage(TextMessage message, Session session) throws JMSException {
System.out.println("收到一條訊息");
System.out.println("訊息內容是:" + message.getText());
MessageProducer producer = session.createProducer(destination);
Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");
producer.send(textMessage);
}
public Destination getDestination() {
returndestination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
}
在上面程式碼中我們定義了一個SessionAwareMessageListener,在這個Listener中我們在接收到了一個訊息之後,利用對應的Session建立了一個到destination的生產者和對應的訊息,然後利用建立好的生產者傳送對應的訊息。
接著我們在Spring的配置檔案中配置該訊息監聽器將處理來自一個叫sessionAwareQueue的目的地的訊息,並且往該MessageListener中通過set方法注入其屬性destination的值為queueDestination。這樣當我們的SessionAwareMessageListener接收到訊息之後就會往queueDestination傳送一個訊息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<context:component-scan base-package="com.tiantian" />
<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--這個是佇列目的地-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<!--這個是sessionAwareQueue目的地-->
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>sessionAwareQueue</value>
</constructor-arg>
</bean>
<!-- 訊息監聽器 -->
<bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
<!-- 可以獲取session的MessageListener -->
<bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">
<property name="destination" ref="queueDestination"/>
</bean>
<!-- 訊息監聽容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
</bean>
<bean id="sessionAwareListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="sessionAwareQueue" />
<property name="messageListener" ref="consumerSessionAwareMessageListener" />
</bean>
</beans>
接著我們來做一個測試,測試程式碼如下:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
@Autowired
private ProducerService producerService;
@Autowired
@Qualifier("sessionAwareQueue")
private Destination sessionAwareQueue;
@Test
public void testSessionAwareMessageListener() {
producerService.sendMessage(sessionAwareQueue, "測試SessionAwareMessageListener");
}
}
在上述測試程式碼中,我們通過前面定義好的生產者往我們定義好的SessionAwareMessageListener監聽的sessionAwareQueue傳送了一個訊息。程式執行之後控制檯輸出如下:
這說明我們已經成功的往sessionAwareQueue傳送了一條純文字訊息,訊息會被ConsumerSessionAwareMessageListener的onMessage方法進行處理,在onMessage方法中ConsumerSessionAwareMessageListener就是簡單的把接收到的純文字資訊的內容打印出來了,之後再往queueDestination傳送了一個純文字訊息,訊息內容是“ConsumerSessionAwareMessageListener…”,該訊息隨後就被ConsumerMessageListener處理了,根據我們的定義,在ConsumerMessageListener中也只是簡單的列印了一下接收到的訊息內容
- MessageListenerAdapter
MessageListenerAdapter類實現了MessageListener介面和SessionAwareMessageListener介面,它的主要作用是將接收到的訊息進行型別轉換,然後通過反射的形式把它交給一個普通的Java類進行處理。
MessageListenerAdapter會把接收到的訊息做如下轉換:
TextMessage轉換為String物件;
BytesMessage轉換為byte陣列;
MapMessage轉換為Map物件;
ObjectMessage轉換為對應的Serializable物件。
既然前面說了MessageListenerAdapter會把接收到的訊息做一個型別轉換,然後利用反射把它交給真正的目標處理器——一個普通的Java類進行處理(如果真正的目標處理器是一個MessageListener或者是一個SessionAwareMessageListener,那麼Spring將直接使用接收到的Message物件作為引數呼叫它們的onMessage方法,而不會再利用反射去進行呼叫),那麼我們在定義一個MessageListenerAdapter的時候就需要為它指定這樣一個目標類。這個目標類我們可以通過MessageListenerAdapter的構造方法引數指定,如:
<!-- 訊息監聽介面卡 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</constructor-arg>
</bean>
也可以通過它的delegate屬性來指定,如
<!-- 訊息監聽介面卡 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate">
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
</bean>
前面說了如果我們指定的這個目標處理器是一個MessageListener或者是一個SessionAwareMessageListener的時候Spring將直接利用接收到的Message物件作為方法引數呼叫它們的onMessage方法。但是如果指定的目標處理器是一個普通的Java類時Spring將利用Message進行了型別轉換之後的物件作為引數通過反射去呼叫真正的目標處理器的處理方法,那麼Spring是如何知道該呼叫哪個方法呢?這是通過MessageListenerAdapter的defaultListenerMethod屬性來決定的,當我們沒有指定該屬性時,Spring會預設呼叫目標處理器的handleMessage方法。
接下來我們來看一個示例,假設我們有一個普通的Java類ConsumerListener,其對應有兩個方法,handleMessage和receiveMessage,其程式碼如下:
package com.tiantian.springintejms.listener;
public class ConsumerListener {
public void handleMessage(String message) {
System.out.println("ConsumerListener通過handleMessage接收到一個純文字訊息,訊息內容是:" + message);
}
public void receiveMessage(String message) {
System.out.println("ConsumerListener通過receiveMessage接收到一個純文字訊息,訊息內容是:" + message);
}
}
假設我們要把它作為一個訊息監聽器來監聽傳送到adapterQueue的訊息,這個時候我們就可以定義一個對應的MessageListenerAdapter來把它當做一個MessageListener使用。
<!-- 訊息監聽介面卡 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate">
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
</bean>
當然,有了MessageListener之後我們還需要配置其對應的MessageListenerContainer,這裡配置如下:
<!-- 訊息監聽介面卡對應的監聽容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="adapterQueue"/>
<property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為訊息監聽器 -->
</bean>
<!-- 用於測試訊息監聽介面卡的佇列目的地 -->
<bean id="adapterQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>adapterQueue</value>
</constructor-arg>
</bean>
在上面的MessageListenerAdapter中我們指定了其defaultListenerMethod屬性的值為receiveMessage,所以當MessageListenerAdapter接收到訊息之後會自動的呼叫我們指定的ConsumerListener的receiveMessage方法。
MessageListenerAdapter除了會自動的把一個普通Java類當做MessageListener來處理接收到的訊息之外,其另外一個主要的功能是可以自動的傳送返回訊息。
當我們用於處理接收到的訊息的方法的返回值不為空的時候,Spring會自動將它封裝為一個JMS Message,然後自動進行回覆。那麼這個時候這個回覆訊息將傳送到哪裡呢?這主要有兩種方式可以指定。
第一,可以通過傳送的Message的setJMSReplyTo方法指定該訊息對應的回覆訊息的目的地。這裡我們把我們的生產者傳送訊息的程式碼做一下修改,在傳送訊息之前先指定該訊息對應的回覆目的地為一個叫responseQueue的佇列目的地,具體程式碼如下所示:
package com.tiantian.springintejms.service.impl;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import com.tiantian.springintejms.service.ProducerService;
@Component
public class ProducerServiceImpl implements ProducerService {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("responseQueue")
private Destination responseDestination;
public void sendMessage(Destination destination, final String message) {
System.out.println("---------------生產者傳送訊息-----------------");
System.out.println("---------------生產者發了一個訊息:" + message);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
textMessage.setJMSReplyTo(responseDestination);
return textMessage;
}
});
}
}
接著定義一個叫responseQueue的佇列目的地及其對應的訊息監聽器和監聽容器。
<!-- 用於測試訊息回覆的 -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>responseQueue</value>
</constructor-arg>
</bean>
<!-- responseQueue對應的監聽器 -->
<bean id="responseQueueListener" class="com.tiantian.springintejms.listener.ResponseQueueListener"/>
<!-- responseQueue對應的監聽容器 -->
<bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="responseQueue"/>
<property name="messageListener" ref="responseQueueListener"/>
</bean>
ResponseQueueListener的定義如下所示:
public class ResponseQueueListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到傳送到responseQueue的一個文字訊息,內容是:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
接著把我們接收訊息的ConsumerListener的receiveMessage方法改為如下:
/**
* 當返回型別是非null時MessageListenerAdapter會自動把返回值封裝成一個Message,然後進行回覆
* @param message
* @return
*/
public String receiveMessage(String message) {
System.out.println("ConsumerListener通過receiveMessage接收到一個純文字訊息,訊息內容是:" + message);
return "這是ConsumerListener物件的receiveMessage方法的返回值。";
}
我們可以看到在上述負責接收訊息的receiveMessage方法有一個非空的返回值。
接著我們執行我們的測試程式碼,利用生產者往我們定義好的MessageListenerAdapter負責處理的adapterQueue目的地傳送一個訊息。測試程式碼如下所示:
這說明我們的生產者傳送訊息被MessageListenerAdapter處理之後,MessageListenerAdapter確實把監聽器的返回內容封裝成一個Message往原Message通過setJMSReplyTo方法指定的回覆目的地傳送了一個訊息。對於MessageListenerAdapter對應的監聽器處理方法返回的是一個null值或者返回型別是void的情況,MessageListenerAdapter是不會自動進行訊息的回覆的,有興趣的網友可以自己測試一下。
第二,通過MessageListenerAdapter的defaultResponseDestination屬性來指定。這裡我們也來做一個測試,首先維持生產者傳送訊息的程式碼不變,即傳送訊息前不通過Message的setJMSReplyTo方法指定訊息的回覆目的地;接著我們在定義MessageListenerAdapter的時候通過其defaultResponseDestination屬性指定其預設的回覆目的地是“defaultResponseQueue”,並定義defaultResponseQueue對應的訊息監聽器和訊息監聽容器。
<!-- 訊息監聽介面卡 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<!-- <constructor-arg>
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</constructor-arg> -->
<property name="delegate">
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
<property name="defaultResponseDestination" ref="defaultResponseQueue"/>
</bean>
<!-- 訊息監聽介面卡對應的監聽容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="adapterQueue"/>
<property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為訊息監聽器 -->
</bean>
<!-- 預設的訊息回覆佇列 -->
<bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>defaultResponseQueue</value>
</constructor-arg>
</bean>
<!-- defaultResponseQueue對應的監聽器 -->
<bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>
<!-- defaultResponseQueue對應的監聽容器 -->
<bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="defaultResponseQueue"/>
<property name="messageListener" ref="defaultResponseQueueListener"/>
</bean>