1. 程式人生 > 實用技巧 >SpringBoot整合ActiveMQ訊息元件

SpringBoot整合ActiveMQ訊息元件

1、ActiveMQ是Apache提供的開源元件,是基於JMS標準的實現元件。利用SpringBoot整合ActiveMQ元件,實現佇列訊息的傳送與接收。修改pom.xml配置檔案,追加spring-boot-starter-activemq依賴庫。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0 5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.3
.5.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.example</groupId> 14 <artifactId>demo</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>demo</name> 17
<description>Demo project for Spring Boot</description> 18 19 <properties> 20 <java.version>1.8</java.version> 21 <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version> 22 </properties> 23 24 <dependencies> 25 <dependency> 26 <groupId>org.springframework.boot</groupId> 27 <artifactId>spring-boot-starter-web</artifactId> 28 </dependency> 29 30 <dependency> 31 <groupId>org.springframework.boot</groupId> 32 <artifactId>spring-boot-starter-test</artifactId> 33 <scope>test</scope> 34 <exclusions> 35 <exclusion> 36 <groupId>org.junit.vintage</groupId> 37 <artifactId>junit-vintage-engine</artifactId> 38 </exclusion> 39 </exclusions> 40 </dependency> 41 42 <!-- mysql驅動包 --> 43 <dependency> 44 <groupId>mysql</groupId> 45 <artifactId>mysql-connector-java</artifactId> 46 </dependency> 47 48 <!-- druid連線池 --> 49 <dependency> 50 <groupId>com.alibaba</groupId> 51 <artifactId>druid</artifactId> 52 <version>1.1.10</version> 53 </dependency> 54 55 <dependency> 56 <groupId>org.springframework.boot</groupId> 57 <artifactId>spring-boot-starter-data-jpa</artifactId> 58 </dependency> 59 <dependency> 60 <groupId>org.springframework.boot</groupId> 61 <artifactId>spring-boot-starter-cache</artifactId> 62 </dependency> 63 <dependency> 64 <groupId>org.hibernate</groupId> 65 <artifactId>hibernate-ehcache</artifactId> 66 </dependency> 67 68 <!-- activeMQ --> 69 <dependency> 70 <groupId>org.springframework.boot</groupId> 71 <artifactId>spring-boot-starter-activemq</artifactId> 72 </dependency> 73 </dependencies> 74 75 <build> 76 <plugins> 77 <plugin> 78 <groupId>org.springframework.boot</groupId> 79 <artifactId>spring-boot-maven-plugin</artifactId> 80 </plugin> 81 </plugins> 82 <resources> 83 <resource> 84 <directory>src/main/resources</directory> 85 <includes> 86 <include>**/*.properties</include> 87 <include>**/*.yml</include> 88 <include>**/*.xml</include> 89 <include>**/*.p12</include> 90 <include>**/*.html</include> 91 <include>**/*.jpg</include> 92 <include>**/*.png</include> 93 
</includes> 94 </resource> 95 </resources> 96 </build> 97 98 </project>

修改application.yml配置檔案,進行ActiveMQ的配置,如下所示:

spring:
  jms:
    # Message type: true selects topic messages, false selects queue messages
    pub-sub-domain: false
  activemq:
    # User name for the broker connection
    user: admin
    # Password for the broker connection
    password: admin
    # Host and port of the message broker to connect to
    broker-url: tcp://192.168.110.142:61616

定義訊息消費監聽類,如下所示:

 1 package com.demo.consumer;
 2 
 3 import org.springframework.jms.annotation.JmsListener;
 4 import org.springframework.stereotype.Service;
 5 
 6 @Service
 7 public class MessageConsumer {
 8 
 9     /**
10      * 
11      * @param text
12      */
13     @JmsListener(destination = "msg.queue") // 定義訊息監聽佇列
14     public void receiveMessage(String text) {
15         // 進行訊息接受處理
16         System.err.println("【*** 接受訊息 ***】" + text);
17     }
18 }

定義訊息生產者業務類,如下所示:

 1 package com.demo.producer;
 2 
 3 import javax.jms.Queue;
 4 
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.jms.core.JmsMessagingTemplate;
 7 import org.springframework.stereotype.Service;
 8 
 9 /**
10  * 
11  * @author 訊息傳送
12  *
13  */
14 @Service
15 public class MessageProducer {
16 
17     // 訊息傳送模板
18     @Autowired
19     private JmsMessagingTemplate jmsMessagingTemplate;
20 
21     // 注入佇列
22     @Autowired
23     private Queue queue;
24 
25     /**
26      * 傳送訊息
27      */
28     public void send(String msg) {
29         this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
30     }
31 
32 }

定義JMS訊息傳送配置類,該類主要用於配置佇列資訊,如下所示:

 1 package com.demo.config;
 2 
 3 import javax.jms.Queue;
 4 
 5 import org.apache.activemq.command.ActiveMQQueue;
 6 import org.springframework.context.annotation.Bean;
 7 import org.springframework.context.annotation.Configuration;
 8 import org.springframework.jms.annotation.EnableJms;
 9 
10 @Configuration
11 @EnableJms
12 public class ActiveMqConfig {
13 
14     @Bean
15     public Queue queue() {
16         ActiveMQQueue activeMQQueue = new ActiveMQQueue("msg.queue");
17         return activeMQQueue;
18     }
19 }

至此已使用ActiveMQ實現訊息的傳送與接收處理。每當佇列msg.queue收到訊息時,都會自動呼叫MessageConsumer類中標註了@JmsListener的receiveMessage方法,進行訊息消費。下面定義一個控制器,用於觸發訊息的傳送:

 1 package com.demo.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.demo.producer.MessageProducer;
 9 
10 @Controller
11 public class ActiveMqController {
12 
13     @Autowired
14     private MessageProducer messageProducer;
15 
16     @RequestMapping(value = "/messageProducer")
17     @ResponseBody
18     public void findAll() {
19         for (int i = 0; i < 10000; i++) {
20             messageProducer.send("active producer message : " + i);
21         }
22     }
23 
24 }

在瀏覽器或命令列工具(例如curl)中訪問 http://127.0.0.1:8080/messageProducer 以觸發訊息傳送,之後可以在ActiveMQ的監控頁面 http://192.168.110.142:8161/admin/queues.jsp 觀察佇列的狀態。