1. 程式人生 > 程式設計 >RabbitMQ高階特性-死信佇列

RabbitMQ高階特性-死信佇列

死信

什麼是死信

訊息沒有任何消費者去消費就變為死信

訊息變為死信有以下幾種情況

  1. 訊息被拒絕(basic.reject/basic.nack),並且requeue=false

  2. 訊息TTL過期

  3. 佇列達到最大長度

死信佇列 Dead Letter Exchange(DLX)

DLX

利用DLX,當訊息在一個佇列中變成死信之後,它能被重新publish到另外一個exchange,這個exchange就是DLX。

DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何佇列上被指定,實際就是設定某個佇列的屬性。當佇列中有死信時,RabbitMQ會自動將死信訊息傳送到設定的DLX,進而被路由到另外一個佇列,可以監聽這個佇列,做後續處理。

死信佇列設定

  1. 申明死信佇列的Exchange和queue,然後進行繫結
  2. 申明正常佇列Exchange和queue繫結,只不過要在佇列上加引數 arguments.put(x-dead-letter-exchange","you dlx");

程式碼實現

producer

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消費者手工ack和nack * * @author [email protected] * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */ public class Producer { private static final String HOST = "localhost"; private static final int PORT = 5672
; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException,TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.abc"; // 申明exchange channel.exchangeDeclare(exchangeName,"topic"); String msg = "正常訊息1,routingKey:" + routingKey; AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build(); channel.basicPublish(exchangeName,routingKey,false,null,msg.getBytes("UTF-8")); // 該訊息無消費者消費 String msg2 = "過期死信訊息2,routingKey:" + routingKey; channel.basicPublish(exchangeName,props,msg2.getBytes("UTF-8")); String msg3 = "過期死信訊息3,routingKey:" + routingKey; channel.basicPublish(exchangeName,msg3.getBytes("UTF-8")); String msg4 = "過期死信訊息4,routingKey:" + routingKey; channel.basicPublish(exchangeName,msg4.getBytes("UTF-8")); channel.close(); connection.close(); } } 複製程式碼

producer可以採用訊息過期產生死信

正常consumer

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * 消費者手工ack和nack
 * 
 * @author [email protected]
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */

public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException,TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        // 定義死信隊的Exchange
        String dlxExchange = "dlx.exchange";
        channel.exchangeDeclare(dlxExchange,"topic");
        // 死信佇列名
        String dlxQueue = "dlx.queue";
        channel.queueDeclare(dlxQueue,true,null);
        // # 表示所有的key都可以路由到s死信佇列
        String dlxRoutingKey = "#";
        // 繫結死信佇列和exchange
        channel.queueBind(dlxQueue,dlxExchange,dlxRoutingKey,null);

        // 定義正常的消費者j監聽佇列
        String queueName = "test_dlx_queue";
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        // 申明exchange
        channel.exchangeDeclare(exchangeName,"topic");
        // 申明佇列

        Map<String,Object> arguments = new HashMap<>();
        // 設定死信佇列,arguments要設定到申明的佇列上
        arguments.put("x-dead-letter-exchange",dlxExchange);
        channel.queueDeclare(queueName,arguments);
        // 佇列繫結到exchange
        channel.queueBind(queueName,exchangeName,routingKey);
        channel.basicQos(1);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag,Delivery message) throws IOException {

                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("---消費者-- " + new String(message.getBody(),"UTF-8"));

            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者--:cancelCallback ");
            }
        };

        // 消費訊息,autoAck一定要設為false,手工ack
        channel.basicConsume(queueName,deliverCallback,cancelCallback);
    }
}

複製程式碼

執行結果:只消費一條正常訊息,其他過期的未消費

DLXConusmer,監聽消費死信佇列中的訊息

package com.wyg.rabbitmq.javaclient.dlx;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * 監聽私信佇列
 * 
 * @author [email protected]
 * @date 2019-11-22 21:52
 * @since JDK1.8
 * @version V1.0
 */

public class DLXConusmer {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException,TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String queueName = "dlx.queue";
        String exchangeName = "dlx.exchange";
        String routingKey = "#";

        // 申明exchange
        channel.exchangeDeclare(exchangeName,"topic");
        // 申明佇列
        channel.queueDeclare(queueName,null);
        // 佇列繫結到exchange
        channel.queueBind(queueName,null);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag,Delivery message) throws IOException {
                try {
                    System.out.println("---死信佇列消費者---");
                    System.out.println(new String(message.getBody(),"UTF-8"));
                } finally {
                    // consumer手動 ack 給broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者:cancelCallback");
            }
        };

        // 消費訊息,autoAck一定要設定為false
        channel.basicConsume(queueName,cancelCallback);
    }
}

複製程式碼

執行結果:3條過期的訊息進入死信佇列,並被消費