1. 程式人生 > 其它 >RabbitMQ五種工作模式

RabbitMQ五種工作模式

技術標籤:rabbitmq交換機佇列

RabbitMQ五種工作模式

RabbitMQ五種工作模式

在SpringBoot環境下做的程式碼測試,RabbitMQ的包是用SpringBoot的starter-amqp包引入的。

1、簡單佇列

在這裡插入圖片描述

一個生產者對應一個消費者!!!

1、pom檔案

SpringBoot匯入rabbitmq 啟動包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、工具類

/**
 * 〈簡述〉<br>
 * 〈連線RabbitMQ的工具類〉
 *
 * @create 2020/7/1
 * @since 1.0.0
 */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        return getConnection(new Properties());
    }

    private static Connection getConnection(Properties properties) throws
Exception { return getConnection(properties.getHost(), properties.getPort(), properties.getvHost(), properties.getUserName(), properties.getPassWord()); } public static Connection getConnection(String host, int port, String vHost,
String userName, String passWord) throws Exception { //1、定義連線工廠 ConnectionFactory factory = new ConnectionFactory(); //2、設定伺服器地址 factory.setHost(host); //3、設定埠 factory.setPort(port); //4、設定虛擬主機、使用者名稱、密碼 factory.setVirtualHost(vHost); factory.setUsername(userName); factory.setPassword(passWord); //5、通過連線工廠獲取連線 Connection connection = factory.newConnection(); return connection; } public static class Properties implements Serializable { String host = "192.168.1.103"; int port = 5672; String vHost = "/"; String userName = "guest"; String passWord = "guest"; public Properties() { } public Properties(String host, int port, String vHost, String userName, String passWord) { this.host = host; this.port = port; this.vHost = vHost; this.userName = userName; this.passWord = passWord; } public String getHost() { return host; } public Properties setHost(String host) { this.host = host; return self(); } public int getPort() { return port; } public Properties setPort(int port) { this.port = port; return self(); } public String getvHost() { return vHost; } public Properties setvHost(String vHost) { this.vHost = vHost; return self(); } public String getUserName() { return userName; } public Properties setUserName(String userName) { this.userName = userName; return self(); } public String getPassWord() { return passWord; } public Properties setPassWord(String passWord) { this.passWord = passWord; return self(); } private Properties self(){ return this; } } }

3、生產者 Producer

/**
 * 〈簡述〉<br> 
 * 〈簡單佇列——訊息生產者〉
 *
 * @create 2020/7/1
 * @since 1.0.0
 */
public class Producer {
    private final static String QUEUE_NAME = QueueName.test_simple_queue.toString();

    public static void main(String[] args) throws Exception {
        sendMessage();
    }

    public static void sendMessage() throws Exception {
        //1、獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //2、宣告通道
        Channel channel = connection.createChannel();
        //3、宣告(建立)佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、定義訊息內容
        String message = "hello rabbitmq ";
        //5、釋出訊息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("[x] Sent'" + message + "'");
        //6、關閉通道
        channel.close();
        //7、關閉連線
        connection.close();
    }
}

4、消費者Consumer

/**
 * 〈簡述〉<br>
 * 〈訊息消費者〉
 *
 * @create 2020/7/1
 * @since 1.0.0
 */
public class Customer {

    private final static String QUEUE_NAME = QueueName.test_simple_queue.toString();

    public static void main(String[] args) throws Exception {
        getMessage();

    }

    public static void getMessage() throws Exception {
        //1、獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //2、宣告通道
        Channel channel = connection.createChannel();
        //3、宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //4、定義佇列的消費者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msgString = new String(body, "utf-8");
                System.out.println("接收的訊息:" + msgString);
            }
        };
        //5、監聽佇列
     /*
		 true:表示自動確認,只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功消費,都會認為訊息已經成功消費
		 false:表示手動確認,消費者獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,
				如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態,並且伺服器會認為該消費者已經掛掉,不會再給其
				傳送訊息,直到該消費者反饋。
	  */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

注意這裡消費者有自動確認訊息和手動確認訊息兩種模式。

2、work 模式

在這裡插入圖片描述

一個生產者對應多個消費者,但是一條訊息只能有一個消費者獲得訊息!!!

2.1輪詢分發
 
  1、生產者

/**
 * 〈簡述〉<br>
 * 〈輪詢分發——生產者〉
 *
 * @create 2020/7/3
 * @since 1.0.0
 */
public class Send {
    private static final String QUEUE_NAME = QueueName.test_work_queue.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50; i++) {
            String msg = "hello " + i;
            System.out.println("[mq] send:" + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i * 20);
        }
        channel.close();
        connection.close();
    }

}

2、消費者

這裡建立兩個消費者

消費者1:每接收一條訊息後休眠1秒

/**
 * 〈簡述〉<br>
 * 〈接收者〉
 *
 * @create 2020/7/3
 * @since 1.0.0
 */
public class Receive1 {
    private static final String QUEUE_NAME = QueueName.test_work_queue.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel、
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定義一個消費這
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] Receive1 msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done");
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

消費者2:每接收一條訊息後休眠2秒

/**
 * 〈簡述〉<br>
 * 〈接收者〉
 *
 * @create 2020/7/3
 * @since 1.0.0
 */
public class Receive2 {
    private static final String QUEUE_NAME = QueueName.test_work_queue.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定義一個消費這
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Receive2 msg:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done");
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

3、測試結果

	生產者從0-49依次傳送訊息 
	消費者1:接收到偶數 	
	消費者2:接收到奇數

4、結論

輪詢分發就是將訊息佇列中的訊息,依次傳送給所有消費者。一個訊息只能被一個消費者獲取。

2.2公平分發
消費者關閉自動應答,開啟手動回執

/**
 * 〈簡述〉<br>
 * 〈接收者〉
 *
 * @create 2020/7/3
 * @since 1.0.0
 */
public class Receive2 {
    private static final String QUEUE_NAME = QueueName.test_work_queue.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//保證一次只分發一個訊息
        //定義一個消費這
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Receive2 msg:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done");
                    //手動回執
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        boolean autoAck = false;//自動應答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

手動回執:消費者完成業務介面方法後可以告知訊息佇列處理完成,訊息佇列從佇列中取一條訊息傳送給消費者。
  能者多勞:效率高的消費者消費訊息多。

3、釋出/訂閱模式

在這裡插入圖片描述

一個消費者將訊息首先發送到交換器,交換器繫結到多個佇列,然後被監聽該佇列的消費者所接收並消費。

ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這裡的交換器是 fanout。下面我們會詳細介紹這幾種交換器。

1、生產者

/**
 * 〈簡述〉<br>
 * 〈訂閱模式——生產者〉
 *
 * @create 2020/7/3
 * @since 1.0.0
 */
public class Send {
    private static final String EXCHANGE_NAME = MqName.exchange_fanout.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分發
        //傳送訊息
        String msg = "hello exchange";
        System.out.println("[mq] send:" + msg);
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }

}

2、消費者

​ 注意:兩個消費者繫結不同的佇列,繫結相同的交換機;

消費者1:繫結佇列名=queue_fanout_email1

/**
 * 〈簡述〉<br>
 * 〈接收者〉
 *
 * @create 2020/7/3
 * @since 1.0.0
 */
public class Receive1 {
    private static final String QUEUE_NAME = MqName.queue_fanout_email.toString() + "1";
    private static final String EXCHANGE_NAME = MqName.exchange_fanout.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //繫結到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        //定義一個消費這
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * No-op implementation of {@link Consumer#handleDelivery}.
             *
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] Receive1 msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done");
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

消費者2:繫結佇列名=queue_fanout_email2

/**
 * 〈簡述〉<br>
 * 〈接收者〉
 *
 * @create 2020/7/3
 * @since 1.0.0
 */
public class Receive2 {
    private static final String QUEUE_NAME = MqName.queue_fanout_email.toString() + "2";
    private static final String EXCHANGE_NAME = MqName.exchange_fanout.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //繫結到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        //定義一個消費這
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * No-op implementation of {@link Consumer#handleDelivery}.
             *
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] Receive2 msg:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done");
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

}

3、測試結果
如上圖,兩個消費者獲得了同一條訊息。即就是,一個訊息從交換機同時傳送給了兩個佇列中,監聽這兩個佇列的消費者消費了這個訊息;
如果沒有佇列繫結交換機,則訊息將丟失。因為交換機沒有儲存能力,訊息只能儲存在佇列中。

4、路由模式

在這裡插入圖片描述

生產者將訊息傳送到direct交換器,在繫結佇列和交換器的時候有一個路由key,生產者傳送的訊息會指定一個路由key,那麼訊息只會傳送到相應key相同的佇列,接著監聽該佇列的消費者消費訊息。

也就是讓消費者有選擇性的接收訊息。

1、生產者

/**
 * 〈簡述〉<br> 
 * 〈路由模式-訊息傳送者〉
 *
 * @create 2020/7/8
 * @since 1.0.0
 */
public class Send {

    public static final String EXCHANGE_NAME = MqName.exchange_routing.toString();
    public static final String ROUTING_KEY = MqName.routing_world.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String msg = "route message ->" + ROUTING_KEY;
        System.out.println("對 " + ROUTING_KEY + " 傳送訊息:" + msg);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
        //關閉連線
        channel.close();
        connection.close();
    }

}

2、消費者

​ 注意:兩個消費者,繫結相同的交換機,不同的佇列,不一樣的路由

消費者1:路由=routing_hello

/**
 * 〈簡述〉<br>
 * 〈接收訊息1〉
 *
 * @create 2020/7/8
 * @since 1.0.0
 */
public class Receive1 {

    public static final String QUEUE_NAME = MqName.queue_routing_001.toString();
    public static final String EXCHANGE_NAME = MqName.exchange_routing.toString();
    public static final String ROUTING_KEY_hello = MqName.routing_hello.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //設定預讀取數
        channel.basicQos(1);
        //繫結交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_hello);
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * No-op implementation of {@link Consumer#handleDelivery}.
             *
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

消費者2:路由1=routing_world 路由2=routing_hello

/**
 * 〈簡述〉<br>
 * 〈接收訊息2〉
 *
 * @create 2020/7/8
 * @since 1.0.0
 */
public class Receive2 {

    public static final String QUEUE_NAME = MqName.queue_routing_002.toString();
    public static final String EXCHANGE_NAME = MqName.exchange_routing.toString();
    public static final String ROUTING_KEY_world = MqName.routing_world.toString();
    public static final String ROUTING_KEY_hello = MqName.routing_hello.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //設定預讀取數
        channel.basicQos(1);
        //繫結交換機和路由器,可以繫結多個路由
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_world);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_hello);
        //定義訊息消費者
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * No-op implementation of {@link Consumer#handleDelivery}.
             *
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println(envelope.getRoutingKey() + " [2] Receive1 msg:" + msg);
            }
        };
        //接收訊息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

3、測試結果

生產者傳送:routing_world

​ 消費者1:沒有接收到

​ 消費者2:接收到了

​ 生產者傳送:routing_hello

​ 消費者1:接收到了

​ 消費者2:接收到了

路由模式,是以路由規則為導向,引導訊息存入符合規則的佇列中。再由佇列的消費者進行消費的。

5、主題模式

在這裡插入圖片描述

上面的路由模式是根據路由key進行完整的匹配(完全相等才傳送訊息),這裡的萬用字元模式通俗的來講就是模糊匹配。

符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。

1、生產者

/**
 * 〈簡述〉<br> 
 * 〈主題模式-訊息傳送者〉
 *
 * @create 2020/7/8
 * @since 1.0.0
 */
public class Send {

    public static final String EXCHANGE_NAME = MqName.exchange_topic.toString();
    public static final String ROUTING_KEY = MqName.routing_goods.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//        String routingKey = ROUTING_KEY + ".add";
        String routingKey = ROUTING_KEY + ".publish";
//        String routingKey = ROUTING_KEY + ".update";

        String msg = "route message ->" + routingKey;
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        System.out.println("對 " + routingKey + " 傳送訊息:" + msg);
        //關閉連線
        channel.close();
        connection.close();
    }

}

2、消費者

​ 注意兩個消費者,路由的不同

消費者1:路由1=routing_goods.add 路由2=routing_goods.update

/**
 * 〈簡述〉<br>
 * 〈接收訊息1〉
 *
 * @create 2020/7/8
 * @since 1.0.0
 */
public class Receive1 {

    public static final String QUEUE_NAME = MqName.queue_topic_001.toString();
    public static final String EXCHANGE_NAME = MqName.exchange_topic.toString();
    public static final String ROUTING_KEY = MqName.routing_goods.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //設定預讀取數
        channel.basicQos(1);
        //繫結交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".add");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".update");
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * No-op implementation of {@link Consumer#handleDelivery}.
             *
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

消費者2:路由=routing_goods.*

/**
 * 〈簡述〉<br>
 * 〈接收訊息2〉
 *
 * @create 2020/7/8
 * @since 1.0.0
 */
public class Receive2 {

    public static final String QUEUE_NAME = MqName.queue_routing_002.toString();
    public static final String EXCHANGE_NAME = MqName.exchange_topic.toString();
    public static final String ROUTING_KEY = MqName.routing_goods.toString();

    public static void main(String args[]) throws Exception {
        //獲取連線
        Connection connection = ConnectionUtil.getConnection();
        //獲取channel
        Channel channel = connection.createChannel();
        //宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //設定預讀取數
        channel.basicQos(1);
        //繫結交換機和路由器,可以繫結多個路由
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".*");
        //定義訊息消費者
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * No-op implementation of {@link Consumer#handleDelivery}.
             *
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println(envelope.getRoutingKey() + " [2] Receive1 msg:" + msg);
            }
        };
        //接收訊息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

3、測試結果

消費者1只能接收到.add.update的訊息

​ 消費者2可以接收到.add .publish.update的訊息

​ 與路由模式相似,但是,主題模式是一種模糊的匹配方式。

6.工作模式總結

這五種工作模式,可以歸為三類: 生產者,訊息佇列,一個消費者; 生產者,訊息佇列,多個消費者;
生產者,交換機,多個訊息佇列,多個消費者;

7、四種交換器

1、direct 如果路由鍵完全匹配的話,訊息才會被投放到相應的佇列。  
  2、fanout 當傳送一條訊息到fanout交換器上時,它會把訊息投放到所有附加在此交換器上的佇列。
  3、topic 設定模糊的繫結方式,“*”操作符將“.”視為分隔符,匹配單個字元;“#”操作符沒有分塊的概念,它將任意“.”均視為關鍵字的匹配部分,能夠匹配多個字元。
​ 4、header headers 交換器允許匹配 AMQP 訊息的 header 而非路由鍵,除此之外,header 交換器和
direct 交換器完全一致,但是效能卻差很多,因此基本上不會用到該交換器

**