1. 程式人生 > 實用技巧 >Rabbitmq的死信佇列和延時佇列

Rabbitmq的死信佇列和延時佇列

一、死信佇列

死信佇列其實和普通的佇列沒啥大的區別,都需要建立自己的QueueExchange,然後通過RoutingKey繫結到Exchange上去,只不過死信佇列的RoutingKeyExchange要作為引數,繫結到正常的佇列上去,一種應用場景是正常佇列裡面的訊息被basicNack或者reject時,訊息就會被路由到正常佇列繫結的死信佇列中,還有一種還有常用的場景就是開啟了自動簽收,然後消費者消費訊息時出現異常,超過了重試次數,那麼這條訊息也會進入死信佇列,如果配置了話,當然還有其他的應用場景,這裡不一一討論。

1.1、死信佇列和交換器配置

這裡有兩個佇列,正常的業務佇列emailQueue

和與之繫結的死信佇列,這裡只演示,手動簽收,消費者捕獲異常Nack

1.1.2、yml配置

spring:
 rabbitmq:
    host: 192.168.99.12
    port: 5672
    username: guest
    password: guest
    # 傳送確認
    publisher-confirms: true
    # 路由失敗回撥
    publisher-returns: true
    template:
        # 必須設定成true 訊息路由失敗通知監聽者,false 將訊息丟棄
        mandatory: true
    listener:
      simple:
        # 每次從RabbitMQ獲取的訊息數量
        prefetch: 1
        default-requeue-rejected: false
        # 每個佇列啟動的消費者數量
        concurrency: 1
        # 每個佇列最大的消費者數量
        max-concurrency: 1
        # 簽收模式為手動簽收-那麼需要在程式碼中手動ACK
        acknowledge-mode: manual
#郵件佇列
email:
  queue:
    name: demo.email
	
#郵件交換器名稱
exchange:
  name: demoTopicExchange

#死信佇列
dead:
  letter:
    queue:
      name: demo.dead.letter
    exchange:
      name: demoDeadLetterTopicExchange

1.1.3、死信佇列配置

/**
 * rabbitmq 配置
 *
 * @author DUCHONG
 * @since 2020-08-23 14:05
 **/
@Configuration
@Slf4j
public class RabbitmqConfig {


    @Value("${email.queue.name}")
    private String emailQueue;
    @Value("${exchange.name}")
    private String topicExchange;
    @Value("${dead.letter.queue.name}")
    private String deadLetterQueue;
    @Value("${dead.letter.exchange.name}")
    private String deadLetterExchange;

    @Bean
    public Queue emailQueue() {

        Map<String, Object> arguments = new HashMap<>(2);
        // 繫結死信交換機
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        // 繫結死信的路由key
        arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");

        return new Queue(emailQueue,true,false,false,arguments);
    }

	
    @Bean
    TopicExchange emailExchange() {
        return new TopicExchange(topicExchange);
    }


    @Bean
    Binding bindingEmailQueue() {
        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
    }

    
    //私信佇列和交換器
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(deadLetterQueue);
    }

    @Bean
    TopicExchange deadLetterExchange() {
        return new TopicExchange(deadLetterExchange);
    }

    @Bean
    Binding bindingDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
    }

}

1.2、訊息傳送方

@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("${exchange.name}")
    private String topicExchange;

    @Scheduled(cron = "0 0/2 * * * ?")
    public void sendEmailMessage() {

        String msg = RandomStringUtils.randomAlphanumeric(8);
        JSONObject email=new JSONObject();
        email.put("content",msg);
        email.put("to","[email protected]");
        CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
        log.info("---傳送 email 訊息---{}---messageId---{}",email,correlationData.getId());
    }


}

1.3、訊息消費方

@Component
@Slf4j
public class MessageHandler {

    
   /**
     * 郵件消費者
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.email")
    @RabbitHandler
    public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msg);
            jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
            log.info("---接受到訊息---{}",jsonObject);
			//主動異常
			int m=1/0;
            //手動簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            log.info("handleEmailMessage捕獲到異常,拒絕重新入隊---訊息ID---{}",headers.get("spring_returned_message_correlation"));
            //異常,ture 重新入隊,或者false,進入死信佇列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

        }
    }

    /**
     * 死信消費者,自動簽收開啟狀態下,超過重試次數,或者手動簽收,reject或者Nack
     * @param message
     */
    @RabbitListener(queues = "demo.dead.letter")
    public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {

        //可以考慮資料庫記錄,每次進來查數量,達到一定的數量,進行預警,人工介入處理
        log.info("接收到死信訊息:---{}---訊息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
		//回覆ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

 
}

1.4、結果

二、延時佇列

延時佇列顧名思義,不是及時的佇列,也就是傳送者發給的訊息要延時一段時間,消費者才能接受的到,這裡有個典型的應用場景就是訂單30分鐘內未支付就關閉訂單,當然死信佇列也是可以實現的,這裡只演示訊息的延時消費邏輯,訂單邏輯就一個判斷,這裡不做討論。

2.1、延時佇列和交換器配置

使用延時佇列之前,需要先安裝延時佇列外掛,安裝方法,前面已經介紹過了,這裡放個連結

延時佇列外掛安裝

2.1.1、yml配置

spring:
    rabbitmq:
        host: 192.168.99.12
        port: 5672
        username: guest
        password: guest
        # 傳送確認
        publisher-confirms: true
        # 路由失敗回撥
        publisher-returns: true
        template:
            # 必須設定成true 訊息路由失敗通知監聽者,false 將訊息丟棄
            mandatory: true
        #消費端
        listener:
            simple:
                # 每次從RabbitMQ獲取的訊息數量
                prefetch: 1
                default-requeue-rejected: false
                # 每個佇列啟動的消費者數量
                concurrency: 1
                # 每個佇列最大的消費者數量
                max-concurrency: 1
                # 簽收模式為手動簽收-那麼需要在程式碼中手動ACK
                acknowledge-mode: manual
#郵件佇列
email:
    queue:
        name: demo.email

#郵件交換器名稱
exchange:
    name: demoTopicExchange

#死信佇列
dead:
    letter:
        queue:
            name: demo.dead.letter
        exchange:
            name: demoDeadLetterTopicExchange

#延時佇列
delay:
    queue:
        name: demo.delay
    exchange:
        name: demoDelayTopicExchange

2.1.2、延時佇列配置

/**
 * rabbitmq 配置
 *
 * @author DUCHONG
 * @since 2020-08-23 14:05
 **/
@Configuration
@Slf4j
public class RabbitmqConfig {


    @Value("${email.queue.name}")
    private String emailQueue;
    @Value("${exchange.name}")
    private String topicExchange;
    @Value("${dead.letter.queue.name}")
    private String deadLetterQueue;
    @Value("${dead.letter.exchange.name}")
    private String deadLetterExchange;
    @Value("${delay.queue.name}")
    private String delayQueue;
    @Value("${delay.exchange.name}")
    private String delayExchange;

    @Bean
    public Queue emailQueue() {

        Map<String, Object> arguments = new HashMap<>(2);
        // 繫結死信交換機
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        // 繫結死信的路由key
        arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");

        return new Queue(emailQueue,true,false,false,arguments);
    }


    @Bean
    TopicExchange emailExchange() {
        return new TopicExchange(topicExchange);
    }


    @Bean
    Binding bindingEmailQueue() {
        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
    }


    //私信佇列和交換器
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(deadLetterQueue);
    }

    @Bean
    TopicExchange deadLetterExchange() {
        return new TopicExchange(deadLetterExchange);
    }

    @Bean
    Binding bindingDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
    }
    //延時佇列
    @Bean
    public Queue delayQueue() {
        return new Queue(delayQueue);
    }

    @Bean
    CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "topic");
        //引數二為型別:必須是x-delayed-message
        return new CustomExchange(delayExchange, "x-delayed-message", true, false, args);

    }

    @Bean
    Binding bindingDelayQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue+".#").noargs();
    }
}

2.2、訊息傳送方

30分鐘時間太久了,這裡延時2分鐘來看效果

@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("${exchange.name}")
    private String topicExchange;

    @Value("${delay.exchange.name}")
    private String delayTopicExchange;

    @Scheduled(cron = "0 0/1 * * * ?")
    public void sendEmailMessage() {

        String msg = RandomStringUtils.randomAlphanumeric(8);
        JSONObject email=new JSONObject();
        email.put("content",msg);
        email.put("to","[email protected]");
        CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
        log.info("---傳送 email 訊息---{}---messageId---{}",email,correlationData.getId());
    }


    @Scheduled(cron = "0 0/1 * * * ?")
    public void sendDelayOrderMessage() throws Exception{

        //訂單號 id實際是儲存訂單後返回的,這裡用uuid代替
        String orderId = UUID.randomUUID().toString();
        // 模擬訂單資訊
        JSONObject order=new JSONObject();
        order.put("orderId",orderId);
        order.put("goodsName","vip充值");
        order.put("orderAmount","99.00");
        CorrelationData correlationData=new CorrelationData(orderId);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(orderId);
        //30分鐘時間太長,這裡延時120s消費
        messageProperties.setHeader("x-delay", 120000);
        Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);

        rabbitTemplate.convertAndSend(delayTopicExchange,"demo.delay.x",message,correlationData);

        log.info("---傳送 order 訊息---{}---orderId---{}",order,correlationData.getId());
        //睡一會,為了看延遲效果
        TimeUnit.MINUTES.sleep(10);
    }
}

2.3、訊息消費方

@Component
@Slf4j
public class MessageHandler {


    /**
     * 郵件傳送
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.email")
    @RabbitHandler
    public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msg);
            jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
            log.info("---接受到訊息---{}",jsonObject);
			//主動異常
			int m=1/0;
            //手動簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            log.info("handleEmailMessage捕獲到異常,拒絕重新入隊---訊息ID---{}", headers.get("spring_returned_message_correlation"));
            //異常,ture 重新入隊,或者false,進入死信佇列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

        }
    }

    /**
     * 死信消費者,自動簽收開啟狀態下,超過重試次數,或者手動簽收,reject或者Nack
     * @param message
     */
    @RabbitListener(queues = "demo.dead.letter")
    public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {

        //可以考慮資料庫記錄,每次進來查數量,達到一定的數量,進行預警,人工介入處理
        log.info("接收到死信訊息:---{}---訊息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
		//回覆ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    /**
     * 延時佇列消費
     * @param message
     * @param channel
     * @param headers
     * @throws IOException
     */
    @RabbitListener(queues ="demo.delay")
    @RabbitHandler
    public void handleOrderDelayMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {

        try {

            String msg=new String(message.getBody(), CharEncoding.UTF_8);
            JSONObject jsonObject = JSON.parseObject(msg);
            log.info("---接受到訂單訊息---orderId---{}",message.getMessageProperties().getMessageId());
            log.info("---訂單資訊---order---{}",jsonObject);
            //業務邏輯,根據訂單id獲取訂單資訊,如果還未支付,設定關閉狀態,如果已支付,不做任何處理
            //手動簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            log.info("handleOrderDelayMessage捕獲到異常,重新入隊---orderId---{}", headers.get("spring_returned_message_correlation"));
            //異常,ture 重新入隊,或者false,進入死信佇列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

        }
    }

}

2.4、結果

執行結果顯示,同一個訂單號的訊息,傳送過後2分鐘,消費者才接受到,符合預期。