1. 程式人生 > 實用技巧 >RabbitMQ工作模式-Routing路由模式

RabbitMQ工作模式-Routing路由模式

RabbitMQ工作模式-Routing路由模式

路由模式: 1、每個消費者監聽自己的佇列,並且設定routingkey。 2、生產者將訊息發給交換機,由交換機根據routingkey來轉發訊息到指定的佇列,消費者佇列中的routingkey(佇列和交換機繫結時指定,可指定多個)只有匹配上才可以消費。

  • 生產者
package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** * @author Administrator * @version 1.0 * @create 2018-06-17 19:23 **/ public class Producer03_routing { //佇列名稱 private static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; private static final String QUEUE_INFORM_SMS = "queue_inform_sms"; private static final String EXCHANGE_ROUTING_INFORM="
exchange_routing_inform"; private static final String ROUTINGKEY_EMAIL="inform_email"; private static final String ROUTINGKEY_SMS="inform_sms"; public static void main(String[] args) { //通過連線工廠建立新的連線和mq建立連線 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(
"127.0.0.1"); connectionFactory.setPort(5672);// connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //建立新連線 connection = connectionFactory.newConnection(); //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成 channel = connection.createChannel(); //宣告佇列,如果佇列在mq 中沒有則要建立 //引數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /** * 引數明細 * 1、queue 佇列名稱 * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在 * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立 * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除) * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間 */ channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null); //宣告一個交換機 //引數:String exchange, String type /** * 引數明細: * 1、交換機的名稱 * 2、交換機的型別 * fanout:對應的rabbitmq的工作模式是 publish/subscribe * direct:對應的Routing 工作模式 * topic:對應的Topics工作模式 * headers: 對應的headers工作模式 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); //進行交換機和佇列繫結 //引數:String queue, String exchange, String routingKey /** * 引數明細: * 1、queue 佇列名稱 * 2、exchange 交換機名稱 * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串 */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform"); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform"); //傳送訊息 //引數:String exchange, String routingKey, BasicProperties props, byte[] body /** * 引數明細: * 1、exchange,交換機,如果不指定將使用mq的預設交換機(設定為"") * 2、routingKey,路由key,交換機根據路由key來將訊息轉發到指定的佇列,如果使用預設交換機,routingKey設定為佇列的名稱 * 3、props,訊息的屬性 * 4、body,訊息內容 */ /* for(int i=0;i<5;i++){ //傳送訊息的時候指定routingKey String message = "send email inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes()); System.out.println("send to mq "+message); } for(int i=0;i<5;i++){ //傳送訊息的時候指定routingKey String message = "send sms inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes()); System.out.println("send to mq "+message); }*/ for(int i=0;i<5;i++){ //傳送訊息的時候指定routingKey String message = "send inform message to user"; channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes()); System.out.println("send to mq "+message); } } catch (Exception e) { e.printStackTrace(); } finally { //關閉連線 //先關閉通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }

  • 消費者-email
package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 * @version 1.0
 * @create 2018-06-17 18:22
 **/
public class Consumer03_routing_email {
    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    private static final String ROUTINGKEY_EMAIL="inform_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing    工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}

  • 消費者sms:
package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 * @version 1.0
 * @create 2018-06-17 18:22
 **/
public class Consumer03_routing_sms {
    //佇列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
    private static final String ROUTINGKEY_SMS="inform_sms";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連線工廠建立新的連線和mq建立連線
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛擬機器,一個mq服務可以設定多個虛擬機器,每個虛擬機器就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連線
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生產者和mq服務所有通訊都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 引數明細
         * 1、queue 佇列名稱
         * 2、durable 是否持久化,如果持久化,mq重啟後佇列還在
         * 3、exclusive 是否獨佔連線,佇列只允許在該連線中訪問,如果connection連線關閉佇列則自動刪除,如果將此引數設定true可用於臨時佇列的建立
         * 4、autoDelete 自動刪除,佇列不再使用時是否自動刪除此佇列,如果將此引數和exclusive引數設定為true就可以實現臨時佇列(佇列不用了就自動刪除)
         * 5、arguments 引數,可以設定一個佇列的擴充套件引數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //宣告一個交換機
        //引數:String exchange, String type
        /**
         * 引數明細:
         * 1、交換機的名稱
         * 2、交換機的型別
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing    工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        //進行交換機和佇列繫結
        //引數:String queue, String exchange, String routingKey
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值將訊息轉發到指定的佇列中,在釋出訂閱模式中調協為空字串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);

        //實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到訊息後此方法將被呼叫
             * @param consumerTag  消費者標籤,用來標識消費者的,在監聽佇列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 訊息屬性
             * @param body 訊息內容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //訊息id,mq在channel中用來標識訊息的id,可用於確認訊息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //訊息內容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽佇列
        //引數:String queue, boolean autoAck, Consumer callback
        /**
         * 引數明細:
         * 1、queue 佇列名稱
         * 2、autoAck 自動回覆,當消費者接收到訊息後要告訴mq訊息已接收,如果將此引數設定為tru表示會自動回覆mq,如果設定為false要通過程式設計實現回覆
         * 3、callback,消費方法,當消費者接收到訊息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}


經過測試 路由模式具備釋出訂閱模式,即當兩個佇列在繫結交換機的時候配置了相同的routingKey,兩者都會接受相同的訊息