1. 程式人生 > 其它 >RabbitMQ系列-- 路由

RabbitMQ系列-- 路由

技術標籤:中介軟體# RabbitMQ

在上一篇部落格中,我們詳細的介紹了廣播模型,在Fanout模式中,一條訊息會被所有訂閱的佇列都消費。但是,在某些場景下,我們希望不同的訊息被不同的佇列消費。這時就要用到Direct型別的Exchange(交換機)。

在Direct模型下

  • 佇列與交換機的繫結,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 訊息的傳送方在向Exchange傳送訊息時,也必須指定訊息的RoutingKey
  • Exchange不再把訊息交給每一個繫結的佇列,而是根據訊息的Routing Key進行判斷,只有佇列的RoutingKey與訊息的RoutingKey完全一致,才會接收到訊息

如下圖所示:
在這裡插入圖片描述

圖解:

  • P:生產者,向Exchange傳送訊息,傳送訊息時,會指定一個routingKey
  • X: Exchange(交換機),接收生產者的訊息,然後把訊息遞交給與routingkey完全匹配的佇列
  • C1:訊息者,其所在佇列指定了需要routing key為error的訊息
  • C2:消費者,其所在佇列指定了需要routing key為info、error、warning的訊息

程式碼實現

1.定義生產者


public class Provider {

    public static void main(String[] args) throws IOException {
        //獲取連線物件
Connection connection = RabbitMQUtils.getConnection(); //獲取通道 Channel channel = connection.createChannel(); //將通道宣告指定交換機 //引數1:交換機名稱 引數2:交換機的型別 direct 路由型別 //沒有交換機會建立一共名為logs的交換機 channel.exchangeDeclare("logs_direct","direct");
//路由Key String routerKey = "info"; //傳送訊息 channel.basicPublish("logs_direct",routerKey,null,("這是基於direct的 [ "+ routerKey+"]的訊息").getBytes()); //關閉連線和通道 RabbitMQUtils.closeChannelAndConnection(channel,connection); } }

在生產者程式碼,我們指定路由Key為info,也就是說,訊息者與佇列繫結的時候,需要指定routerKey為info,才會接收到生產者傳送的訊息

2.定義消費者1

public class Customer1 {
    public static void main(String[] args) throws IOException {
        //獲取連線物件
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道繫結交換機
        channel.exchangeDeclare("logs_direct","direct");

        //建立一個臨時的、唯一的佇列
        //返回的是 臨時佇列名
        String queueName = channel.queueDeclare().getQueue();

        //繫結交換機和佇列
        //引數1: 佇列名稱  引數2:交換機名稱  引數3:路由名稱
        channel.queueBind(queueName,"logs_direct","info");
        channel.queueBind(queueName,"logs_direct","error");
        channel.queueBind(queueName,"logs_direct","warning");

        //消費訊息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });
    }
}

在消費者1中,綁定了路由Key為info、error、warning的訊息,也就是說,消費者1能接收生產者釋出路由Key為info、error、warning的訊息。

3.定義消費者2

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //獲取連線物件
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道繫結交換機
        channel.exchangeDeclare("logs_direct","direct");

        //建立一個臨時的、唯一的佇列
        //返回的是 臨時佇列名
        String queueName = channel.queueDeclare().getQueue();

        //繫結交換機和佇列
        //引數1: 佇列名稱  引數2:交換機名稱  引數3:路由名稱
        channel.queueBind(queueName,"logs_direct","error");

        //消費訊息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
            }
        });
    }
}

在消費者2中,只能接收路由key為error的訊息。

程式碼驗證,先執行消費者1和消費者2,在執行生產者,可以發現,只有消費者1接收到了訊息,因為只有消費者1的路由Key和生產者的路由Key一致。
在這裡插入圖片描述

在這裡插入圖片描述

接下來,我們修改生產者釋出訊息的路由Key為error,也就是說,現在兩個消費者都能接收到訊息。

String routerKey = "error";

在這裡插入圖片描述
在這裡插入圖片描述