1. 程式人生 > 其它 >springboot整合rabbitmq利用延遲外掛傳送延時訊息

springboot整合rabbitmq利用延遲外掛傳送延時訊息

技術標籤:訊息佇列高階特性篇佇列rabbitmqspring boot交換機

1.需求

利用rabbitmq的死信交換機構建延時佇列,在容器啟動的時候往死信交換機發送一條訊息,一分鐘後監聽者接收到訊息並消費。

2.依賴

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

3.配置檔案

spring.rabbitmq.host=121.199.31.160
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
#預設情況下訊息消費者是自動確認訊息的,如果要手動確認訊息則需要修改確認模式為manual
spring.rabbitmq.listener.simple.cknowledge-mode=manual
# 消費者每次從佇列獲取的訊息數量。此屬性當不設定時為:輪詢分發,設定為1為:公平分發
spring.rabbitmq.listener.simple.prefetch=1

4.編寫配置類

/**
 * @author yhd
 * @createtime 2021/1/22 14:37
 */
@SpringBootApplication
public class DelayConfig {

	//延時交換機
    public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "spring.boot.test.delay.exchange";
    //路由鍵
    public static final String ROUTING_ORDER_CANCEL = "spring.boot.test.delay.routing"
; //延遲佇列 public static final String QUEUE_ORDER_CANCEL = "spring.boot.test.delay.queue"; // 延遲時間 單位:秒 public static final int DELAY_TIME = 60; @Bean //宣告死信佇列 public Queue delayQueue() { // 第一個引數是建立的queue的名字,第二個引數是是否支援持久化 return new Queue(QUEUE_ORDER_CANCEL,
true); } @Bean //宣告私信交換機 public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_DIRECT_ORDER_CANCEL, "x-delayed-message", true, false, args); } @Bean //死信交換機繫結死信佇列並設定路由鍵 public Binding bindingDelay() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(ROUTING_ORDER_CANCEL).noargs(); } }

5.編寫訊息傳送方和接收方

/**
 * @author yhd
 * @createtime 2021/1/22 14:34
 * 測試springboot整合mq利用死信佇列傳送訊息並接收
 */
@Component
@Slf4j
public class SpringBootDelayQueueTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * 傳送訊息
     */
    public void sendMessage() {
        amqpTemplate.convertAndSend(
                DelayConfig.EXCHANGE_DIRECT_ORDER_CANCEL,DelayConfig.ROUTING_ORDER_CANCEL,
                "try send message to delay queue !",
                msg -> {
                    msg.getMessageProperties().setDelay(DelayConfig.DELAY_TIME * 1000);
                    return msg;
                });

    }

    /**
     * 接收訊息
     */
    @RabbitListener(queues = DelayConfig.QUEUE_ORDER_CANCEL)
    public void receiveMessage(String msg, Message message, Channel channel) throws Exception {
        log.info("the delaty queue received message : {}", msg);
        //log.info("the delaty queue received message : {}", new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

6.容器啟動傳送訊息

/**
 * @author yhd
 * @createtime 2021/1/22 14:56
 */
@Component
@Slf4j
public class BootMq implements ApplicationRunner {

    @Resource
    private SpringBootDelayQueueTest springBootDelayQueueTest;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("傳送私信");
        springBootDelayQueueTest.sendMessage();
    }
}

7.結果

在這裡插入圖片描述