1. 程式人生 > 其它 >RabbitMQ 優先順序佇列

RabbitMQ 優先順序佇列

一、概述

在實際應用場景中,我們推送訊息,希望給訊息設定優先順序,比如說京東雙 11 活動,它希望將訊息優先推送給京東的 vip,而對於非 vip 使用者訊息推送的優先順序就低一些,那麼怎麼實現呢?

其實很簡單,通過優先順序佇列就可以完美解決上述應用場景

二、原理圖

三、編碼

1、applicaiton.yml

spring:
  rabbitmq:
    host: 192.168.59.135
    port: 5672
    username: admin
    password: admin123
    publisher-confirm-type: correlated
    publisher-returns: true
    # 開啟 ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #採取手動應答
        #concurrency: 1 # 指定最小的消費者數量
        #max-concurrency: 1 #指定最大的消費者數量
        retry:
          enabled: true # 是否支援重試

2、自定義配置類 PriorityConfig

@Configuration
public class PriorityConfig {
    private static final String PRIORITY_EXCHANGE = "priority_exchange";
    private static final String PRIORITY_QUEUE = "priority_queue";
    private static final String PRIORITY_KEY = "priority";

    // 宣告優先順序交換機(type = direct)
    @Bean(PRIORITY_EXCHANGE)
    public DirectExchange priorityExchange() {
        return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE).durable(true).build();
    }

    // 宣告優先順序佇列
    @Bean(PRIORITY_QUEUE)
    public Queue priorityQueue() {
        /**
         * maxPriority(int maxPriority):設定佇列支援的最大優先順序數量,如果沒有設定,則佇列將不支援訊息優先順序
         * 官方支援的優先順序範圍是 0 ~ 255,超過 255 就會發生報錯,但是一般企業使用的優先順序是 0 ~ 10,如果 maxPriority 設定
         * 的太大,會浪費 cpu 和 記憶體,因為訊息是要在佇列中排隊的,佇列長度太大,排序的過程中會損耗效能
         */
        return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build();
    }

    // 優先順序佇列繫結優先順序交換機
    @Bean
    public Binding priorityQueueBindingPriorityExchange(@Qualifier(PRIORITY_QUEUE) Queue queue,
                                                        @Qualifier(PRIORITY_EXCHANGE) DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(PRIORITY_KEY);
    }
}

3、釋出確認自定義類 MyConfirmCallback

@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /**
     * 交換機確認回撥方法
     * 1、Producer 傳送的訊息,交換機確認收到
     * correlationData:儲存訊息回撥 ID 及其它相關的資訊
     * ack:true
     * cause:null
     * <p>
     * 2、Producer 傳送的訊息,交換機沒有收到
     * correlationData:儲存訊息回撥 ID 及其它相關的資訊
     * ack:false
     * cause:交換機沒有收到訊息的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交換機已經收到 id 為:{}的訊息", id);
        } else {
            log.info("交換機還未收到 id 為:{}訊息,由於原因:{}", id, cause);
        }
    }

    /**
     * 如果交換機沒有將訊息路由到佇列,會觸發該回調方法
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("訊息: {} 被伺服器退回--->退回原因: {},交換機是: {},路由key是:{},退回編號是:{}",
                new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(),
                returned.getRoutingKey(), returned.getReplyCode());
    }
}

4、Producer

@Slf4j
@RestController
public class Producer {
    private static final String PRIORITY_EXCHANGE = "priority_exchange";
    private static final String PRIORITY_QUEUE = "priority_queue";
    private static final String PRIORITY_KEY = "priority";

    // 1、依賴注入 rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 2、依賴注入 myConfirmCallback
    @Autowired
    private MyConfirmCallback myConfirmCallback;

    // 3、完成了 1、2 的注入之後再設定 rabbitTemplate 的回撥物件
    @PostConstruct
    public void init() {
        // 訊息成功傳遞給交換機時會觸發 MyConfirmCallback 中的回撥方法 confirm()
        rabbitTemplate.setConfirmCallback(myConfirmCallback);
        // 訊息回退時會觸發 MyConfirmCallback 中的回撥方法 returnedMessage()
        rabbitTemplate.setReturnsCallback(myConfirmCallback);
    }

    @GetMapping("/priority/sendMessage/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        // 設定唯一 ID
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg, correlationData);
        log.info("傳送一條未設定優先順序的訊息", msg);

        String msg1 = msg + 0;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg1, (message -> {
            message.getMessageProperties().setPriority(0);
            return message;
        }), correlationData);
        log.info("傳送一條優先順序為 0 的訊息", msg1);

        String msg2 = msg + 2;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg2, (message -> {
            message.getMessageProperties().setPriority(2);
            return message;
        }), correlationData);
        log.info("傳送一條優先順序為 2 的訊息", msg2);

        String msg3 = msg + 5;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg3, (message -> {
            message.getMessageProperties().setPriority(5);
            return message;
        }), correlationData);
        log.info("傳送一條優先順序為 5 的訊息", msg3);

        String msg4 = msg + 10;
        rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg4, (message -> {
            message.getMessageProperties().setPriority(10);
            return message;
        }), correlationData);
        log.info("傳送一條優先順序為 10 的訊息", msg4);
    }
}

5、Consumer

@Slf4j
@Component
public class Consumer {
    private static final String PRIORITY_QUEUE = "priority_queue";

    @RabbitListener(queues = {PRIORITY_QUEUE})
    public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
        try {
            String msg = new String(message.getBody());
            log.info("消費者成功接收到訊息:{}", msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.info("訊息消費錯誤");
            // 出現異常之後拒絕訊息,並且訊息重新入隊
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
}

  

四、測試

要讓佇列實現優先順序需要做的事情如下

1、佇列需要設定為優先順序佇列

2、訊息需要設定訊息的優先順序

3、生產者必須先將訊息傳送到佇列中,讓佇列對設定了優先順序的訊息進行排隊

4、1、2、3 完成之後再啟動消費者進行消費即可

要想實現上述功能,我們先將 Consumer 的 @RabbitListener 註解註釋掉,然後啟動 Springboot 專案

瀏覽器傳送請求:http://localhost:8080/priority/sendMessage/小毛毛是最可愛的

訊息傳送完成之後,然後開啟 Consumer 的 @RabbitListener 註解,再次啟動 Springboot 專案

從消費者的消費結果可以看出,優先順序越高的訊息越早被消費,如果未設定訊息的優先順序,那麼該預設的優先順序看起來是 1