Spring Boot整合RabbitMQ實踐
阿新 • • 發佈:2020-08-01
一、建立生產者服務
1、建立生產者服務 rabbit-producer
spring boot版本為2.1.16.RELEASE
2、pom.xml
引入spring-boot-starter-amqp
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
3、配置application.properties
server.servlet.context-path=/ server.port=8002 # 叢集用逗號分割 spring.rabbitmq.addresses=xx.xx.xx.xx:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # 使用啟動訊息確認模式 spring.rabbitmq.publisher-confirms=true #設定return訊息模式,注意要與mandatory一起去配合使用 #spring.rabbitmq.publisher-returns=true #spring.rabbitmq.template.mandatory=true spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=non_null
4、傳送
@Component public class RabbitSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 這裡就是確認訊息的回撥監聽介面,用於確認訊息是否被broker所收到 */ final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 作為一個唯一的標識 * @param ack broker是否落盤成功 * @param cause 失敗的異常資訊 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("訊息ACK結果:" + ack + ",correlationData: " +correlationData.getId()) ; } }; /** * 對外發送訊息的方法 * @param message 具體的訊息內容 * @param properties 額外的附加屬性 * @throws Exception */ public void send(Object message, Map<String,Object> properties) throws Exception{ MessageHeaders mhs = new MessageHeaders(properties); Message<?> msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); //指定業務唯一的ID String uuid = UUID.randomUUID().toString(); System.out.println("生成業務唯一Id=" + uuid); CorrelationData correlationData = new CorrelationData(uuid); MessagePostProcessor mpp = new MessagePostProcessor() { @Override public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException { System.out.println("postProcessMessage message: " +message); return message; } }; rabbitTemplate.convertAndSend("myexchange1", "myroutingkey.1", msg, mpp, correlationData); } }
5、測試傳送訊息
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitProducerApplicationTests { @Autowired private RabbitSender rabbitSender; @Test public void testSender() throws Exception { Map<String,Object> properties = new HashMap<>(); properties.put("name","zhangsan"); properties.put("age","18"); rabbitSender.send("hello rabbit", properties); Thread.sleep(10000); } }
二、RabbitMQ消費者服務
1、建立RabbitMQ消費者服務 rabbit-consumer
spring boot版本為2.1.16.RELEASE
2、pom.xml
引入spring-boot-starter-amqp
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
3、配置application.properties
server.servlet.context-path=/ server.port=8002 # 叢集用逗號分割 spring.rabbitmq.addresses=xx.xx.xx.xx:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # 表示消費者消費成功訊息以後需要手工的資訊簽收(ack),預設為auto spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=10 spring.rabbitmq.listener.simple.prefetch=1 # RabbitListener 相關配置 spring.rabbitmq.listener.order.exchange.name=myexchange1 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.key=myroutingkey.* spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=non_null
4、建立接收者類
import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * @description: 訊息接收者 * @author: * @create: 2020-08-01 09:35 */ @Component public class RabbitReceiver { @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myqueue1", durable = "true"), exchange = @Exchange(name = "${spring.rabbitmq.listener.order.exchange.name}", durable= "${spring.rabbitmq.listener.order.exchange.durable}", type = "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "true"), key = "${spring.rabbitmq.listener.order.exchange.key}" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception{ // 步驟1、收到訊息後進行業務端消費處理 System.out.println("消費訊息" + message.getPayload()); //步驟2、處理成功後,獲取deliveryTag,並進行手工ACK操作,因為配置檔案配置的是手工簽收模式 // spring.rabbitmq.listener.simple.acknowledge-mode=manual Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } }
RabbitListener相關屬性配置在屬性檔案中。
消費者採用手工配置 channel.basicAck