SpringBoot整合RabbitMQ訊息元件
阿新 • • 發佈:2020-11-25
1、RabbitMQ是一個在AMQP基礎上構建的新一代企業級訊息系統,該元件由Pivotal公司提供,使用Erlang語言開發。
修改pom.xml配置檔案,追加spring-boot-starter-amqp依賴包。
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- Druid connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-ehcache</artifactId>
        </dependency>

        <!-- ActiveMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- RabbitMQ (AMQP starter added for this tutorial) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <!-- Resource file types copied from src/main/resources into the build output. -->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.yml</include>
                    <include>**/*.xml</include>
                    <include>**/*.p12</include>
                    <include>**/*.html</include>
                    <include>**/*.jpg</include>
                    <include>**/*.png</include>
                </includes>
            </resource>
        </resources>
    </build>

</project>
修改application.properties配置檔案,進行RabbitMQ的相關配置,如下所示:
# RabbitMQ server host
spring.rabbitmq.addresses=192.168.110.133
# User name
spring.rabbitmq.username=admin
# Password
spring.rabbitmq.password=admin
# Virtual host
spring.rabbitmq.virtual-host=/
這裡搞一個訊息生產配置類,用來進行訊息處理,如下所示:
1 package com.demo.config; 2 3 import org.springframework.amqp.core.Binding; 4 import org.springframework.amqp.core.BindingBuilder; 5 import org.springframework.amqp.core.DirectExchange; 6 import org.springframework.amqp.core.Queue; 7 import org.springframework.context.annotation.Bean; 8 import org.springframework.context.annotation.Configuration; 9 10 @Configuration 11 public class RabbitMqConfig { 12 13 public static final String EXCHANGE = "rabbitmq.exchange"; // 交換空間名稱 14 public static final String ROUTINGKEY = "rabbitmq.routingkey"; // 設定路由key 15 public static final String QUEUE_NAME = "rabbitmq.queue"; // 設定佇列名稱 16 17 /** 18 * 根據路由鍵將佇列和交換機繫結到一起 19 * 20 * @param exchange 21 * @param queue 22 * @return 23 */ 24 @Bean 25 public Binding bindingExchangeQueue(DirectExchange exchange, Queue queue) { 26 Binding binding = BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY); 27 return binding; 28 } 29 30 /** 31 * 使用直連的模式 32 * 33 * @return 34 */ 35 @Bean 36 public DirectExchange getDirectExchage() { 37 return new DirectExchange(EXCHANGE, true, true); 38 } 39 40 /** 41 * 佇列訊息 42 * 43 * @return 44 */ 45 @Bean 46 public Queue queue() { 47 return new Queue(QUEUE_NAME); 48 } 49 50 }
新建訊息業務實現類,用於訊息生產,如下所示:
1 package com.demo.producer; 2 3 import org.springframework.amqp.rabbit.core.RabbitTemplate; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.stereotype.Service; 6 7 import com.demo.config.RabbitMqConfig; 8 9 @Service 10 public class RabbitMqMessageProducer { 11 12 @Autowired 13 private RabbitTemplate rabbitTemplate; 14 15 /** 16 * 訊息傳送,將交換機和路由器進行繫結 17 * 18 * @param msg 19 */ 20 public void send(String msg) { 21 this.rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY, msg); 22 } 23 24 }
定義監聽處理類,用於訊息的消費,如下所示:
1 package com.demo.consumer; 2 3 import org.springframework.amqp.rabbit.annotation.RabbitListener; 4 import org.springframework.stereotype.Service; 5 6 @Service 7 public class RabbitMqConsumer { 8 9 /** 10 * 進行訊息的接受處理 11 * 12 * @param text 13 */ 14 @RabbitListener(queues = "rabbitmq.queue") 15 public void receiveMessage(String text) { 16 System.err.println("【*** 接受訊息 ***】 " + text); 17 } 18 19 }
此時就實現了與RabbitMQ訊息元件的整合,同時在整個程式中只需要呼叫RabbitMqMessageProducer類別中的send()方法就可以正常傳送訊息,訊息會依照ROUTINGKEY被路由到繫結的佇列,而後由監聽該佇列的消費者進行訊息消費。
1 package com.demo.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.ResponseBody; 7 8 import com.demo.producer.RabbitMqMessageProducer; 9 10 @Controller 11 public class RabbitMqController { 12 13 @Autowired 14 private RabbitMqMessageProducer rabbitMqMessageProducer; 15 16 @RequestMapping(value = "/messageProducer") 17 @ResponseBody 18 public void findAll() { 19 for (int i = 0; i < 10000; i++) { 20 rabbitMqMessageProducer.send("rabbitMq producer message : " + i); 21 } 22 } 23 24 }
可以通過http://192.168.110.133:15672/觀察,檢視自己的生產訊息和消費訊息的情況,如下所示: