1. 程式人生 > 其它 >RabbitMQ-----死信佇列

RabbitMQ-----死信佇列

1.什麼是TTL?

a. time to live 訊息存活時間

b. 如果訊息在存活時間內未被消費,則會被清除

c. RabbitMQ支援兩種ttl設定
  -單獨訊息進行配置ttl
  -整個佇列進行配置ttl(居多)

2.什麼是rabbitmq的死信佇列?

沒有被及時消費的訊息存放的佇列

3.什麼是rabbitmq的死信交換機?

Dead Letter Exchange(死信交換機,縮寫:DLX)當訊息成為死信後,會被重新發送到另一個交換機,這個交換機就是DLX死信交換機

4.訊息有哪幾種情況成為死信?

a. 消費者拒收訊息(basic.reject/ basic.nack),並且沒有重新入隊 requeue=false
b. 訊息在佇列中未被消費,且超過佇列或者訊息本身的過期時間TTL(time-to-live) c. 佇列的訊息長度達到極限
結果:訊息成為死信後,如果該佇列綁定了死信交換機,則訊息會被死信交換機重新路由到死信佇列

5.RabbitMQ管控臺訊息TTL測試

a. 佇列過期時間使用引數,對整個佇列訊息統一過期
x-message-ttl
單位ms(毫秒)
b. 訊息過期時間使用引數(如果佇列頭部訊息未過期,佇列中級訊息已經過期,則訊息會還在佇列裡面) expiration 單位ms(毫秒)
c. 兩者都配置的話,時間短的先觸發

6.如圖

7.什麼是延遲佇列?

種帶有延遲功能的訊息佇列,Producer 將訊息傳送到訊息佇列 服務端,但並不期望這條訊息立馬投遞,而是推遲到在當前時間點之後的某一個時間投遞到 Consumer 進行消費,該訊息即定時訊息

8.使用場景

1. 通過訊息觸發一些定時任務,比如在某一固定時間點向用戶傳送提醒訊息
b. 使用者登入之後5分鐘給使用者做分類推送、使用者多少天未登入給使用者做召回推送;
c. 訊息生產和消費有時間視窗要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單建立時會發送一條延時訊息。這條訊息將會在30分鐘以後投遞給消費者,
消費者收到此訊息後需要判斷對應的訂單是否已完成支付。 如支付未完成,則關閉訂單。如已完成支付則忽略

9.業界的一些實現方式

a. 定時任務高精度輪訓

b. 採用RocketMQ自帶延遲訊息功能

c. RabbitMQ本身是不支援延遲佇列的,怎麼辦?
結合死信佇列的特性,就可以做到延遲訊息

10.程式碼

場景:

客戶提交商品訂單後,需要在30分鐘內完成支付,如未完成,則傳送訊息提醒訂單失敗

a. rabbitmq配置類程式碼,配置普通/死信佇列和交換機

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Rabbitmq配置類
 * 這裡普通佇列也可以叫它延時佇列,是沒有配置消費者(Listener)去監聽的
 *
 * */
@Configuration
public class RabbitmqConfig {

    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    public static final String LOCK_MERCHANT_DEAD_ROUTING_KEY = "lock_merchant_dead_routing_key";
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    public static final String NEW_MERCHANT_QUEUE= "new_merchant_queue";
    public static final String NEW_MERCHANT_ROUTING_KEY = "new_merchant.#";

    /**
     * 死信交換機(topic模式)
     *
     * */
    @Bean
    public Exchange lockMerchantDeadExchange(){
        //durable: 是否持久化, 佇列的宣告預設是存放到記憶體中的,如果rabbitmq重啟會丟失,
        // 如果想重啟之後還存在就要使佇列持久化,儲存到Erlang自帶的Mnesia資料庫中,
        // 當rabbitmq重啟之後會讀取該資料庫
        return ExchangeBuilder.topicExchange(LOCK_MERCHANT_DEAD_EXCHANGE).durable(true).build();
    }

    /**
     * 死信佇列
     *
     * */
    @Bean
    public Queue lockMerchantDeadQueue() {
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }

    /**
     * 繫結死信交換機和死信佇列
     * 這裡不加@Qualifier的話會報錯:there is more than one bean of “xxx” type
     * 因為死信交換機和普通交換機都配置了Exchange, 無法區分哪種作為引數
     * Queue同理
     *
     * */
    @Bean
    public Binding lockMerchantDeadBinding(@Qualifier("lockMerchantDeadExchange") Exchange exchange, @Qualifier("lockMerchantDeadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(LOCK_MERCHANT_DEAD_ROUTING_KEY).noargs();
    }

    /**
     * 普通交換機(topic模式)
     *
     * */
    @Bean
    public Exchange newMerchantExchange(){
        //durable: 是否持久化, 佇列的宣告預設是存放到記憶體中的,如果rabbitmq重啟會丟失,
        // 如果想重啟之後還存在就要使佇列持久化,儲存到Erlang自帶的Mnesia資料庫中,
        // 當rabbitmq重啟之後會讀取該資料庫
        return ExchangeBuilder.topicExchange(NEW_MERCHANT_EXCHANGE).durable(true).build();
    }

    /**
     * 普通佇列
     *
     * */
    @Bean
    public Queue newMerchantQueue() {
        Map<String, Object> args = new HashMap<>();
        //訊息過期後,進入死信交換機
        args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
        //訊息過期後,進入死信交換機的路由鍵
        args.put("x-dead-letter-routing-key", LOCK_MERCHANT_DEAD_ROUTING_KEY);
        //訊息過期時間 單位:毫秒 訊息過期後,會從普通佇列轉入死信佇列
        //這裡方便測試設定10秒後訊息過期
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }

    /**
     * 繫結普通交換機和普通佇列
     *
     * */
    @Bean
    public Binding newMerchantBinding(){
        return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTING_KEY, null);
    }
}
View Code

b. 監聽死信佇列程式碼

import com.rabbitmq.client.Channel;
import com.theng.shopuser.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消費者監聽死信佇列
 *
 */
@Component
@RabbitListener(queues = RabbitmqConfig.LOCK_MERCHANT_DEAD_QUEUE)
public class OrderMQListener {

    /**
     * body: 接收convertAndSend(String exchange, String routingKey, Object object)的object訊息
     *
     * */
    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("body: " + body);
        System.out.println("msgTag: " + msgTag);
        System.out.println("message: " + message.toString());

        //30分鐘後,從body中獲取買家資訊再從資料庫查詢搶購到的商品訂單是否處理 TODO
        //如果沒有處理,則向商家傳送提醒訊息 TODO

        //告訴broker(訊息佇列伺服器實體),訊息已經被確認
        channel.basicAck(msgTag, false);
        //告訴broker,訊息拒絕確認(可以拒絕多條,把比當前msgTag值小的也拒絕)
//        channel.basicNack(msgTag, false, true);
        //告訴broker,訊息拒絕確認(只能拒絕當前msgTag的這條)
//        channel.basicReject(msgTag, true);
    }
}
View Code

c. application.ym配置檔案

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: 123456
    #虛擬主機 可在http://localhost:15672管理平臺進行配置
    virtual-host: /dev
    #開啟訊息二次確認ConfirmCallback配置
    publisher-confirms: true
    #開啟ReturnCallback配置
    publisher-returns: true
    #修改交換機改投訊息遞到佇列失敗策略
    #true:交換機處理訊息到佇列失敗,則返回給生產者
    #和publisher-returns配合使用
    template:
      mandatory: true
    #訊息手工確認ack
    listener:
      simple:
        acknowledge-mode: manual
View Code

d. 控制器程式碼

@RestController
@RequestMapping("/user-info")
public class UserInfoController {

    @Autowired
    public RedisTemplate redisTemplate;  

    //訊息生產者
    @GetMapping("/send")
    public Object testSend(){
        //object可儲存買家資訊
        rabbitTemplate.convertAndSend(RabbitmqConfig.NEW_MERCHANT_EXCHANGE, "new_merchant.create", "買家搶購成功,請及時處理訂單!");

        Map<String, Object> map = new HashMap<>();
        map.put("code", 0);
        map.put("msg", "買家搶購成功,請在30分鐘內提交訂單!");
        return "success";
    }
}
View Code

結果:

生產者傳送訊息10秒後,訊息會進入死信交換機,通過死信佇列將訂單過期訊息傳送給消費者