spring整合JMS一同步收發訊息(基於ActiveMQ的實現)
1. 安裝ActiveMQ
注意:JDK版本需要1.7及以上才行
bin目錄結構如下: |
如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat,執行結果如下:
啟動成功!成功之後在瀏覽器輸入http://127.0.0.1:8161/地址,可以看到ActiveMQ的管理頁面,使用者名稱和密碼預設都是admin,如下:
2. 新建一個Maven工程,並配置pom檔案如下:
3. 配置連線工廠(ConnectionFactory)<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.chhliu.myself</groupId> <artifactId>activemq_start</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>activemq_start</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring-version>3.2.5.RELEASE</spring-version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>javax.annotation</groupId> <artifactId>jsr250-api</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency> </dependencies> </project>
Spring給我們提供瞭如下的連線工廠:
其中SingleConnectionFactory保證每次返回的都是同一個連線,CachingConnectionFactory繼承了SingleConnectionFactory,在保證同一連線的同時,增加了快取的功能,可以快取Session以及生產者,消費者。當然,JMS提供的連線工廠只是用來實現管理的,並不是真正連線MQ的,真正的連線工廠需要具體的MQ廠商提供,下面我們以ActiveMQ為例來說明,配置如下:
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean>
為了減少我們連線的資源消耗,ActiveMQ為我們提供了一個連線工廠管理池--PooledConnectionFactory,通過連線工廠池,可以將Connection,Session等都放在池裡面,用的時候直接返回池裡面的內容,無需臨時建立連線,節約開銷。配置如下:
4. 配置JmsTemplate<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <!-- 通過往PooledConnectionFactory注入一個ActiveMQConnectionFactory可以用來將Connection,Session和MessageProducer池化這樣可以大大減少我們的資源消耗, --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="10" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> </bean>
配置好連線工廠之後,就需要配置JMS的JmsTemplate,JmsTemplate的作用和JdbcTemplate類似,我們傳送和接收訊息,都是通過JmsTemplate來實現的,配置如下:
<!-- 配置生產者:配置好ConnectionFactory之後我們就需要配置生產者。生產者負責產生訊息併發送到JMS伺服器,這通常對應的是我們的一個業務邏輯服務實現類。 但是我們的服務實現類是怎麼進行訊息的傳送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現的, 所以配置生產者其實最核心的就是配置進行訊息傳送的JmsTemplate。對於訊息傳送者而言,它在傳送訊息的時候要知道自己該往哪裡發, 為此,我們在定義JmsTemplate的時候需要往裡面注入一個Spring提供的ConnectionFactory物件 -->
<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
5. 生產者實現
配置完這些之後,我們就可以寫程式碼實現生產者和消費者了,生產者主要用來生產訊息,並向目的佇列中推送訊息,介面定義如下:
public interface ProducerService {
void sendMessage(Destination destination, final String message);
}
實現類程式碼如下:
@Service("producerServiceImpl")
public class ProducerServiceImpl implements ProducerService {
/**
* 注入JmsTemplate
*/
@Resource(name="jmsTemplate")
private JmsTemplate jTemplate;
/**
* attention:
* Details:傳送訊息
* @author chhliu
* 建立時間:2016-7-28 下午2:33:14
* @param destination
* @param message
*/
@Override
public void sendMessage(Destination receivedestination, final String message) {
System.out.println("================生產者建立了一條訊息==============");
jTemplate.send(receivedestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello acticeMQ:"+message);
}
});
}
}
6. 消費者實現
假設生產者已經建立了一條訊息,並推送到了對應的佇列中,消費者需要從這個佇列中取出訊息,並同時回覆一條報文,自己已經收到了這條訊息,為了測試回覆報文的功能,我們下面會將回復報文放到另一個佇列中,此例使用同步接收訊息的方式,而不是非同步監聽的方式實現,介面定義如下:
public interface ConsumerService {
String receiveMessage(Destination destination, Destination replyDestination);
}
實現類程式碼如下:
@Service("consumerServiceImpl")
public class ConsumerServiceImpl implements ConsumerService {
/**
* 注入JmsTemplate
*/
@Resource(name="jmsTemplate")
private JmsTemplate jTemplate;
/**
* attention:
* Details:接收訊息,同時回覆訊息
* @author chhliu
* 建立時間:2016-7-28 下午2:39:45
* @param destination
* @return
*/
@Override
public String receiveMessage(Destination destination, Destination replyDestination) {
/**
* 接收訊息佇列中的訊息
*/
Message message = jTemplate.receive(destination);
try {
/**
* 此處為了更好的容錯性,可以使用instanceof來判斷下訊息型別
*/
if(message instanceof TextMessage){
String receiveMessage = ((TextMessage) message).getText();
System.out.println("收到生產者的訊息:"+receiveMessage);
/**
* 收到訊息之後,將回復報文放到回覆佇列裡面去
*/
jTemplate.send(replyDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("消費者已經收到生產者的訊息了,這是一條確認報文!");
}
});
return receiveMessage;
}
} catch (JMSException e) {
e.printStackTrace();
}
return "";
}
}
生產者和消費者實現之後,我們要做的就是配置隊列了,下面給出專案完整的配置檔案:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:jpa="http://www.springframework.org/schema/data/jpa"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache-3.2.xsd
http://www.springframework.org/schema/data/jpa
http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd">
<!-- 掃描註解包 -->
<context:annotation-config />
<context:component-scan base-package="com.chhliu.myself.activemq.start"></context:component-scan>
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="10" />
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!-- 在真正利用JmsTemplate進行訊息傳送的時候,我們需要知道訊息傳送的目的地,即destination。 在Jms中有一個用來表示目的地的Destination介面,它裡面沒有任何方法定義,只是用來做一個標識而已。當我們在使用JmsTemplate進行訊息傳送時沒有指定destination的時候將使用預設的Destination。 預設Destination可以通過在定義jmsTemplate bean物件時通過屬性defaultDestination或defaultDestinationName來進行注入, defaultDestinationName對應的就是一個普通字串 -->
<!--這個是佇列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>NTF_MOCK_INPUT</value>
</constructor-arg>
</bean>
<!--這個是回覆佇列,點對點的 -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>NTF_MOCK_OUTPUT</value>
</constructor-arg>
</bean>
</beans>
到這裡,所有的程式碼和配置檔案就都整好了,下面就是進行測試,測試程式碼如下:
生產者測試程式碼:
package com.chhliu.myself.activemq.start.sync;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncProducerActiveMQTest {
@Resource(name="producerServiceImpl")
private ProducerService pService;
@Resource(name="queueDestination")
private Destination receiveQueue;
@Test
public void producerTest(){
pService.sendMessage(receiveQueue, "my name is chhliu!");
}
}
消費者測試程式碼:
package com.chhliu.myself.activemq.start.sync;
import javax.annotation.Resource;
import javax.jms.Destination;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml" })
public class SyncConsumerActiveMQTest {
@Resource(name="consumerServiceImpl")
private ConsumerService cService;
@Resource(name="queueDestination")
private Destination receiveQueue;
@Resource(name="responseQueue")
private Destination replyQueue;
@Test
public void producerTest(){
String result = cService.receiveMessage(receiveQueue, replyQueue);
System.out.println(result);
}
}
測試結果如下:
<span style="background-color: rgb(51, 255, 51);">生產者測試結果:
================生產者建立了一條訊息==============
消費者測試結果:
收到生產者的訊息:hello acticeMQ:my name is chhliu!
hello acticeMQ:my name is chhliu!</span>
再來看下ActiveMQ的管理頁面的結果:
從管理頁面中可以看到,生產者生產了訊息,並且入隊列了,同時消費者也消費了訊息,並將回覆訊息放到了回覆佇列中,測試成功。
但是這種同步取訊息的方式有個缺點,每次只會取一條訊息消費,取完之後就會一直阻塞,下面來測試一下:首先讓生產者再生產5條訊息,然後執行消費者程式,發現會只消費一條訊息,除非我們在消費者程式裡面加while(true),一直輪詢佇列,這種實現方式不僅耗記憶體,效率也不是很高,後面,我們會對這種方式進行改進,使用非同步監聽模式,測試效果如下:
生產者建立了5條訊息: =======生產者建立了一條訊息======== =======生產者建立了一條訊息======== =======生產者建立了一條訊息======== =======生產者建立了一條訊息======== ======生產者建立了一條訊息========= ActiveMQ管理頁面如下: 消費者消費一條訊息: 收到生產者的訊息:hello acticeMQ:my name is chhliu! hello acticeMQ:my name is chhliu! 消費者消費訊息後,ActiveMQ管理頁面如下: |
從上面的對比中,我們可以看出來,同步模式下,消費者消費訊息時,是逐條消費,每次只消費一條訊息。