SpringBoot與JMS整合(中介軟體為ActiveMQ)
一、ActiveMQ學習
1、基本概念以及介紹
Apache ActiveMQ是最受歡迎和強有力的開源訊息和整合模式伺服器,支援許多跨語言客戶端和協議,便利使用企業整合模式還有許多先進的特性。
相關命令
activemq start:啟動
activemq stop:關閉
管理介面:
http://127.0.0.1:8161/admin/
login:admin
password:admin
監聽埠
ActiveMQ預設埠是61616。
windows檢視埠:netstat -an|find "61616"
unix檢視埠:netstat -nl|grep 61616
2、兩種訊息模式
(1)點對點模式
點對點的模式主要建立在一個佇列上面,當連線一個佇列的時候,傳送端不需要知道接收端是否正在接收,傳送的訊息,將會先進入訊息佇列中,如果
有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會儲存到activemq伺服器,直到接收端接收資訊,點對點的訊息模式可以有多個傳送端,
多個接收端,但是一條訊息,只會被一個接收端接收,那個接收端先連上activemq,則會先接收到,而後來的接收端則接收不到那條資訊。
(2)訂閱/釋出模式
訂閱/釋出模式,同樣可以有多個傳送端與接收端,但是接收端和傳送端存在時間上的依賴,如果傳送端傳送訊息的時候,接收端並沒有接收資訊,那麼
activemq不會儲存資訊,認為訊息已經發送。換一種說法,就是傳送端傳送訊息的時候,接收端不線上,是接收端不到資訊的,哪怕以後監聽訊息,同樣
也是接收不到的。這個模式還有一個特點,傳送端傳送的訊息,將會被所有的接收端接收。不類似點對點,一條訊息只會被一個接收端接收到。
點對點模式傳送端
public class QueueSender {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = factory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 開啟連線
conn.start();
Destination des = session.createQueue("sampleQueue");
MessageProducer producer = session.createProducer(des);
// 預設為persistent,當activemq關閉時,佇列資料將會被儲存。當為non_persistent時,activemq關閉時,佇列資料將會被清空。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("Hello World!"));
producer.close();
session.close();
conn.close();
}
}
點對點模式接收端
public class QueueConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = factory.createConnection();
// 開啟連線
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination des = session.createQueue("sampleQueue");
MessageConsumer consumer = session.createConsumer(des);
consumer.setMessageListener(message -> {
if (message instanceof TextMessage) {
TextMessage tm = TextMessage.class.cast(message);
try {
System.out.println(tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 程式等待
System.in.read();
consumer.close();
session.close();
conn.close();
}
}
訂閱模式傳送端
public class TopicSender {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = factory.createConnection();
// 開啟連線
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination des = session.createTopic("myTopic");
MessageProducer producer = session.createProducer(des);
// 預設為persistent,當activemq關閉時,佇列資料將會被儲存。當為non_persistent時,activemq關閉時,佇列資料將會被清空。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage("Hello World!"));
producer.close();
session.close();
conn.close();
}
}
訂閱模式接收端
public class TopicConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = factory.createConnection();
// 開啟連線
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination des = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(des);
consumer.setMessageListener(message -> {
if (message instanceof TextMessage) {
TextMessage tm = TextMessage.class.cast(message);
try {
System.out.println(tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 程式等待
System.in.read();
consumer.close();
session.close();
conn.close();
}
}
3、傳送訊息的資料型別
//純字串的資料
session.createTextMessage();
//序列化的物件
session.createObjectMessage();
//流,可以用來傳遞檔案等
session.createStreamMessage();
//用來傳遞位元組
session.createBytesMessage();
//這個方法創建出來的就是一個map,可以把它當作map來用,當你看了它的一些方法,你就懂了
session.createMapMessage();
//這個方法,拿到的是javax.jms.Message,是所有message的介面
session.createMessage();
4、保證訊息的成功處理
建立session的時候使用客戶端確認模式,如:
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE:
當訊息傳送給接收端之後,就自動確認成功了,而不管接收端有沒有處理成功,而一旦確認成功後,就會把佇列裡面的訊息給清除掉,避免下一個接收端
收到同樣的訊息。
CLIENT_ACKNOWLEDGE:
由客戶端進行確認,當客戶端接收到訊息進行處理後,客戶端呼叫Message介面的acknowledge方法進行確認,如果不確定,activemq將會發給下一個接收端處理。
注意:CLIENT_ACKNOWLEDGE模式只在點對點模式有效,訂閱/釋出模式中,即使不確認,也不會儲存訊息。
二、Spring中的JMS
1、Spring整合JMS各包介紹
Spring提供了JMS整合框架,簡化了JMS API的使用,JMS大致能分為兩塊功能:訊息的生產和消費。JMSTemplate類用於訊息的生產和非同步訊息的接收,
Spring提供了許多訊息訊息監聽器容器用來建立訊息驅動的POJO,同時也提供了宣告式的方法來建立訊息監聽器。
org.springframework.jms.core提供了使用JMS的核心功能,包括通過處理資源建立和釋放來簡化JMS API使用的JMS模板類。Spring模板類的設計原則是提供
幫助方法執行通用的操作和為了更加複雜的使用,而把處理任務的核心委託給使用者實現的回撥介面。
org.springframework.jms.support提供了JMSException的異常轉化功能,會把受檢的JMSException轉換成非受檢異常,例如javax.jms.JMSException會被包裝為
UncategorizedJmsException。
org.springframework.jms.support.converter提供了在Java物件和JMS訊息之間的訊息轉換器抽象。
org.springframework.jms.support.destination提供了許多管理JMS目的地的策略,比如為儲存在JNDI的目的地提供服務定位器。
org.springframework.jms.annotation提供了支援使用@JMSListener註解的必要設施。
org.springframework.jms.config提供了jms名稱空間的解析實現以及配置監聽器容器的java配置支援。
org.springframework.jms.connection提供了獨立應用程式中適合使用的連線工廠的使用。同時也包含了JMS的平臺事務管理器,也就是JmsTransactionManager。
2、使用Spring JMS
(1)JmsTemplate
JmsTemplate類簡化了JMS的使用,因為當傳送訊息和非同步接收訊息它會處理資源的建立和釋放,使用JmsTemplate只需要實現回撥介面。
MessageCreator回撥介面指定session就能建立訊息,對於更復雜的使用,SessionCallback提供使用者Jms會話和ProducerCallbacn回撥。
備註:JmsTemplate例項一旦配置就是執行緒安全的,這就意味著可以配置一個JmsTemplate類的例項然後注入共享引用到多個協作者中,
簡單來說,JmsTemplate是有狀態的,因為它維護者一個連線工廠的引用,但是這種狀態是非會話狀態。
(2)Connections
JmsTemplate需要一個連線工廠的引用,通常被客戶端應用作為工廠建立連線,而且它還包含了一些配置引數,許多引數是供應相關的,比如SSL配置選項。
(3)Caching Messaging Resources
在連線工廠和傳送操作間,有三個中間物件會被建立和銷燬,如下:
ConnectionFactory->Connection->Session->MessageProducer->send
為了優化資源的使用、提升效能,將會提供兩個連線工廠。
(4)連線工廠分類
SingleConnectionFactory:每次都會建立一個新的連線,適合測試
CachingConnectionFactory:基於SingleConnectionFactory的基礎上增加了Sessions、MessageProducers和MessageConsumer的快取,初始快取大小為1,
通過sessionCacheSize可以增加快取sessions的數量。
注意:實際快取Sessions的數量將會比設定的要大,因為快取的sessions是基於確認模式的。
備註:推薦使用apache的PooledConnectionFactory,PooledConnectionFactory支援Connection、Session和MessageProducer的池化,連線、會話和訊息生產者例項
使用後會返回池裡,以便後續使用。而Consumer不會池化,消費者處理完訊息後應該關閉,而不是放到池裡後續使用。儘管消費者空閒,但ActiveMQ會繼續傳送訊息
到消費者的預獲取緩衝區,訊息會在預獲取緩衝區阻塞,直到消費者再次被啟用。
(5)Destination Management
JmsTemplate會把目的地名的解析委託給JMS目的地解析物件,也就是DestinationResolver的實現。DynamicDestinationResolver是JmsTemplate使用的
預設實現,它提供解析動態目的地。
通常在JMS應用中的目的地只在執行時得知,當應用程式部署時是不會被建立的。這是因為在互動式的系統元件中有共享程式邏輯,這些系統元件會根據
一個已知的命名規則在執行期間建立目的地。
(6)Message Listener Containers
訊息監聽容器用來接收來自JMS訊息佇列的訊息並驅動已注入的訊息監聽器。監聽容器負責所有訊息接收的執行緒並分發訊息給監聽器,訊息監聽器容器是在
MDP(message-driven POJO)和訊息提供者的媒介,它關注接收訊息的註冊、參與事務、資源獲取和釋放以及異常轉化。
分類:
SimpleMessageListenerContainer
這種訊息監聽容器會在啟動時建立固定數量的JMS sessions和consumers,並且用標準的MessageConsumer.setMessageListener()註冊監聽器,然後讓JMS
訊息生產者呼叫回撥介面。
注意:SimpleMessageListenerContainer並不允許動態適應執行時需求以及參與外部事務,它只支援本地事務,將sessionTransacted標識設為true即可。來自訊息監聽器的異常
會導致回滾,而且訊息會重新發送。
DefaultMessageListenerContainer
這種訊息監聽器是使用最多的,和SimpleMessageListenerContainer相反,這種容器變體允許動態適應執行時需求,當配置了JtaTransactionManager時,每條被
接收的訊息都會伴隨XA transaction而註冊。這種監聽器在JMS生產者的低需求、參與外部管理事務的功能和相容Java EE環境中打造了平衡。
(7)Transaction management
Spring提供了JmsTransactionManager為單個連線工廠管理事務,允許應用程式利用Spring的事務管理特性。JmsTransactionManager管理本地資源事務,
繫結來自指定連線工廠的JMS連線/會話對到執行緒中,JmsTemplate自動檢測事務資源並相應進行操作。
三、SpringBoot與ActiveMQ集合使用
1、加入相關依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.iboxpay</groupId>
<artifactId>activemq_learning</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>activemq_learning</name>
<description>activemq_learning for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<!-- spring測試框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- mybatis與spring整合 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<!-- mysql連線驅動 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<!-- mq連線池 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!-- jsp -->
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<scope>provided</scope>
</dependency>
<!-- jstl中包含標準標籤庫 -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
</dependency>
<!--javamelody應用監控 -->
<dependency>
<groupId>net.bull.javamelody</groupId>
<artifactId>javamelody-spring-boot-starter</artifactId>
<version>1.74.0</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<!-- commons lang -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- commons code -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、整合相關配置
@Configuration
@PropertySource("classpath:jms.properties")
public class ActiveMQConfig {
@Autowired
private JmsConfig jmsConfig;
@Autowired
private MyListener myListener;
/**
* ActiveMQ連線池
* @return
*/
@Bean(name = "pooledConnectionFactory", destroyMethod = "stop")
public PooledConnectionFactory pooledConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(jmsConfig.getBrokerUrl());
return new PooledConnectionFactory(connectionFactory);
}
/**
* 傳送或接收目的地
* @return
*/
@Bean("activeMQQueue")
public ActiveMQQueue activeMQQueue() {
return new ActiveMQQueue(jmsConfig.getDestinationName());
}
/**
* JmsTemplate訊息傳送模板類配置
* @param connectionFactory
* @param activeMQQueue
* @return
*/
@Bean
public JmsTemplate jmsTemplate(@Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory,
@Qualifier("activeMQQueue") ActiveMQQueue activeMQQueue) {
JmsTemplate jmsTemplate = new JmsTemplate();
// 指定連線工廠
jmsTemplate.setConnectionFactory(connectionFactory);
// 開啟事務
jmsTemplate.setSessionTransacted(true);
// 指定確認模式,預設為自動確認
jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
// 指定傳送目的地
jmsTemplate.setDefaultDestination(activeMQQueue);
return jmsTemplate;
}
/**
* jms事務管理器,建立session時需開啟事務
* @param connectionFactory
* @return
*/
@Bean(name = "jmsTransactionManager")
public JmsTransactionManager jmsTransactionManager(
@Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
/**
* 訊息監聽器容器,用來接收來自指定目的地的訊息
* @param connectionFactory
* @param jmsTransactionManager
* @param threadPoolTaskExecutor
* @param activeMQQueue
* @return
*/
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(
@Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory,
@Qualifier("jmsTransactionManager") JmsTransactionManager jmsTransactionManager,
@Qualifier("threadPoolTaskExecutor") ThreadPoolTaskExecutor threadPoolTaskExecutor,
@Qualifier("activeMQQueue") ActiveMQQueue activeMQQueue) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 開啟事務
container.setSessionTransacted(true);
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
container.setDestination(activeMQQueue);
// 指定事務管理器
container.setTransactionManager(jmsTransactionManager);
// 指定訊息監聽器
container.setMessageListener(myListener);
// 併發消費者數量
container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
// 最大併發消費者數量
container.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
// 指定任務執行器執行監聽執行緒
container.setTaskExecutor(threadPoolTaskExecutor);
// 接收時長
container.setReceiveTimeout(jmsConfig.getReceiveTimeout());
return container;
}
/**
* 執行緒池任務執行器配置
* @return
*/
@Bean("threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(20);
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setKeepAliveSeconds(300);
threadPoolTaskExecutor.setQueueCapacity(1000);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
}
@Component
@ConfigurationProperties(prefix = "jms")
public class JmsConfig {
/**
* broker地址
*/
private String brokerUrl;
/**
* 併發連線消費者數
*/
private Integer concurrentConsumers;
/**
* 最大併發連線消費者數
*/
private Integer maxConcurrentConsumers;
/**
* 目的地
*/
private String destinationName;
/**
* 訊息接收時長
*/
private Integer receiveTimeout;
public String getBrokerUrl() {
return brokerUrl;
}
public Integer getConcurrentConsumers() {
return concurrentConsumers;
}
public Integer getMaxConcurrentConsumers() {
return maxConcurrentConsumers;
}
public String getDestinationName() {
return destinationName;
}
public Integer getReceiveTimeout() {
return receiveTimeout;
}
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
public void setConcurrentConsumers(Integer concurrentConsumers) {
this.concurrentConsumers = concurrentConsumers;
}
public void setMaxConcurrentConsumers(Integer maxConcurrentConsumers) {
this.maxConcurrentConsumers = maxConcurrentConsumers;
}
public void setDestinationName(String destinationName) {
this.destinationName = destinationName;
}
public void setReceiveTimeout(Integer receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
@Override
public String toString() {
return "JmsConfig [brokerUrl=" + brokerUrl + ", concurrentConsumers=" + concurrentConsumers
+ ", maxConcurrentConsumers=" + maxConcurrentConsumers + ", destinationName=" + destinationName
+ ", receiveTimeout=" + receiveTimeout + "]";
}
}
#================ jms相關配置 ===============#
#broker地址
jms.brokerURL=tcp://127.0.0.1:61616
#併發消費者數量
jms.concurrentConsumers=1
#最大併發消費者數量
jms.maxConcurrentConsumers=1
#傳送目的地名稱
jms.destinationName=spring_jms_test
#訊息監聽器容器接收時長
jms.receiveTimeout=0
以上為jms.properties屬性檔案內容
3、訊息監聽器配置
@Component
public class MyListener implements MessageListener {
private static Logger logger = LoggerFactory.getLogger(MyListener.class);
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = TextMessage.class.cast(message);
System.out.println(textMessage.getText());
} catch (JMSException e) {
logger.error("接收訊息失敗:{}", e.getMessage(), e);
}
}
}
}
4、訊息傳送控制器
@Controller
public class JmsController {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@RequestMapping("sendMessage.action")
@ResponseBody
public Map<String, Object> sendMessage(String textMessage) {
Map<String, Object> resp = new HashMap<>();
if (StringUtils.isBlank(textMessage)) {
resp.put("statusCode", 0);
resp.put("msg", "訊息不能為空!");
return resp;
}
// 非同步傳送訊息
threadPoolTaskExecutor.execute(() -> {
// 實現回撥介面建立訊息
jmsTemplate.send(session -> {
return session.createTextMessage(textMessage);
});
});
resp.put("statusCode", 1);
resp.put("msg", "訊息傳送成功!");
return resp;
}
}
該學習專案github地址:https://github.com/liuyalou1996/activemq_learning