1. 程式人生 > >RabbitMQ(二):mandatory標誌的作用

RabbitMQ(二):mandatory標誌的作用

獲取 操作 @override args 並不會 應該 發布者 http 具體實現

本文轉自:http://m.blog.csdn.net/article/details?id=54311277

在生產者通過channel的basicPublish方法發布消息時,通常有幾個參數需要設置,為此我們有必要了解清楚這些參數代表的具體含義及其作用,查看Channel接口,會發現存在3個重載的basicPublish方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean
mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

他們共有的參數分別是:
exchange:交換機名稱
routingKey:路由鍵
props:消息屬性字段,比如消息頭部信息等等
body:消息主體部分
除此之外,還有mandatory和immediate這兩個參數,鑒於RabbitMQ3.0不再支持immediate標誌,因此我們重點討論mandatory標誌
mandatory的作用:

當mandatory標誌位設置為true時,如果exchange根據自身類型和消息routingKey無法找到一個合適的queue存儲消息,那麽broker會調用basic.return方法將消息返還給生產者;當mandatory設置為false時,出現上述情況broker會直接將消息丟棄;通俗的講,mandatory標誌告訴broker代理服務器至少將消息route到一個隊列中,否則就將消息return給發送者;

下面我們通過幾個實例測試下mandatory標誌的作用:
測試1:設置mandatory標誌,且exchange未綁定隊列

public
class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmBindingKey"; int count = 3; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //創建生產者 Sender producer = new Sender(factory, count, exchangeName, routingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String routingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.routingKey = routingKey; } public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //創建exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //發送持久化消息 for(int i = 0;i < count;i++) { //第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵 channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes()); } } catch (Exception e) { e.printStackTrace(); } } }

第45行我們將basicPublish的第三個參數mandatory設置成了true,表示開啟了mandatory標誌,但我們沒有為當前exchange綁定任何隊列;

通過wireshark抓包看到下面輸出: 技術分享

可以看到最後執行了basic.return方法,將發布者發出的消息返還給了發布者,查看協議的Arguments參數部分可以看到,Reply-Text字段值為:NO_ROUTE,表示消息並沒有路由到合適的隊列中;

那麽我們該怎麽獲取到沒有被正確路由到合適隊列的消息呢?這時候可以通過為channel信道設置ReturnListener監聽器來實現,具體實現代碼見下:

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmBindingKey";
        int count = 3;
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);
        
        //創建生產者
        Sender producer = new Sender(factory, count, exchangeName, routingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String routingKey;
    
    public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
    }
    
    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //創建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //發送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
                //因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
                //我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
                channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
            }
            channel.addReturnListener(new ReturnListener() {
                
                @Override
                public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
                        throws IOException {
                    //此處便是執行Basic.Return之後回調的地方
                    String message = new String(arg5);
                    System.out.println("Basic.Return返回的結果:  "+message);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在設置了ReturnListener監聽器之後,broker(代理服務器)發出basic.return方法之後,就會回調第52行的handleReturn方法,在這個方法裏面我們就可以進行消息的重新發布操作啦;

測試2:設置mandatory標誌,且為exchange綁定隊列(路由鍵和綁定鍵一致)

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmRoutingKey";
        //String bindingKey = "confirmBindingKey";
        int count = 3;
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);
        
        //創建生產者
        Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String     queueName;
    private String routingKey;
    private String bindingKey;
    
    public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.bindingKey = bindingKey;
    }
    
    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //創建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //創建隊列
            channel.queueDeclare(queueName, true, false, false, null);
            //綁定exchange和queue
            channel.queueBind(queueName, exchangeName, bindingKey);
            //發送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一個參數是exchangeName(默認情況下代理服務器端是存在一個""名字的exchange的,
                //因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
                //我們需要將該參數設置成創建的exchange的名字),第二個參數是路由鍵
                channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
            }
            channel.addReturnListener(new ReturnListener() {
                
                @Override
                public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
                        throws IOException {
                    //此處便是執行Basic.Return之後回調的地方
                    String message = new String(arg5);
                    System.out.println("Basic.Return返回的結果:  "+message);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通過抓包發現並不會有basic.return方法被調用,查看RabbitMQ管理界面發現消息已經到達了隊列;


技術分享

測試3:設置mandatory標誌,且exchange綁定隊列(路由鍵和綁定鍵不一致)

代碼就是把測試2中第6行註釋,第7行註釋打開,註意到此時的routingKey和bindingKey是不一致的,此時我們運行程序,同時抓包得到下面截圖:

技術分享

註意一點,我們發送了三條消息,那麽相應的應該執行三次basic.return,其中第一次和第二次basic.return顯示在一行上了,第三次是單獨一行,不要誤認為只執行了兩次,從協議的具體返回內容裏我們同樣看到了Reply-Text字段值是NO_ROUTE,這種現象在測試1中已經見過了;

到此,我們明白了mandatory標誌的作用:在消息沒有被路由到合適隊列情況下會將消息返還給消息發布者,同時我們測試了哪些情況下消息不會到達合適的隊列,測試1演示的是創建了exchange但是沒有為他綁定隊列導致的消息未到達合適隊列,測試3演示的是創建了exchange同時創建了queue,但是在將兩者綁定的時候,使用的bindingKey和消息發布者使用的rountingKey不一致導致的消息未到達合適隊列;

RabbitMQ(二):mandatory標誌的作用