1. 程式人生 > >RabbitMQ 工作模式二

RabbitMQ 工作模式二

之前寫了WORKQUEUES 跟   Publish/Subscribe 倆種模式 ,RabbitMQ 工作模式一

Routing 工作模式

 

 

特點

      每個消費者監聽自己的佇列,並且設定routingkey

      生產者將訊息發給交換機,由交換機根據routingkey來轉發訊息到指定的佇列

說白了,就是在publish/subscribe 工作模式的基礎上加一層篩選,判斷

 if(佇列.routingkey == 生產者.routingkey)    
        傳送訊息給duilie

 

下面是我寫的生產者

 

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer03 {

    //佇列名稱
    private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1";
    private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2";
    public final static String  EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    public static void main(String[] args) throws Exception{


        ConnectionFactory factory = new ConnectionFactory();

        //設定ip
        factory.setHost("localhost");

        //設定埠
        factory.setPort(5672);

        //設定賬號密碼
        factory.setUsername("guest");  //預設賬號密碼都是guest
        factory.setPassword("guest");

        //設定虛擬空間
        factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();//建立一個通道


        /**
         * 定義交換機
         *  param1 :  交換機名稱
         *  param2 :   交換機型別
         *
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

        /**
         *    定義訊息佇列
         *
         *    String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         *
         */


        channel.queueDeclare(QUEUE_INFORM_TEST1,true ,false ,false ,null);
        channel.queueDeclare(QUEUE_INFORM_TEST2,true ,false ,false ,null);

        //繫結交換機跟佇列
        /*
            String queue, String exchange, String routingKey
         */
        channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1);
        channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2);


        String message = "";

        //給佇列傳送訊息

        for (int i = 0; i < 9; i++) {

//
//            String exchange, String routingKey, BasicProperties props, byte[] body


            message = "人間有百媚千紅,唯你是我情之所鍾。第"+i+"條訊息";

            //給test1 傳送9條訊息
            channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST1 ,null ,message.getBytes() );
        }


        for (int i = 0; i < 5; i++) {

//
//            String exchange, String routingKey, BasicProperties props, byte[] body


            message = "你別回頭看我了,走吧,山高水長,可別再碰到我這麼喜歡你的人了。第"+i+"條訊息";

            //給test2 傳送5條訊息
            channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_TEST2 ,null ,message.getBytes() );
        }
        
        
        channel.close();
        
        connection.close();
    }


}

 

設定了交換機名稱,及路由的交換機型別

channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

--------------------------------------------------------------------------------------------------------------------------------------

 聲明瞭倆個佇列...分別為 test1, test2     

**
 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
 *
 * param1: 佇列名稱
 * param2: 是否持久化
 * param3 : 是否獨佔此佇列
 * param4 : 佇列不用是否自動刪除
 *  param5 : 引數
 */
channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );
 channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );

 

------------------------------------------------------------------------------------------------------------

佇列和交換機繫結  

//交換機和佇列繫結
/**
 * String queue, String exchange, String routingKey
 * param1 :  佇列名稱
 * exchange :  交換機
 * routingKey : 給佇列新增一個 路由key,交換機發送訊息時根據填寫的路由key 來判斷,如果填寫的key 跟 佇列的路由key 相同,那麼就會發送訊息給此佇列 

 */

 

  channel.queueBind(QUEUE_INFORM_TEST1,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST1);
  channel.queueBind(QUEUE_INFORM_TEST2,EXCHANGE_ROUTING_INFORM ,QUEUE_INFORM_TEST2);

 

 

-------------------------------------------------------------------------

傳送訊息

/**
 * String exchange, String routingKey, BasicProperties props, byte[] body
 *
 *   param1  交換機名稱
 *   param2  根據key名稱將訊息轉發到具體的佇列,這裡填寫佇列名稱表示訊息將發到此佇列
 *   param3  引數
 *   param4 傳遞的字串
 *
 *
 */
 channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());

 

 

``````````````````````````````````````````````````````````````````````````````````````````````````````````````

 

以上是生產者的程式碼 ,下面提供消費者的程式碼

 

 

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest1 {

    //佇列名稱
    private static final String QUEUE_INFORM_TEST1 = "queue_inform_test1";


    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {

            
            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();

          

//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽佇列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  佇列名稱
             * param2 :   是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
             * param3 : 消費物件,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_TEST1,true,consumer);
        }catch (Exception e){

        }
    }
}

 

 

消費者二 

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest2 {

    private static final String QUEUE_INFORM_TEST2 = "queue_inform_test2";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();

           


//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽佇列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  佇列名稱
             * param2 :   是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
             * param3 : 消費物件,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);
        }catch (Exception e){

        }
    }
}

 

 

消費者的程式碼其實很簡單,就是監聽而已.只需要指定一下監聽的佇列就行並提供一個 執行的方法 ,就是下面這句

channel.basicConsume(QUEUE_INFORM_TEST2,true , consumer);

 

測試

   先啟動producer,否則會報錯,因為consumer 中沒有宣告佇列,並且沒有中rabbitmq中發現佇列,就會丟擲異常 

 

   生產者 會給 test1 傳送  9條訊息   給test 傳送 5條訊息

 

  然後我們依次啟動倆個消費者 

  然後看處理結果

 

 

 

 

 

 

Topics 工作模式 

 

萬用字元工作模式

      每個消費者監聽自己的佇列,並且設定帶統配符的routingkey

      生產者將訊息發給broker,由交換機根據routingkey來轉發訊息到指定的佇列。

 

 

統配符規則:

                     符號#可以匹配多個詞,

                     符號*可以匹配一個詞語。

 

 

因為萬用字元感覺比較難講,所以我在網上找了一個充值的案例

 場景 

        使用者充值完成, email 的使用者 接收email的提示, sms的使用者接收sms的提示.. 設定兩種通知型別都接收的則兩種通知都有效。

 

大致思路 

設定 路由匹配規則     

  郵件的匹配規則   "inform.#.email.#"          sms的匹配規則   "inform.#.sms.#"

給只接收email的使用者 傳送訊息  路由設定     inform.email 

給只接收 sm的使用者  傳送訊息 路由設定   inform.sms 

給 都接收的使用者 傳送訊息                 inform.sms.email         不懂的話 看一下匹配規則,,有點取巧來著 

 

 

下面是我寫的生產者 

 

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer03 {

    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public final static String  EXCHANGE_ROUTING_INFORM = "exchange_topic_inform";

    public static void main(String[] args) throws Exception{


        ConnectionFactory factory = new ConnectionFactory();

        //設定ip
        factory.setHost("localhost");

        //設定埠
        factory.setPort(5672);

        //設定賬號密碼
        factory.setUsername("guest");  //預設賬號密碼都是guest
        factory.setPassword("guest");

        //設定虛擬空間
        factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();//建立一個通道


        /**
         * 定義交換機
         *  param1 :  交換機名稱
         *  param2 :   交換機型別
         *
         */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);

        /**
         *    定義訊息佇列
         *
         *    String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         *
         */


        channel.queueDeclare(QUEUE_INFORM_EMAIL,true ,false ,false ,null);
        channel.queueDeclare(QUEUE_INFORM_SMS,true ,false ,false ,null);

        //繫結交換機跟佇列
        /*
            String queue, String exchange, String routingKey
         */
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");
        channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");


        String message = "";

        //給接收郵件的人發郵件
        for (int i = 0; i < 9; i++) {

//
//            String exchange, String routingKey, BasicProperties props, byte[] body


            message = "記憶最讓人崩潰的地方,也許就在於它的猝不及防在某個祥和的午後,你正吃著火鍋唱著歌,那些尖利的記憶碎片就像潮水突然湧進你到腦海裡,讓你閃躲不及。";

            //給test1 傳送9條訊息
            channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );
        }

        //給接收簡訊的人發簡訊
        for (int i = 0; i < 5; i++) {

//
//            String exchange, String routingKey, BasicProperties props, byte[] body


            message = "傳送了一百條簡訊,九十九條都是你";

            //給test2 傳送5條訊息
            channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );
        }

        
        
//        給都接收的人傳送 

        for (int i = 0; i < 3; i++) {
            message = "我回你是秒回,你回我是輪迴";

            //給test2 傳送5條訊息
            channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes() );
        }
        

        channel.close();

        connection.close();
    }


}

 

-----------------------

 

 定義交換機,並設定交換機的型別為萬用字元模式

/**
 * 定義交換機
 *  param1 :  交換機名稱
 *  param2 :   交換機型別
 *
 */
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.TOPIC);

 

---------------------------------------------------

繫結交換機跟佇列,並配置佇列的路由萬用字元規則

  channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"inform.#.email.#");
  channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"inform.#.sms.#");

 

 

--------------------------------------------------------------------------------------------

 

傳送訊息給broker 併發送路由key

 

/*** 引數明細

* 1、交換機名稱,不指令使用預設交換機名稱 Default Exchang

* 2、routingKey(路由key),根據key名稱將訊息轉發到具體的佇列,這裡填寫佇列名稱表示訊息將發到此佇列

* 3、訊息屬性

* 4、訊息內容*/

 

 channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.email" ,null ,message.getBytes() );
 channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms" ,null ,message.getBytes() );
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform.sms.email" ,null ,message.getBytes());

 

 

 

 

訊息監聽者1

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest1 {

    //佇列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {


            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();



//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽佇列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  佇列名稱
             * param2 :   是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
             * param3 : 消費物件,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
        }catch (Exception e){

        }
    }
}

 

訊息監聽二

 

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest2 {

    private static final String QUEUE_INFORM_SMS= "queue_inform_sms";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定埠
            factory.setPort(5672);

            //設定賬號密碼
            factory.setUsername("guest");  //預設賬號密碼都是guest
            factory.setPassword("guest");

            //設定虛擬空間
            factory.setVirtualHost("/");//虛擬機器預設的虛擬名稱為/ , 虛擬機器相當於一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連線
            connection = factory.newConnection();

            //建立連線通道 ,每個連線可以建立多個通道,每個通道只有一個會話
            channel = connection.createChannel();




//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽佇列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  佇列名稱
             * param2 :   是否自動回覆,接收到訊息會自動恢復mq收到了,mq會刪除訊息,如果拒絕的話需要手動回覆,不回覆的話會導致mq不刪除被消費過的訊息,一直存在
             * param3 : 消費物件,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_SMS,true , consumer);
        }catch (Exception e){

        }
    }
}

 

其實監聽者程式碼都不要變來著,都是通用的

 

測試 

    先啟動生產者,後啟動監聽者.....理由是  如果先啟動監聽者,佇列沒建立,就會報錯

 

測試結果

sms的列印臺列印結果

 

 

email 列印臺列印結果

 

 

 

其實搞懂了路由模式後這個就很容易理解這個....

 

 

Header模式

header 模式其實跟路由模式很像,他們不同的是header模式取消routingkey,使用header中的 key/value(鍵值對)匹配佇列

瞭解了很多不同的模式,其實你會發現,程式碼很多都是相同的....這裡我就不貼上全部程式碼,就只給不同的程式碼了 

 

繫結交換機跟佇列的程式碼

  HashMap<String, Object> header_email = new HashMap<>();
        header_email.put("inform_type", "cms");

        HashMap<String, Object> header_sms = new HashMap<>();
        header_sms.put("inform_type", "sms");

        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM ,"",header_email);
        channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM ,"",header_sms);

 

 

傳送訊息的程式碼


            message = "記憶最讓人崩潰的地方,也許就在於它的猝不及防在某個祥和的午後,你正吃著火鍋唱著歌,那些尖利的記憶碎片就像潮水突然湧進你到腦海裡,讓你閃躲不及。";


            HashMap<String, Object> header = new HashMap<>();

            header.put("inform_type", "sms");  //匹配sms通知消費者繫結的header


            AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();

            properties.headers(header)
            //給test1 傳送9條訊息
            channel.basicPublish(EXCHANGE_ROUTING_INFORM,"" ,properties.build() ,message.getBytes() );

 

消費者都是一樣的,改一下佇列名稱就行