1. 程式人生 > 其它 >RabbitMQ之路由模式

RabbitMQ之路由模式

RabbitMQ之路由模式

概念

簡單來說就是控制消費者拿到特定條件的訊息
比如一個情景:生產者生產日誌訊息 然後低級別的日誌交給一號消費者處理 嚴重的交給二號消費者處理

簡單例子

生產者程式碼和之前的訂閱模式 區別在於交換機模式改為DIRECT 同時要給出routekey 即判斷的標準
然後生成訊息傳送時需要給出routekey

public class RouteProducer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.198.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //建立連線
            connection = connectionFactory.newConnection("生產者");
            //獲取通道
            channel = connection.createChannel();
            //建立交換機以及兩個佇列 同時繫結關係
            String exchangeName = "test_direct";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,false,false,false,null);
            String queue1Name = "test_direct_queue1";
            String queue2Name = "test_direct_queue2";
            channel.queueDeclare(queue1Name,false,false,false,null);
            channel.queueDeclare(queue2Name,false,false,false,null);
            //繫結關係 第三個引數為routingKey 繫結規則 fanout使用""
            channel.queueBind(queue1Name,exchangeName,"error");

            channel.queueBind(queue2Name,exchangeName,"info");
            channel.queueBind(queue2Name,exchangeName,"error");
            channel.queueBind(queue2Name,exchangeName,"warning");
            //測試傳送兩條資訊 給出對應的routeKey
            String message = "error訊息測試";
            channel.basicPublish(exchangeName,"error",null,message.getBytes(StandardCharsets.UTF_8));
            message = "warning訊息測試";
            channel.basicPublish(exchangeName,"warning",null,message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            //關閉通道
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }

            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

消費者這邊寫法沒改變 佇列名字換一下即可

public class RouteConsumer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.198.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //建立連線
            connection = connectionFactory.newConnection("消費者");
            //獲取通道
            channel = connection.createChannel();
            //通過通道宣告佇列,建立交換機等一系列事情
            channel.basicConsume("test_direct_queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("1號消費者接受到的訊息為 " + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("收取訊息失敗");
                }
            });
            //卡一下
            System.out.println("鍵盤輸入關閉消費者");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        finally {
            //關閉通道
            if(channel != null && channel.isOpen()){
                try {
                    channel.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }

            if(connection != null && connection.isOpen()){
                try {
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

執行程式:
可以看到消費者只能拿到對應routekey規則的資訊