1. 程式人生 > >RabbitMQ的六種工作模式總結

RabbitMQ的六種工作模式總結

最近學習RabbitMQ的使用方式,記錄下來,方便以後使用,也方便和大家共享,相互交流。

RabbitMQ的六種工作模式:

1、Work queues
2、Publish/subscribe
3、Routing
4、Topics
5、Header 模式
6、RPC

一、Work queues

多個消費端消費同一個佇列中的訊息,佇列採用輪詢的方式將訊息是平均傳送給消費者;

 

 特點:

1、一條訊息只會被一個消費端接收;

2、佇列採用輪詢的方式將訊息是平均傳送給消費者的;

3、消費者在處理完某條訊息後,才會收到下一條訊息

生產端:

1、宣告佇列

2、建立連線

3、建立通道

4、通道宣告佇列

5、制定訊息

6、傳送訊息,使用預設交換機

消費端:

1、宣告佇列

2、建立連線

3、建立通道

4、通道宣告佇列

5、重寫訊息消費方法

6、執行訊息方法

新建兩個maven工程,生產訊息的生產端,消費訊息的消費端;

pom.xml檔案中依賴座標如下:

<dependencies>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.0</version>
        </dependency>
</dependencies>

 生產端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、宣告佇列
2、建立連線
3、建立通道
4、通道宣告佇列
5、制定訊息
6、傳送訊息,使用預設交換機
*/
public class Producer02 {
    //宣告佇列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連線埠
            connectionFactory.setUsername("guest");//mq登入使用者名稱
            connectionFactory.setPassword("guest");//mq登入密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
            //建立與RabbitMQ服務的TCP連線
            connection = connectionFactory.newConnection();
            //建立與Exchange的通道,每個連線可以建立多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道繫結郵件佇列

            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送訊息。。。");
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯示繫結或解除繫結
                  * 預設的交換機,routingKey等於佇列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq訊息傳送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消費端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、宣告佇列
2、建立連線
3、建立通道
4、通道宣告佇列
5、重寫訊息消費方法
6、執行訊息方法
*/
public class Consumer02 {
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道繫結郵件佇列

            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收訊息呼叫此方法
                  * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
                  * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌
                    (收到訊息失敗後是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的訊息是:"+msg );
                }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生產端啟動後,控制檯列印資訊如下:

 RabbitMQ中的已有訊息:

 queue中的訊息正是生產端傳送的訊息:

 二、Publish/subscribe 模式

這種模式又稱為釋出訂閱模式,相對於Work queues模式,該模式多了一個交換機,生產端先把訊息傳送到交換機,再由交換機把訊息傳送到繫結的佇列中,每個繫結的佇列都能收到由生產端傳送的訊息。

釋出訂閱模式:

1、每個消費者監聽自己的佇列;

2、生產者將訊息發給broker,由交換機將訊息轉發到繫結此交換機的每個佇列,每個繫結交換機的佇列都將接收
到訊息

應用場景:使用者通知,當用戶充值成功或轉賬完成系統通知使用者,通知方式有簡訊、郵件多種方法;

生產端:

1、宣告佇列,宣告交換機

2、建立連線

3、建立通道

4、通道宣告交換機

5、通道宣告佇列

6、通過通道使佇列繫結到交換機

7、制定訊息

8、傳送訊息

消費端:

1、宣告佇列,宣告交換機

2、建立連線

3、建立通道

4、通道宣告交換機

5、通道宣告佇列

6、通過通道使佇列繫結到交換機

7、重寫訊息消費方法

8、執行訊息方法

Publish/subscribe 模式繫結兩個消費端,因此需要有兩個消費端,一個郵件消費端,一個簡訊消費端;

生產端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {
    //宣告兩個佇列和一個交換機
    //Publish/subscribe釋出訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連線埠
            connectionFactory.setUsername("guest");//mq登入使用者名稱
            connectionFactory.setPassword("guest");//mq登入密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
            //建立與RabbitMQ服務的TCP連線
            connection = connectionFactory.newConnection();
            //建立與Exchange的通道,每個連線可以建立多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            //Publish/subscribe釋出訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道繫結郵件佇列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道繫結簡訊佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe釋出訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送訊息。。。");
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯示繫結或解除繫結
                  * 預設的交換機,routingKey等於佇列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Publish/subscribe釋出訂閱模式
                channel.basicPublish(EXCHANGE,"",null,message.getBytes());
                System.out.println("mq訊息傳送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe釋出訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            //Publish/subscribe釋出訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道繫結郵件佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe釋出訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費者接收訊息呼叫此方法
              * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
              * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌
                (收到訊息失敗後是否需要重新發送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的訊息是:"+msg );
            }
            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

簡訊消費端的程式碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe釋出訂閱模式
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            //Publish/subscribe釋出訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道繫結簡訊佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe釋出訂閱模式
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費者接收訊息呼叫此方法
              * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
              * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌
                (收到訊息失敗後是否需要重新發送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的訊息是:"+msg );
            }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

三、Routing 路由模式

Routing 模式又稱路由模式,該種模式除了要繫結交換機外,發訊息的時候還要制定routing key,即路由key,佇列通過通道繫結交換機的時候,需要指定自己的routing key,這樣,生產端傳送訊息的時候也會指定routing key,通過routing key就可以把相應的訊息傳送到繫結相應routing key的佇列中去。

路由模式:

1、每個消費者監聽自己的佇列,並且設定routingkey;
2、生產者將訊息發給交換機,由交換機根據routingkey來轉發訊息到指定的佇列;

應用場景:使用者通知,當用戶充值成功或轉賬完成系統通知使用者,通知方式有簡訊、郵件多種方法;

生產端:

1、宣告佇列,宣告交換機

2、建立連線

3、建立通道

4、通道宣告交換機

5、通道宣告佇列

6、通過通道使佇列繫結到交換機並指定該佇列的routingkey

7、制定訊息

8、傳送訊息並指定routingkey

消費端:

1、宣告佇列,宣告交換機

2、建立連線

3、建立通道

4、通道宣告交換機

5、通道宣告佇列

6、通過通道使佇列繫結到交換機並指定routingkey

7、重寫訊息消費方法

8、執行訊息方法

按照假設的應用場景,同樣,Routing 路由模式也是一個生產端,兩個消費端,所不同的是,宣告交換機的型別不同,佇列繫結交換機的時候需要指定Routing key,傳送訊息的時候也需要指定Routing key,這樣根據Routing key就能把相應的訊息傳送到相應的佇列中去。

生產端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer03 {
    //宣告兩個佇列和一個交換機
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連線埠
            connectionFactory.setUsername("guest");//mq登入使用者名稱
            connectionFactory.setPassword("guest");//mq登入密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
            //建立與RabbitMQ服務的TCP連線
            connection = connectionFactory.newConnection();
            //建立與Exchange的通道,每個連線可以建立多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道繫結交換機
            /**
             * 引數明細
             * 1、交換機名稱
             * 2、交換機型別,fanout、topic、direct、headers
             */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道繫結郵件佇列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道繫結簡訊佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
            //給email佇列發訊息
            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送email訊息。。。");
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯示繫結或解除繫結
                  * 預設的交換機,routingKey等於佇列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes());
                System.out.println("mq訊息傳送成功!");
            }
            //給sms佇列發訊息
            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送sms訊息。。。");
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯示繫結或解除繫結
                  * 預設的交換機,routingKey等於佇列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes());
                System.out.println("mq訊息傳送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer03 {
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道繫結郵件佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收訊息呼叫此方法
                  * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
                  * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌
                    (收到訊息失敗後是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的訊息是:"+msg );
                }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

簡訊消費端的程式碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer03 {
    //Routing 路由模式
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道繫結簡訊佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收訊息呼叫此方法
                  * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
                  * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌
                    (收到訊息失敗後是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的訊息是:"+msg );
                }
            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

四、Topics 模式

Topics 模式和Routing 路由模式最大的區別就是,Topics 模式傳送訊息和消費訊息的時候是通過萬用字元去進行匹配的。

路由模式:

1、每個消費者監聽自己的佇列,並且設定帶統配符的routingkey

2、生產者將訊息發給broker,由交換機根據routingkey來轉發訊息到指定的佇列

應用場景:使用者通知,當用戶充值成功或轉賬完成系統通知使用者,通知方式有簡訊、郵件多種方法;

生產端:

1、宣告佇列,宣告交換機

2、建立連線

3、建立通道

4、通道宣告交換機

5、通道宣告佇列

6、通過通道使佇列繫結到交換機並指定該佇列的routingkey(萬用字元)

7、制定訊息

8、傳送訊息並指定routingkey(萬用字元)

消費端:

1、宣告佇列,宣告交換機

2、建立連線

3、建立通道

4、通道宣告交換機

5、通道宣告佇列

6、通過通道使佇列繫結到交換機並指定routingkey(萬用字元)

7、重寫訊息消費方法

8、執行訊息方法

按照假設的應用場景,Topics 模式也是一個生產端,兩個消費端,生產端佇列繫結交換機的時候,需要指定的routingkey是萬用字元,傳送訊息的時候繫結的routingkey也是萬用字元,消費端佇列繫結交換機的時候routingkey也是萬用字元,這樣就能根據萬用字元匹配到訊息了。

生產端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer04 {
    //宣告兩個佇列和一個交換機
    //Topics 模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連線埠
            connectionFactory.setUsername("guest");//mq登入使用者名稱
            connectionFactory.setPassword("guest");//mq登入密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
            //建立與RabbitMQ服務的TCP連線
            connection = connectionFactory.newConnection();
            //建立與Exchange的通道,每個連線可以建立多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            //Topics 模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道繫結郵件佇列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道繫結簡訊佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
            //給email佇列發訊息
            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送email訊息。。。");
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯示繫結或解除繫結
                  * 預設的交換機,routingKey等於佇列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.email",null,message.getBytes());
                System.out.println("mq email 訊息傳送成功!");
            }
            //給sms佇列發訊息
            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送sms訊息。。。");
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯示繫結或解除繫結
                  * 預設的交換機,routingKey等於佇列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.sms",null,message.getBytes());
                System.out.println("mq sms 訊息傳送成功!");
            }
            //給email和sms佇列發訊息
            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送email sms訊息。。。");
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯示繫結或解除繫結
                  * 預設的交換機,routingKey等於佇列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.email.sms",null,message.getBytes());
                System.out.println("mq email sms 訊息傳送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer04 {
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道繫結郵件佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收訊息呼叫此方法
                  * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
                  * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌
                    (收到訊息失敗後是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的訊息是:"+msg );
                }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

簡訊消費端的程式碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer04 {
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道繫結郵件佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收訊息呼叫此方法
                  * @param consumerTag 消費者的標籤,在channel.basicConsume()去指定
                  * @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌
                    (收到訊息失敗後是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的訊息是:"+msg );
                }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

由於生產端同時傳送了email的訊息(10條),sms的訊息(10條),email和sms同時收到的訊息(10條),所以每個消費端都應收到各自的10條訊息,加上同時都能收到的10條訊息,每個消費端應該收到20條訊息;

生產端控制檯列印:

 郵件消費端控制檯列印:

 簡訊消費端的控制檯列印:

 生產端執行後,RabbitMQ上的訊息佇列情況:

 兩個消費端執行完後,RabbitMQ上的訊息佇列情況:

 五、Header 模式

header模式與routing不同的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配佇列。

案例:

根據使用者的通知設定去通知使用者,設定接收Email的使用者只接收Email,設定接收sms的使用者只接收sms,設定兩種通知型別都接收的則兩種通知都有效。

根據假設使用場景,需要一個生產端,兩個消費端,不同的是,生產端宣告交換機時,交換機的型別不同,是headers型別,生產端佇列繫結交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配佇列,傳送訊息時也是使用header中的 key/value(鍵值對)匹配佇列。

消費端同樣是宣告交換機時,交換機的型別不同,是headers型別,消費端佇列繫結交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配佇列,消費訊息時也是使用header中的 key/value(鍵值對)匹配佇列。

生產端的程式碼如下:

package com.xyfer;

import com.rabbitmq.client.*;


import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Producer05 {
    //宣告兩個佇列和一個交換機
    //Header 模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連線埠
            connectionFactory.setUsername("guest");//mq登入使用者名稱
            connectionFactory.setPassword("guest");//mq登入密碼
            connectionFactory.setVirtualHost("/");//rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq伺服器
            //建立與RabbitMQ服務的TCP連線
            connection = connectionFactory.newConnection();
            //建立與Exchange的通道,每個連線可以建立多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道繫結交換機
            /**
              * 引數明細
              * 1、交換機名稱
              * 2、交換機型別,fanout、topic、direct、headers
              */
            //Header 模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
            //通道繫結佇列
            /**
             * 宣告佇列,如果Rabbit中沒有此佇列將自動建立
             * param1:佇列名稱
             * param2:是否持久化
             * param3:佇列是否獨佔此連線
             * param4:佇列不再使用時是否自動刪除此佇列
             * param5:佇列引數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道繫結郵件佇列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道繫結簡訊佇列
            //交換機和佇列繫結
            /**
             * 引數明細
             * 1、佇列名稱
             * 2、交換機名稱
             * 3、路由key
             * 4、
             * String queue, String exchange, String routingKey, Map<String, Object> arguments
             */
            Map<String,Object> headers_email = new Hashtable<String,Object>();
            headers_email.put("inform_type","email");
            Map<String,Object> headers_sms = new Hashtable<String, Object>();
            headers_sms.put("inform_type","sms");
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
            channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_sms);
            //給email佇列發訊息
            for(int i = 0;i<10;i++){
                String message = new String("mq 傳送email訊息。。。");
                Map<String,Object> headers = new Hashtable<String,Object>();
                headers.put("inform_type","email");//匹配email通知消費者繫結的header
                /**
                  * 訊息釋出方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,訊息的路由Key,是用於Exchange(交換機)將訊息轉發到指定的訊息佇列
                  * param3:訊息包含的屬性
                  * param4:訊息體
                  * 這裡沒有指定交換機,訊息將傳送給