1. 程式人生 > 程式設計 >RabbitMQ高階特性-生產端Return返回訊息機制

RabbitMQ高階特性-生產端Return返回訊息機制

為什麼需要Return

Return Listener用於處理一些不可路由的訊息

生產者通過指定Exchange和RoutingKey將訊息傳送達指定佇列,消費者只需監聽這個佇列,進行消費操作。

但是在某些情況下,我們在傳送訊息的時候,當前Exchange不存在或者指定的路由key路由不到,這個時候我們如果需要監聽這種達不到的訊息,我們就需要使用Return Listener。

實現

流程圖:

生產端Return

在API中配置項:

Mandatory:如果為true,則會監聽路由不可達訊息,然後進行後續處理;如果為false,那麼broker自動刪除這條訊息。

程式碼實現

Producer

package com.wyg.rabbitmq.javaclient.producer_return;

import
java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 生產端return機制 * * @author [email protected] * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */ public class Producer { private static final String HOST = "localhost"; private static
final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException,TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"
); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.ok"; String routingKeyError = "return.error"; // 申明exchange channel.exchangeDeclare(exchangeName,"topic"); // 新增監聽 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException { System.out.println("---生產端---"); System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); System.out.println("properties:" + properties); System.out.println("body:" + new String(body)); } }); // 設定 mandatory 為 true boolean mandatory = true; String msg = "這是一條return確認訊息1,routingKey " + routingKey; channel.basicPublish(exchangeName,routingKey,mandatory,null,msg.getBytes("UTF-8")); String msg2 = "這是一條return確認訊息2,routingKey " + routingKeyError; channel.basicPublish(exchangeName,routingKeyError,msg2.getBytes("UTF-8")); } // 注意,因為要等待broker的return訊息,暫時不關閉channel和connection } 複製程式碼

Conusumer

package com.wyg.rabbitmq.javaclient.producer_return;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * 生產端return機制
 * 
 * @author [email protected]
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */

public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException,TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String queueName = "test_return_queue";
        String exchangeName = "test_return_exchange";
        String routingKey = "return.ok";

        // 申明exchange
        channel.exchangeDeclare(exchangeName,"topic");
        // 申明佇列
        channel.queueDeclare(queueName,true,false,null);
        // 佇列繫結到exchange
        channel.queueBind(queueName,exchangeName,null);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag,Delivery message) throws IOException {
                try {
                    System.out.println("---消費者---:" + new String(message.getBody(),"UTF-8"));
                } finally {
                    // consumer手動 ack 給broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者:cancelCallback");
            }
        };

        // 消費訊息
        channel.basicConsume(queueName,deliverCallback,cancelCallback);
    }
}

複製程式碼

注意

先執行Consumer,繫結正確的exchange和routingKey,再啟動生產者

執行結果

Consumer

Consumer執行結果

Producer,其中routingKey 為 return.error的訊息無法路由,被Return Listener監聽到

Producer執行結果