Spring整合ActiveMQ教程
寫在前面:這篇文章會涉及二者的整合思路,為了能夠更好地讀懂本文,建議最好你能夠在不整合其他環境的情況下使用ActiveMQ編寫簡單的控制檯Demo,並且能夠了解JMS(Java Message Service)。當然,你也可以直接略過整合思路看整合的過程。
簡介
ActiveMQ是Apache出品的一個訊息佇列(Message Queue)軟體,它可以與諸如C#、C++、PHP、Java等語言進行整合。本文重點敘述的是與Java Web中Spring框架的整合,ActiveMQ很好地實現了JMS介面,為編寫高併發的應用程式提供了高效的解決方案。
整合思路
在整合之前,我想先說一下思路,古人云:知其然知其所以然嘛~
Spring最厲害的地方就是它的Bean了,還有它特有的IOC(控制反轉)和AOP(面向切面程式設計)技術。有了這些,我們就可以不用new關鍵字構造物件,同時,可以方便地使用注入往類中的屬性進行初始化。如果你編寫過ActiveMQ之類的JMS應用程式,無論對於訊息的生產者還是消費者,最重要的介面有以下兩個:
1.ConnectionFactory
2.Destination
ConnectionFactory
是一切的基礎,有了它才有了Connection
,然後才有Session
,只有通過Session
物件,我們才能建立訊息佇列、構建生產者/消費者,繼而傳送/接收訊息。
Destination
試想,如果這一切都能借助Spring強大的Bean管理的話,我們在編寫程式的時候會更加的方便簡潔。幸運的是,ActiveMQ官方提供了完美的Spring框架支援,一切只需要在xml檔案中配置即可~
Spring官方提供了一個叫JmsTemplate
的類,這個類就專門用來處理JMS的,在該類的Bean配置標籤中有兩個屬性connectionFactory-ref
和defaultDestination-ref
正好對應JMS中的ConnectionFactory
和Destination
,如果你有興趣檢視原始碼的話,就可以發現JmsTemplate
OK,上面是一個基本的思路,還有一些細節會在整合的過程中說明。
整合過程
首先,先說一下我的配置環境:
- Spring:4.1.3.RELEASE
- Activemq:5.15.3
- IDE:IntelliJ IDEA 15.0.5
- Maven:3.3.9
- 此次整合是針對訊息佇列中的P2P(點對點)模式
專案結構:
Step1:配置Maven的pom.xml
這裡我只貼出關鍵的依賴選項:
1、JMS依賴
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
2、 ActiveMQ核心依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>
3、 Spring依賴(${spring.version}
的值為4.1.3.RELEASE)
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
4、日誌依賴
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
其中,slf4j一定要有,不然的話執行會報錯,如下圖,提示找不到包。
Step2:配置Spring的spring-activemq.xml
1、首先是各種約束
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"
></beans>
2、如果你在專案中使用了註解,請開啟註解的自動掃描(這裡略)
3、連線到ActiveMQ,建立一個ConnectionFactory
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://localhost:61616"></bean>
4、對上步建立的ConnectionFactory
進行快取包裝,這樣做的目的是提升效能,對sessions, connections 和 producers進行快取複用,減少開銷。
<bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="amqConnectionFactory"
p:sessionCacheSize="10"></bean>
5、建立訊息目的地,constructor-arg
是目的地名稱
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!--訊息佇列名稱-->
<constructor-arg value="FOO.TEST"/>
</bean>
6、構建JmsTemplate
<bean id="producerTemplate" class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="cachedConnectionFactory"
p:defaultDestination-ref="destination"></bean>
7、對於訊息的消費者,Spring官方提供了一個叫 DMLS(DefaultMessageListenerContainer)的容器,它能有效剋制MDB(Message Driven Beans)的缺點。
要使用這個容器,我們需要建立自己的監聽器(下面會提及),並且註冊進容器中,這樣一旦目的地有訊息,就會自動觸發監聽事件。
<jms:listener-container
container-type="default"
connection-factory="amqConnectionFactory"
acknowledge="auto">
<jms:listener destination="FOO.TEST" ref="simpleMsgListener" method="onMessage"></jms:listener>
</jms:listener-container>
其中,connection-factory
與前面用org.apache.activemq.ActiveMQConnectionFactory
包建立的bean的id對應,listener標籤中destination
填前面建立的目的地名稱,ref
填listener的bean id。
Step3:建立監聽器
我這裡建立了一個叫SimpleMsgListener
的監聽器
package com.test.listener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created by Martin Huang on 2018/4/20.
*/
//bean id
@Component(value = "simpleMsgListener")
public class SimpleMsgListener implements MessageListener {
//收到資訊時的動作
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的資訊:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Step4:建立資訊生成器
package com.test.creator;
import org.springframework.jms.core.MessageCreator;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Created by Martin Huang on 2018/4/20.
*/
public class MyMessageCreator implements MessageCreator {
private int id;
public MyMessageCreator(int id)
{
this.id = id;
}
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage("Spring-ActiveMQ傳送的第【"+id+"】條訊息");
System.out.println("Spring-ActiveMQ傳送的第【"+id+"】條訊息");
return message;
}
}
Step5:建立生產者
package com.test.producer;
import com.test.creator.MyMessageCreator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
* Created by Martin Huang on 2018/4/20.
*/
@Component(value = "producer")
public class SimpleProducer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage() throws Exception
{
//每次傳送10條資訊
for(int i = 0 ; i < 10 ; i++)
{
//這裡填入建立好的資訊生成器
jmsTemplate.send(new MyMessageCreator(i));
}
}
}
Step6:測試
首先開啟ActiveMQ服務,然後編寫我們的測試類
@Test
public void testAmqProducer()
{
ApplicationContext context = new ClassPathXmlApplicationContext("spring-activemq.xml");
SimpleProducer simpleProducer = (SimpleProducer) context.getBean("producer");
try {
simpleProducer.sendMessage();
} catch (Exception e) {
e.printStackTrace();
}
}
如果順利的話,你可能會看到以下結果:
ActiveMQ控制檯:
可以發現這裡傳送了10條訊息,但是隻處理了8條。這個測試便體現了DMLC(DefaultMessageListenerContainer)的特點:它是非同步容器,它不一定要等有訊息處理之後,再發送新的訊息。
提示
listener-container
可以同時支援多個監聽器,如果你設定了多個監聽器,那麼這些監聽器會輪流去佇列中獲取資訊處理。比如我定義了兩個監聽器SimpleMsgListener
和SimpleMsgListener1
,那麼執行的效果是這樣的: