1. 程式人生 > 實用技巧 >SpringBoot整合RabbitMQ訊息元件

SpringBoot整合RabbitMQ訊息元件

1、RabbitMQ是一個在AMQP基礎上構建的新一代企業級訊息系統,該元件由Pivotal公司提供,使用ErLang語言開發。

修改pom.xml配置檔案,追加spring-boot-starter-amqp依賴包。

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <project xmlns="http://maven.apache.org/POM/4.0.0"
  3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
5 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 6 <modelVersion>4.0.0</modelVersion> 7 <parent> 8 <groupId>org.springframework.boot</groupId> 9 <artifactId>spring-boot-starter-parent</artifactId> 10 <version>2.3.5
.RELEASE</version> 11 <relativePath /> <!-- lookup parent from repository --> 12 </parent> 13 <groupId>com.example</groupId> 14 <artifactId>demo</artifactId> 15 <version>0.0.1-SNAPSHOT</version> 16 <name>demo</name> 17
<description>Demo project for Spring Boot</description> 18 19 <properties> 20 <java.version>1.8</java.version> 21 <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version> 22 </properties> 23 24 <dependencies> 25 <dependency> 26 <groupId>org.springframework.boot</groupId> 27 <artifactId>spring-boot-starter-web</artifactId> 28 </dependency> 29 30 <dependency> 31 <groupId>org.springframework.boot</groupId> 32 <artifactId>spring-boot-starter-test</artifactId> 33 <scope>test</scope> 34 <exclusions> 35 <exclusion> 36 <groupId>org.junit.vintage</groupId> 37 <artifactId>junit-vintage-engine</artifactId> 38 </exclusion> 39 </exclusions> 40 </dependency> 41 42 <!-- mysql驅動包 --> 43 <dependency> 44 <groupId>mysql</groupId> 45 <artifactId>mysql-connector-java</artifactId> 46 </dependency> 47 48 <!-- druid連線池 --> 49 <dependency> 50 <groupId>com.alibaba</groupId> 51 <artifactId>druid</artifactId> 52 <version>1.1.10</version> 53 </dependency> 54 55 <dependency> 56 <groupId>org.springframework.boot</groupId> 57 <artifactId>spring-boot-starter-data-jpa</artifactId> 58 </dependency> 59 <dependency> 60 <groupId>org.springframework.boot</groupId> 61 <artifactId>spring-boot-starter-cache</artifactId> 62 </dependency> 63 <dependency> 64 <groupId>org.hibernate</groupId> 65 <artifactId>hibernate-ehcache</artifactId> 66 </dependency> 67 68 <!-- activeMQ --> 69 <dependency> 70 <groupId>org.springframework.boot</groupId> 71 <artifactId>spring-boot-starter-activemq</artifactId> 72 </dependency> 73 74 <!-- rabbitMQ --> 75 <dependency> 76 <groupId>org.springframework.boot</groupId> 77 <artifactId>spring-boot-starter-amqp</artifactId> 78 </dependency> 79 </dependencies> 80 81 <build> 82 <plugins> 83 <plugin> 84 <groupId>org.springframework.boot</groupId> 85 <artifactId>spring-boot-maven-plugin</artifactId> 86 </plugin> 87 </plugins> 88 <resources> 89 <resource> 90 <directory>src/main/resources</directory> 91 <includes> 92 <include>**/*.properties</include> 93 <include>**/*.yml</include> 94 <include>**/*.xml</include> 95 <include>**/*.p12</include> 96 <include>**/*.html</include> 97 <include>**/*.jpg</include> 98 <include>**/*.png</include> 99 </includes> 100 </resource> 101 </resources> 102 </build> 103 104 </project>

修改yml.xml配置檔案,進行RabbitMQ的相關配置,如下所示:

1 # RabbitMQ服務主機名稱
2 spring.rabbitmq.addresses=192.168.110.133
3 # 使用者名稱
4 spring.rabbitmq.username=admin
5 # 密碼
6 spring.rabbitmq.password=admin
7 # 虛擬主機
8 spring.rabbitmq.virtual-host=/

這裡搞一個訊息生產配置類,用來進行訊息處理,如下所示:

 1 package com.demo.config;
 2 
 3 import org.springframework.amqp.core.Binding;
 4 import org.springframework.amqp.core.BindingBuilder;
 5 import org.springframework.amqp.core.DirectExchange;
 6 import org.springframework.amqp.core.Queue;
 7 import org.springframework.context.annotation.Bean;
 8 import org.springframework.context.annotation.Configuration;
 9 
10 @Configuration
11 public class RabbitMqConfig {
12 
13     public static final String EXCHANGE = "rabbitmq.exchange"; // 交換空間名稱
14     public static final String ROUTINGKEY = "rabbitmq.routingkey"; // 設定路由key
15     public static final String QUEUE_NAME = "rabbitmq.queue"; // 設定佇列名稱
16 
17     /**
18      * 根據路由鍵將佇列和交換機繫結到一起
19      * 
20      * @param exchange
21      * @param queue
22      * @return
23      */
24     @Bean
25     public Binding bindingExchangeQueue(DirectExchange exchange, Queue queue) {
26         Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY);
27         return binding;
28     }
29 
30     /**
31      * 使用直連的模式
32      * 
33      * @return
34      */
35     @Bean
36     public DirectExchange getDirectExchage() {
37         return new DirectExchange(EXCHANGE, true, true);
38     }
39 
40     /**
41      * 佇列訊息
42      * 
43      * @return
44      */
45     @Bean
46     public Queue queue() {
47         return new Queue(QUEUE_NAME);
48     }
49 
50 }

新建訊息業務實現類,用於訊息生產,如下所示:

 1 package com.demo.producer;
 2 
 3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.stereotype.Service;
 6 
 7 import com.demo.config.RabbitMqConfig;
 8 
 9 @Service
10 public class RabbitMqMessageProducer {
11 
12     @Autowired
13     private RabbitTemplate rabbitTemplate;
14 
15     /**
16      * 訊息傳送,將交換機和路由器進行繫結
17      * 
18      * @param msg
19      */
20     public void send(String msg) {
21         this.rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY, msg);
22     }
23 
24 }

定義監聽處理類,用於訊息的消費,如下所示:

 1 package com.demo.consumer;
 2 
 3 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 4 import org.springframework.stereotype.Service;
 5 
 6 @Service
 7 public class RabbitMqConsumer {
 8 
 9     /**
10      * 進行訊息的接受處理
11      * 
12      * @param text
13      */
14     @RabbitListener(queues = "rabbitmq.queue")
15     public void receiveMessage(String text) {
16         System.err.println("【*** 接受訊息 ***】 " + text);
17     }
18 
19 }

此時就實現了與RabbitMQ訊息元件的整合,同時在整個程式中只需要呼叫IMessageProducer介面中的send()方法就可以正常傳送,而後會找到設定同樣ROUTINGKEY的消費者進行訊息消費。

 1 package com.demo.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.demo.producer.RabbitMqMessageProducer;
 9 
10 @Controller
11 public class RabbitMqController {
12 
13     @Autowired
14     private RabbitMqMessageProducer rabbitMqMessageProducer;
15 
16     @RequestMapping(value = "/messageProducer")
17     @ResponseBody
18     public void findAll() {
19         for (int i = 0; i < 10000; i++) {
20             rabbitMqMessageProducer.send("rabbitMq producer message : " + i);
21         }
22     }
23 
24 }

可以通過http://192.168.110.133:15672/觀察,檢視自己的生產訊息和消費訊息的情況,如下所示: