1. 程式人生 > 實用技巧 >RabbitMQ 死信佇列是什麼鬼?

RabbitMQ 死信佇列是什麼鬼?

作者:海向
來源:https://www.cnblogs.com/haixiang/p/10905189.html

死信佇列

死信佇列:沒有被及時消費的訊息存放的佇列。

訊息沒有被及時消費的原因:

  • a.訊息被拒絕(basic.reject/ basic.nack)並且不再重新投遞 requeue=false
  • b.TTL(time-to-live) 訊息超時未消費
  • c.達到最大佇列長度

實現死信佇列步驟

首先需要設定死信佇列的 exchange 和 queue,然後進行繫結:

Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: # 代表接收所有路由 key

然後我們進行正常宣告交換機、佇列、繫結,只不過我們需要在普通佇列加上一個引數即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )

這樣訊息在過期、requeue失敗、 佇列在達到最大長度時,訊息就可以直接路由到死信佇列!

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
    public static void main(String[] args) throws Exception {
				//設定連線以及建立 channel 湖綠
        String exchangeName = "test_dlx_exchange";
        String routingKey = "item.update";
      
        String msg = "this is dlx msg";

        //我們設定訊息過期時間,10秒後再消費 讓訊息進入死信佇列
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .expiration("10000")
                .build();

        channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
        System.out.println("Send message : " + msg);

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DlxConsumer {
    public static void main(String[] args) throws Exception {
				//建立連線、建立channel忽略 內容可以在上面程式碼中獲取
        String exchangeName = "test_dlx_exchange";
        String queueName = "test_dlx_queue";
        String routingKey = "item.#";

        //必須設定引數到 arguments 中
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        //將 arguments 放入佇列的宣告中
        channel.queueDeclare(queueName, true, false, false, arguments);

        //一般不用程式碼繫結,在管理介面手動繫結
        channel.queueBind(queueName, exchangeName, routingKey);


        //宣告死信佇列
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        //路由鍵為 # 代表可以路由到所有訊息
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

            }
        };

        //6. 設定 Channel 消費者繫結佇列
        channel.basicConsume(queueName, true, consumer);
    }
}

總結

DLX也是一個正常的 Exchange,和一般的 Exchange 沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。

當這個佇列中有死信時,RabbitMQ 就會自動的將這個訊息重新發布到設定的 Exchange 上去,進而被路由到另一個佇列。可以監聽這個佇列中訊息做相應的處理。
近期熱文推薦:

1.Java 15 正式釋出, 14 個新特性,重新整理你的認知!!

2.終於靠開源專案弄到 IntelliJ IDEA 啟用碼了,真香!

3.我用 Java 8 寫了一段邏輯,同事直呼看不懂,你試試看。。

4.吊打 Tomcat ,Undertow 效能很炸!!

5.《Java開發手冊(嵩山版)》最新發布,速速下載!

覺得不錯,別忘了隨手點贊+轉發哦!