第5篇 RabbitMQ整合SpringBoot實現Direct模式
阿新 • • 發佈:2019-01-07
直接上程式碼。專案結構如下,pom 需要增加對 RabbitMQ 的支援。
Pom檔案如下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the spring-rabbit-hello demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.haibo</groupId>
    <artifactId>spring-rabbit-hello</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-rabbit-hello</name>
    <description>Demo project for Spring Boot</description>

    <!-- Dependency versions below are inherited from the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- RabbitMQ (AMQP) support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Web support for the REST controller that publishes messages -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <!-- Artifacts are resolved through a Nexus instance on localhost. -->
    <repositories>
        <repository>
            <id>nexus</id>
            <name>nexus</name>
            <url>http://localhost:8081/nexus/content/groups/public/</url>
        </repository>
    </repositories>
</project>
package com.haibo;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration: one direct exchange, two durable queues bound to it
 * with distinct routing keys, and listener containers that consume from them.
 *
 * Created by 趙海波 on 2017/6/6.
 */
@Configuration
public class RabbitMQConfig10 {

    /** Name of the message exchange. */
    public static final String EXCHANGE = "my-mq-exchange";

    /** Routing key that binds queue "queue_one" to the exchange. */
    public static final String ROUTINGKEY1 = "queue_one_key1";

    /** Routing key that binds queue "queue_one1" to the exchange. */
    public static final String ROUTINGKEY2 = "queue_one_key2";

    /**
     * Configures the connection to the RabbitMQ broker.
     *
     * @return a caching connection factory with publisher confirms enabled
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("139.199.11.69:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        // Must be set; the original author marks this as required.
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     * Declares the exchange (consumer-side configuration).
     *
     * <ul>
     *   <li>FanoutExchange: delivers to every bound queue; no routing key concept</li>
     *   <li>HeadersExchange: matches on key-value header attributes</li>
     *   <li>DirectExchange: delivers to the queue bound with the exact routing key</li>
     *   <li>TopicExchange: pattern matching on the routing key</li>
     * </ul>
     *
     * @return a durable, non-auto-delete direct exchange named {@link #EXCHANGE}
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE, true, false);
    }

    /**
     * Declares message queue 1 (consumer-side configuration).
     *
     * @return the durable queue "queue_one"
     */
    @Bean
    public Queue queue() {
        return new Queue("queue_one", true); // durable queue
    }

    /**
     * Binds message queue 1 to the exchange using {@link #ROUTINGKEY1}.
     *
     * @return the binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY1);
    }

    /**
     * Declares message queue 2 (consumer-side configuration).
     *
     * @return the durable queue "queue_one1"
     */
    @Bean
    public Queue queue1() {
        return new Queue("queue_one1", true); // durable queue
    }

    /**
     * Binds message queue 2 to the exchange using {@link #ROUTINGKEY2}.
     *
     * @return the binding
     */
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(defaultExchange()).with(RabbitMQConfig10.ROUTINGKEY2);
    }

    /**
     * Listener container that receives the messages of message queue 1.
     *
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        return buildContainer(queue(), "收到訊息 : ");
    }

    /**
     * Listener container that receives the messages of message queue 2.
     *
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer2() {
        return buildContainer(queue1(), "queue1 收到訊息 : ");
    }

    /**
     * A second listener container on message queue 2; it consumes from the
     * same queue as {@link #messageContainer2()} but logs with a different prefix.
     *
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer3() {
        return buildContainer(queue1(), "========queue2 收到訊息 : ");
    }

    /**
     * Standalone consumer for message queue 2, run outside the Spring context.
     *
     * The container is started explicitly; without that call it would be
     * configured but never consume anything. This method does not declare the
     * queue, so the queue is expected to exist on the broker already.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        RabbitMQConfig10 config = new RabbitMQConfig10();
        SimpleMessageListenerContainer container =
                config.buildContainer(config.queue1(), "queue1 收到訊息 : ");
        container.start();
    }

    /**
     * Builds a single-consumer, manually-acknowledging listener container
     * for the given queue.
     *
     * @param queue     the queue to consume from
     * @param logPrefix text printed in front of each received message body
     * @return the configured (not yet started) container
     */
    private SimpleMessageListenerContainer buildContainer(Queue queue, final String logPrefix) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue);
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual acknowledgement mode
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                // Decode with an explicit charset instead of the platform default.
                System.out.println(logPrefix + new String(body, StandardCharsets.UTF_8));
                // Acknowledge only this delivery (multiple = false) so that no other
                // outstanding message on the channel is confirmed as a side effect.
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }
}
生產者類如下
package com.haibo;

import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Controller for testing message publishing to RabbitMQ.
 *
 * It also acts as the publisher confirm callback of the injected template.
 */
@RestController
public class SendController10 implements RabbitTemplate.ConfirmCallback {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Stores the template used for sending and registers this controller as
     * its confirm callback. The template is supplied through the constructor;
     * the original author notes that no injection annotation is needed here.
     *
     * @param rabbitTemplate the template used to publish messages
     */
    public SendController10(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        // Register the publisher confirm callback.
        this.rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Sends a message towards message queue 1 (routing key
     * {@code RabbitMQConfig10.ROUTINGKEY1}).
     *
     * @param msg the message payload taken from the request parameter
     * @return always {@code null}
     */
    @RequestMapping("send1")
    public String send1(String msg) {
        publish(RabbitMQConfig10.ROUTINGKEY1, msg);
        return null;
    }

    /**
     * Sends a message towards message queue 2 (routing key
     * {@code RabbitMQConfig10.ROUTINGKEY2}).
     *
     * @param msg the message payload taken from the request parameter
     * @return always {@code null}
     */
    @RequestMapping("send2")
    public String send2(String msg) {
        publish(RabbitMQConfig10.ROUTINGKEY2, msg);
        return null;
    }

    /**
     * Publisher confirm callback.
     *
     * Note from the original author: the callback only tells whether the
     * message reached the RabbitMQ server; it does not mean the message was
     * received and processed by a consumer. The failure branch only logs;
     * no resend is performed here.
     *
     * @param correlationData correlation data passed when the message was sent
     * @param ack             {@code true} for a positive confirm
     * @param cause           failure description, printed when {@code ack} is false
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回撥id:" + correlationData);
        if (ack) {
            System.out.println("訊息成功消費");
        } else {
            System.out.println("訊息消費失敗:" + cause + "\n重新發送");
        }
    }

    /** Publishes msg to the shared exchange under routingKey, tagged with a fresh random correlation id. */
    private void publish(String routingKey, String msg) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitMQConfig10.EXCHANGE, routingKey, msg, correlationId);
    }
}
測試如下
http://localhost:8000/send1?msg=123
http://localhost:8000/send2?msg=123