RabbitMQ-----死信佇列
阿新 • • 發佈:2021-10-09
1.什麼是TTL?
a. time to live 訊息存活時間 b. 如果訊息在存活時間內未被消費,則會被清除 c. RabbitMQ支援兩種ttl設定 -單獨訊息進行配置ttl -整個佇列進行配置ttl(居多)
2.什麼是rabbitmq的死信佇列?
沒有被及時消費的訊息存放的佇列
3.什麼是rabbitmq的死信交換機?
Dead Letter Exchange(死信交換機,縮寫:DLX)當訊息成為死信後,會被重新發送到另一個交換機,這個交換機就是DLX死信交換機
4.訊息有哪幾種情況成為死信?
a. 消費者拒收訊息(basic.reject/ basic.nack),並且沒有重新入隊 requeue=falseb. 訊息在佇列中未被消費,且超過佇列或者訊息本身的過期時間TTL(time-to-live) c. 佇列的訊息長度達到極限
結果:訊息成為死信後,如果該佇列綁定了死信交換機,則訊息會被死信交換機重新路由到死信佇列
5.RabbitMQ管控臺訊息TTL測試
a. 佇列過期時間使用引數,對整個佇列訊息統一過期 x-message-ttl 單位ms(毫秒)
b. 訊息過期時間使用引數(如果佇列頭部訊息未過期,佇列中級訊息已經過期,則訊息會還在佇列裡面) expiration 單位ms(毫秒)
c. 兩者都配置的話,時間短的先觸發
6.如圖
7.什麼是延遲佇列?
種帶有延遲功能的訊息佇列,Producer 將訊息傳送到訊息佇列 服務端,但並不期望這條訊息立馬投遞,而是推遲到在當前時間點之後的某一個時間投遞到 Consumer 進行消費,該訊息即定時訊息
8.使用場景
1. 通過訊息觸發一些定時任務,比如在某一固定時間點向用戶傳送提醒訊息 b. 使用者登入之後5分鐘給使用者做分類推送、使用者多少天未登入給使用者做召回推送; c. 訊息生產和消費有時間視窗要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單建立時會發送一條延時訊息。這條訊息將會在30分鐘以後投遞給消費者,
消費者收到此訊息後需要判斷對應的訂單是否已完成支付。 如支付未完成,則關閉訂單。如已完成支付則忽略
9.業界的一些實現方式
a. 定時任務高精度輪訓 b. 採用RocketMQ自帶延遲訊息功能 c. RabbitMQ本身是不支援延遲佇列的,怎麼辦? 結合死信佇列的特性,就可以做到延遲訊息
10.程式碼
場景:
客戶提交商品訂單後,需要在30分鐘內完成支付,如未完成,則傳送訊息提醒訂單失敗
a. rabbitmq配置類程式碼,配置普通/死信佇列和交換機
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * Rabbitmq配置類 * 這裡普通佇列也可以叫它延時佇列,是沒有配置消費者(Listener)去監聽的 * * */ @Configuration public class RabbitmqConfig { public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange"; public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue"; public static final String LOCK_MERCHANT_DEAD_ROUTING_KEY = "lock_merchant_dead_routing_key"; public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange"; public static final String NEW_MERCHANT_QUEUE= "new_merchant_queue"; public static final String NEW_MERCHANT_ROUTING_KEY = "new_merchant.#"; /** * 死信交換機(topic模式) * * */ @Bean public Exchange lockMerchantDeadExchange(){ //durable: 是否持久化, 佇列的宣告預設是存放到記憶體中的,如果rabbitmq重啟會丟失, // 如果想重啟之後還存在就要使佇列持久化,儲存到Erlang自帶的Mnesia資料庫中, // 當rabbitmq重啟之後會讀取該資料庫 return ExchangeBuilder.topicExchange(LOCK_MERCHANT_DEAD_EXCHANGE).durable(true).build(); } /** * 死信佇列 * * */ @Bean public Queue lockMerchantDeadQueue() { return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build(); } /** * 繫結死信交換機和死信佇列 * 這裡不加@Qualifier的話會報錯:there is more than one bean of “xxx” type * 因為死信交換機和普通交換機都配置了Exchange, 無法區分哪種作為引數 * Queue同理 * * */ @Bean public Binding lockMerchantDeadBinding(@Qualifier("lockMerchantDeadExchange") Exchange exchange, @Qualifier("lockMerchantDeadQueue") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with(LOCK_MERCHANT_DEAD_ROUTING_KEY).noargs(); } /** * 普通交換機(topic模式) * * */ @Bean public Exchange newMerchantExchange(){ //durable: 是否持久化, 佇列的宣告預設是存放到記憶體中的,如果rabbitmq重啟會丟失, // 如果想重啟之後還存在就要使佇列持久化,儲存到Erlang自帶的Mnesia資料庫中, // 當rabbitmq重啟之後會讀取該資料庫 return ExchangeBuilder.topicExchange(NEW_MERCHANT_EXCHANGE).durable(true).build(); } /** * 普通佇列 * * */ @Bean public Queue newMerchantQueue() { Map<String, Object> args = new HashMap<>(); //訊息過期後,進入死信交換機 args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE); //訊息過期後,進入死信交換機的路由鍵 args.put("x-dead-letter-routing-key", LOCK_MERCHANT_DEAD_ROUTING_KEY); //訊息過期時間 單位:毫秒 訊息過期後,會從普通佇列轉入死信佇列 //這裡方便測試設定10秒後訊息過期 args.put("x-message-ttl",10000); return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build(); } /** * 繫結普通交換機和普通佇列 * * */ @Bean public Binding newMerchantBinding(){ return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE, NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTING_KEY, null); } }View Code
b. 監聽死信佇列程式碼
import com.rabbitmq.client.Channel; import com.theng.shopuser.config.RabbitmqConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 消費者監聽死信佇列 * */ @Component @RabbitListener(queues = RabbitmqConfig.LOCK_MERCHANT_DEAD_QUEUE) public class OrderMQListener { /** * body: 接收convertAndSend(String exchange, String routingKey, Object object)的object訊息 * * */ @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("body: " + body); System.out.println("msgTag: " + msgTag); System.out.println("message: " + message.toString()); //30分鐘後,從body中獲取買家資訊再從資料庫查詢搶購到的商品訂單是否處理 TODO //如果沒有處理,則向商家傳送提醒訊息 TODO //告訴broker(訊息佇列伺服器實體),訊息已經被確認 channel.basicAck(msgTag, false); //告訴broker,訊息拒絕確認(可以拒絕多條,把比當前msgTag值小的也拒絕) // channel.basicNack(msgTag, false, true); //告訴broker,訊息拒絕確認(只能拒絕當前msgTag的這條) // channel.basicReject(msgTag, true); } }View Code
c. application.ym配置檔案
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: 123456 #虛擬主機 可在http://localhost:15672管理平臺進行配置 virtual-host: /dev #開啟訊息二次確認ConfirmCallback配置 publisher-confirms: true #開啟ReturnCallback配置 publisher-returns: true #修改交換機改投訊息遞到佇列失敗策略 #true:交換機處理訊息到佇列失敗,則返回給生產者 #和publisher-returns配合使用 template: mandatory: true #訊息手工確認ack listener: simple: acknowledge-mode: manualView Code
d. 控制器程式碼
@RestController @RequestMapping("/user-info") public class UserInfoController { @Autowired public RedisTemplate redisTemplate; //訊息生產者 @GetMapping("/send") public Object testSend(){ //object可儲存買家資訊 rabbitTemplate.convertAndSend(RabbitmqConfig.NEW_MERCHANT_EXCHANGE, "new_merchant.create", "買家搶購成功,請及時處理訂單!"); Map<String, Object> map = new HashMap<>(); map.put("code", 0); map.put("msg", "買家搶購成功,請在30分鐘內提交訂單!"); return "success"; } }View Code
結果:
生產者傳送訊息10秒後,訊息會進入死信交換機,通過死信佇列將訂單過期訊息傳送給消費者