1. 程式人生 > 其它 >【夏目鬼鬼分享】RabbitMQ路由模式

【夏目鬼鬼分享】RabbitMQ路由模式

技術標籤:rabbitmq

路由模式

路由模式是可以根據路由鍵選擇性給多個消費者傳送訊息的模式,它包含一個生產者、兩個消費者、兩個佇列和一個交換機。兩個消費者同時繫結到不同的佇列上去,兩個佇列通過路由鍵繫結到交換機上去,生產者傳送訊息到交換機,交換機通過路由鍵轉發到不同佇列,佇列繫結的消費者接收並消費訊息。

生產者

/**
 * Created by wzy on 2020/12/7
 * 訊息傳送者
 */
public class RouteSender {
    private final static String EXCHANGE_NAME = "myRoute";
    private final static String ROUTE_KEY = "key2";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main (String[] arsg) throws Exception{
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel =connection.createChannel();
        //宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);//direct路由模式
        //傳送內容
        channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,("傳送"+ROUTE_KEY+"訊息").getBytes());
        //關閉連線
        channel.close();
        connection.close();
    }
}

消費者1

/**
 * Created by wzy on 2020/12/7
 * 訊息接收者1
 */
public class RouteRecver1 {

    private final static String QUEUE = "routequeue1";
    private final static String EXCHANGE_NAME = "myRoute";

    public static void main (String[] arsg) throws Exception{
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel =connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE,false,false,false,null);
        channel.basicQos(1);//執行的快就多幹,執行的慢就少幹
        //將交換器與佇列繫結
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key3");
        //定義接收者
        boolean autoAck = false;
        channel.basicConsume(QUEUE, autoAck, "myConsumerTag",
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.err.println("1body: " + new String(body));
                        //確認,false為確認收到訊息,true為拒絕收到訊息
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                });

    }
}

消費者2

/**
 * Created by wzy on 2020/12/7
 * 訊息接收者2
 */
public class RouteRecver2 {

    private final static String QUEUE = "routequeue2";
    private final static String EXCHANGE_NAME = "myRoute";

    public static void main (String[] arsg) throws Exception{
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //建立通道
        Channel channel =connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE,false,false,false,null);
        channel.basicQos(1);//執行的快就多幹,執行的慢就少幹
        //將交換器與佇列繫結
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key1");
        channel.queueBind(QUEUE,EXCHANGE_NAME,"key2");
        //定義接收者
        boolean autoAck = false;
        channel.basicConsume(QUEUE, autoAck, "myConsumerTag",
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.err.println("2body: " + new String(body));
                        //確認,false為確認收到訊息,true為拒絕收到訊息
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                });

    }
}

執行生產者後,消費者1和消費者2都收到訊息

修改路由key為了key2後,重新執行生產者後,只有消費者2收到訊息