1. 程式人生 > 實用技巧 >【RabbitMQ-7】RabbitMQ—交換機識別符號

【RabbitMQ-7】RabbitMQ—交換機識別符號

  1. 死信佇列概念
    死信佇列(Dead Letter Exchange),死信交換器。當業務佇列中的訊息被拒絕或者過期或者超過佇列的最大長度時,訊息會被丟棄,但若是配置了死信佇列,那麼訊息可以被重新發布到另一個交換器,這個交換器就是DLX,與DLX繫結的佇列稱為死信佇列。

若業務佇列想繫結死信佇列,那麼在宣告業務佇列時,需要指定DLX(死信Exchange)和DLK(死信RoutingKey)。

控制檯.png
2.訊息成為"死信"的前提
訊息被否定確認,使用 channel.basicNack或channel.basicReject ,並且此時requeue屬性被設定為false。
訊息在佇列的存活時間超過設定的TTL時間。
訊息佇列的訊息數量已經超過最大佇列長度。
“死信”訊息會被RabbitMQ進行特殊處理,如果配置了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有配置,則該訊息將會被丟棄。

  1. 如何為業務佇列配置死信佇列
    在宣告業務佇列時,指定死信配置。

@Configuration
public class RabbitMqConfig {

/**
 * 死信佇列 交換機識別符號
 */
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
 * 死信佇列交換機繫結鍵識別符號
 */
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";


/********************************************************
 * 建立死信佇列
 *******************************************************/

public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";

/**
 * 建立死信佇列
 */
@Bean
public Queue deadQueue() {
    return new Queue(deadQueueName, true);
}

/**
 * 建立死信交換機
 */
@Bean
public DirectExchange deadExchange() {
    return new DirectExchange(deadExchangeName);
}

/**
 * 死信佇列與死信交換機繫結
 */
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
    return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}

/*************************************************
 * 建立業務佇列
 ************************************************/
public final static String kinsonQueueName = "kinson_queue";
public final static String kinsonRoutingKey = "kinson_routing_key";
public final static String kinsonExchangeName = "kinson_exchange";

/**
 * 建立業務交換機
 */
@Bean
public DirectExchange kinsonExchange() {
    return new DirectExchange(kinsonExchangeName);
}

/**
 * 建立業務佇列時——聲明瞭死信佇列
 */
@Bean
public Queue kinsonQueue() {
    // 將普通佇列繫結到死信佇列交換機上
    Map<String, Object> args = new HashMap<>(2);
    args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
    args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
    return new Queue(kinsonQueueName, true, false, false, args);
}

/**
 * 繫結關係
 */
@Bean
public Binding kinsonRoutingKey(Queue kinsonQueue, DirectExchange kinsonExchange) {
    return BindingBuilder.bind(kinsonQueue).to(kinsonExchange).with(kinsonRoutingKey);
}

}
死信佇列並不是特殊的佇列,只是繫結到死信交換機上的佇列。死信交換機只是接受死信的普通交換機,它的型別也是[Direct、Fanout、Topic]。一般來說,會給每一個業務佇列都宣告一個獨有的路由key,並對應配置一個死信佇列進行監聽(但是一個專案可共用一個死信交換機)。

  1. 死信訊息的Header
    @Component
    @Slf4j
    public class CustomerRev {
    //業務佇列
    @RabbitListener(queues = {"kinson_queue"})
    public void receiver(Message msg, Channel channel) {
    try {
    //列印資料
    String message = new String(msg.getBody(), StandardCharsets.UTF_8);
    log.info("【業務佇列msg.getMessageProperties().getHeaders()】:{}",JSON.toJSONString(msg.getMessageProperties().getHeaders()));
    //(手動確認)丟棄訊息,且不重新回佇列,訊息會進入死信佇列
    channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
    log.error("錯誤資訊:{}", e.getMessage());
    }
    }
    //死信佇列
    @RabbitListener(queues = {"dead_queue"})
    public void receiver2(Message msg, Channel channel) {
    try {
    //列印資料
    log.info("【死信佇列msg.getMessageProperties().getHeaders():{}】", JSON.toJSONString(msg.getMessageProperties().getHeaders()));
    log.info("【死信佇列msg.getMessageProperties().getXDeathHeader():{}】", JSON.toJSONString(msg.getMessageProperties().getXDeathHeader()));
    String message = new String(msg.getBody(), StandardCharsets.UTF_8);
    log.info("死信佇列收取訊息:"+message);
    channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
    log.error("錯誤資訊:{}", e.getMessage());
    }
    }
    }
    執行訊息:

【業務佇列msg.getMessageProperties().getHeaders()】:{}
【死信佇列msg.getMessageProperties().getHeaders():{"x-first-death-exchange":"","x-death":[{"reason":"rejected","count":1,"exchange":"","time":1604549067000,"routing-keys":["kinson_queue"],"queue":"kinson_queue"}],"x-first-death-reason":"rejected","x-first-death-queue":"kinson_queue"}】
【死信佇列msg.getMessageProperties().getXDeathHeader():[{"reason":"rejected","count":1,"exchange":"","time":1604549067000,"routing-keys":["kinson_queue"],"queue":"kinson_queue"}]】
死信佇列收取訊息:Date:1604549064079
死信佇列消費的訊息的Header:

欄位名 含義
x-first-death-exchange 第一次被拋入的死信交換機的名稱
x-first-death-reason 第一次成為死信的原因,rejected:訊息在重新進入佇列時被佇列拒絕,由於default-requeue-rejected引數被設定為false。expired :訊息過期。maxlen: 佇列內訊息數量超過佇列最大容量
x-first-death-queue 第一次成為死信前所在佇列名稱
x-death 歷次被投入死信交換機的資訊列表,同一個訊息每次進入一個死信交換機,這個陣列的資訊就會被更新
5. 死信佇列使用場景
在較為重要的業務場景中,確保未被消費的訊息不被丟棄,一般傳送訊息異常可能原因主要有訊息本身存在錯誤導致業務處理異常,引數校驗異常,網路波動導致查詢或呼叫介面異常。為了不使訊息堆積,從而丟棄訊息。

當發生異常時,不能每次通過日誌來獲取原訊息,處理完問題後重新投遞訊息。通過配置死信佇列,可以讓未正確消費的訊息暫存到死信佇列中,後續排查清楚問題,編寫相應的程式碼來處理死信訊息(不在手動的恢復資料)。

推薦閱讀
一文了解死信佇列