RabbitMQ 優先順序佇列
阿新 • • 發佈:2021-11-18
一、概述
在實際應用場景中,我們推送訊息,希望給訊息設定優先順序,比如說京東雙 11 活動,它希望將訊息優先推送給京東的 vip,而對於非 vip 使用者訊息推送的優先順序就低一些,那麼怎麼實現呢?
其實很簡單,通過優先順序佇列就可以完美解決上述應用場景
二、原理圖
三、編碼
1、applicaiton.yml
spring: rabbitmq: host: 192.168.59.135 port: 5672 username: admin password: admin123 publisher-confirm-type: correlated publisher-returns: true # 開啟 ack listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual #採取手動應答 #concurrency: 1 # 指定最小的消費者數量 #max-concurrency: 1 #指定最大的消費者數量 retry: enabled: true # 是否支援重試
2、自定義配置類 PriorityConfig
@Configuration public class PriorityConfig { private static final String PRIORITY_EXCHANGE = "priority_exchange"; private static final String PRIORITY_QUEUE = "priority_queue"; private static final String PRIORITY_KEY = "priority"; // 宣告優先順序交換機(type = direct) @Bean(PRIORITY_EXCHANGE) public DirectExchange priorityExchange() { return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE).durable(true).build(); } // 宣告優先順序佇列 @Bean(PRIORITY_QUEUE) public Queue priorityQueue() { /** * maxPriority(int maxPriority):設定佇列支援的最大優先順序數量,如果沒有設定,則佇列將不支援訊息優先順序 * 官方支援的優先順序範圍是 0 ~ 255,超過 255 就會發生報錯,但是一般企業使用的優先順序是 0 ~ 10,如果 maxPriority 設定 * 的太大,會浪費 cpu 和 記憶體,因為訊息是要在佇列中排隊的,佇列長度太大,排序的過程中會損耗效能 */ return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build(); } // 優先順序佇列繫結優先順序交換機 @Bean public Binding priorityQueueBindingPriorityExchange(@Qualifier(PRIORITY_QUEUE) Queue queue, @Qualifier(PRIORITY_EXCHANGE) DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(PRIORITY_KEY); } }
3、釋出確認自定義類 MyConfirmCallback
@Slf4j @Component public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { /** * 交換機確認回撥方法 * 1、Producer 傳送的訊息,交換機確認收到 * correlationData:儲存訊息回撥 ID 及其它相關的資訊 * ack:true * cause:null * <p> * 2、Producer 傳送的訊息,交換機沒有收到 * correlationData:儲存訊息回撥 ID 及其它相關的資訊 * ack:false * cause:交換機沒有收到訊息的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交換機已經收到 id 為:{}的訊息", id); } else { log.info("交換機還未收到 id 為:{}訊息,由於原因:{}", id, cause); } } /** * 如果交換機沒有將訊息路由到佇列,會觸發該回調方法 */ @Override public void returnedMessage(ReturnedMessage returned) { log.info("訊息: {} 被伺服器退回--->退回原因: {},交換機是: {},路由key是:{},退回編號是:{}", new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode()); } }
4、Producer
@Slf4j
@RestController
public class Producer {
private static final String PRIORITY_EXCHANGE = "priority_exchange";
private static final String PRIORITY_QUEUE = "priority_queue";
private static final String PRIORITY_KEY = "priority";
// 1、依賴注入 rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 2、依賴注入 myConfirmCallback
@Autowired
private MyConfirmCallback myConfirmCallback;
// 3、完成了 1、2 的注入之後再設定 rabbitTemplate 的回撥物件
@PostConstruct
public void init() {
// 訊息成功傳遞給交換機時會觸發 MyConfirmCallback 中的回撥方法 confirm()
rabbitTemplate.setConfirmCallback(myConfirmCallback);
// 訊息回退時會觸發 MyConfirmCallback 中的回撥方法 returnedMessage()
rabbitTemplate.setReturnsCallback(myConfirmCallback);
}
@GetMapping("/priority/sendMessage/{msg}")
public void sendMessage(@PathVariable("msg") String msg) {
// 設定唯一 ID
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg, correlationData);
log.info("傳送一條未設定優先順序的訊息", msg);
String msg1 = msg + 0;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg1, (message -> {
message.getMessageProperties().setPriority(0);
return message;
}), correlationData);
log.info("傳送一條優先順序為 0 的訊息", msg1);
String msg2 = msg + 2;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg2, (message -> {
message.getMessageProperties().setPriority(2);
return message;
}), correlationData);
log.info("傳送一條優先順序為 2 的訊息", msg2);
String msg3 = msg + 5;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg3, (message -> {
message.getMessageProperties().setPriority(5);
return message;
}), correlationData);
log.info("傳送一條優先順序為 5 的訊息", msg3);
String msg4 = msg + 10;
rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg4, (message -> {
message.getMessageProperties().setPriority(10);
return message;
}), correlationData);
log.info("傳送一條優先順序為 10 的訊息", msg4);
}
}
5、Consumer
@Slf4j
@Component
public class Consumer {
private static final String PRIORITY_QUEUE = "priority_queue";
@RabbitListener(queues = {PRIORITY_QUEUE})
public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
try {
String msg = new String(message.getBody());
log.info("消費者成功接收到訊息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.info("訊息消費錯誤");
// 出現異常之後拒絕訊息,並且訊息重新入隊
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
四、測試
要讓佇列實現優先順序需要做的事情如下
1、佇列需要設定為優先順序佇列
2、訊息需要設定訊息的優先順序
3、生產者必須先將訊息傳送到佇列中,讓佇列對設定了優先順序的訊息進行排隊
4、1、2、3 完成之後再啟動消費者進行消費即可
要想實現上述功能,我們先將 Consumer 的 @RabbitListener 註解註釋掉,然後啟動 Springboot 專案
瀏覽器傳送請求:http://localhost:8080/priority/sendMessage/小毛毛是最可愛的
訊息傳送完成之後,然後開啟 Consumer 的 @RabbitListener 註解,再次啟動 Springboot 專案
從消費者的消費結果可以看出,優先順序越高的訊息越早被消費,如果未設定訊息的優先順序,那麼該預設的優先順序看起來是 1