1. 程式人生 > >RabbitMQ學習(四)——訊息分發機制

RabbitMQ學習(四)——訊息分發機制

在前面的一篇博文中,我們對RabbitMQ中的交換機有了大致的瞭解,同時結合Spring Boot的例項,讓我們對RabbitMQ的用法有了更清晰的認識。如果忘記了可以去複習一下,RabbitMQ學習(三)——探索交換機(Exchange),結合SpringBoot實戰

今天我們將要對RabbitMQ的訊息機制進行更詳細的探究,在上一篇文章中其實也涉及到了訊息機制的問題。只是沒有深入探究,今天我們將要開始詳細瞭解該機制的特點。

我們知道每個訊息處理的時間是不同的,換句話說訊息的複雜度是不同的,有些訊息很複雜需要很久的時間,有些訊息很簡單,只需要耗時一會兒就可以完成。而在實際情況下如何考慮分配資源,讓效率達到最大化,從而實現按能力分配任務,達到物盡其用。這就需要了解訊息的分發機制。

在這裡我們使用Thread.Sleep()方法來模擬耗時,時間越久,任務就越複雜。

這裡我們結合前面的例子,在spring boot的程式碼架構下使用RabbitMQ來探究一下,分發機制。首先我們結合前面第三節文章的例子,在sender和receiver下新建DistributionSender.java和DistributionReceiver.java來模擬傳送者和接收者。

DistributionSender.java:

@Component
public class DistributionSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public
void send(int i) { // 傳送的訊息 String message = "This is a task, and the complexity is " + i + "。" + StringUtils.repeat(".", i); this.rabbitTemplate.convertAndSend("distribu", message); } }

這裡採用的是預設交換機,佇列為“distribu”。需要在config包下面的RabbitConfig.java裡面新增如下佇列:

    /**
     * 申明distribu佇列
     * 
     * @return
*/
@Bean public Queue DistribuQueue() { return new Queue("distribu"); }

DistributionReceiver.java:

@Component
public class DistributionReceiver {


    /**
     * 消費者A
     * 
     * @param msg
     */
    @SuppressWarnings("deprecation")
    @RabbitListener(queues = "distribu")
    public void processA(Message message) {
        String msg = new String(message.getBody());
        System.out.println(" DistributionReceiverA  : " + msg);
        SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
        System.out.println(" ProccessingA... at " + time.format(new Date()));

        try {
            for (char ch : msg.toCharArray()) {
                if (ch == '.') {
                    doWork(1000);
                }
            }
        } catch (InterruptedException e) {
        } finally {
            System.out.println(" A Done! at " + time.format(new Date()));
        }
    }

    private void doWork(long time) throws InterruptedException {
        Thread.sleep(time);
    }

}

然後在controller包的RabbitTest.java檔案裡新增Restful介面,模擬傳送請求。

    @Autowired
    private DistributionSender distributionSender;

    /**
     * 分發機制訊息傳送測試
     */
    @GetMapping("/distribu")
    public void distribu() {
        distributionSender.send(3);
    }
 DistributionReceiverA  : This is a task, and the complexity is 3...
 ProccessingA... at 2018-05-23 21:29:18:0628
 A Done! at 2018-05-23 21:29:21:0639

從列印的資訊可以看出這裡就模擬了完成任務需要3秒鐘的時間任務實現。

下面我們更改傳送的訊息數量,在controller控制器裡面進行更改,如下:

    /**
     * 分發機制訊息傳送測試
     */
    @GetMapping("/distribu")
    public void distribu() {
        for (int i = 0; i < 5; i++) {
            //傳送任務複雜度都為1的訊息
            distributionSender.send(1);
        }
    }

模擬傳送5條訊息,並且每條傳送的訊息的複雜度都是相同的,複雜度都為1。

然後再在receiver包中DistributionReceiver.java新增一個消費者B,如下:

    /**
     * 消費者B
     * 
     * @param msg
     */
    @SuppressWarnings("deprecation")
    @RabbitListener(queues = "distribu")
    public void processB(Message message) {
        String msg = new String(message.getBody());
        System.out.println(" DistributionReceiverB  : " + msg);
        SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
        System.out.println(" ProccessingB... at " + time.format(new Date()));

        try {
            for (char ch : msg.toCharArray()) {
                if (ch == '.') {
                    doWork(1000);
                }
            }
        } catch (InterruptedException e) {
        } finally {
            System.out.println(" B Done! at " + time.format(new Date()));
        }
    }

再次執行程式,訪問介面,結果如下:

 DistributionReceiverA  : This is a task, and the complexity is 1。.
 ProccessingA... at 2018-05-22 23:23:43:0014
 DistributionReceiverB  : This is a task, and the complexity is 1。.
 ProccessingB... at 2018-05-22 23:23:43:0014
 A Done! at 2018-05-22 23:23:44:0017
 B Done! at 2018-05-22 23:23:44:0017
 DistributionReceiverA  : This is a task, and the complexity is 1。.
 DistributionReceiverB  : This is a task, and the complexity is 1。.
 ProccessingA... at 2018-05-22 23:23:44:0093
 ProccessingB... at 2018-05-22 23:23:44:0093
 A Done! at 2018-05-22 23:23:45:0095
 B Done! at 2018-05-22 23:23:45:0095
 DistributionReceiverB  : This is a task, and the complexity is 1。.
 ProccessingB... at 2018-05-22 23:23:45:0143
 B Done! at 2018-05-22 23:23:46:0148

在訊息相同,A、B處理能力一樣情況下,我們可以發現A、B幾乎是同時處理訊息,訊息傳送順序為A->B->A->B->B。可以看出這裡並沒有實現A與B平均輪詢的情況,在最後的情況B執行了兩次。

接著現在我們把A處理能力更改為每個點要Thread.sleep(4000), B為Thread.sleep(1000),就是B的處理能力是A的四倍。執行一下,我們看一下列印的結果:

 DistributionReceiverB  : This is a task, and the complexity is 1。.
 ProccessingB... at 2018-05-22 23:24:48:0623
 DistributionReceiverA  : This is a task, and the complexity is 1。.
 ProccessingA... at 2018-05-22 23:24:48:0623
 B Done! at 2018-05-22 23:24:49:0624
 DistributionReceiverB  : This is a task, and the complexity is 1。.
 ProccessingB... at 2018-05-22 23:24:49:0663
 B Done! at 2018-05-22 23:24:50:0664
 DistributionReceiverB  : This is a task, and the complexity is 1。.
 ProccessingB... at 2018-05-22 23:24:50:0704
 B Done! at 2018-05-22 23:24:51:0709
 DistributionReceiverB  : This is a task, and the complexity is 1。.
 ProccessingB... at 2018-05-22 23:24:51:0748
 A Done! at 2018-05-22 23:24:52:0629
 B Done! at 2018-05-22 23:24:52:0749

現在我們可以清晰地看到在這裡B處理了4條訊息,而A只處理了1條訊息。這裡就是按公平分發的機制來發送訊息的,即按消費者處理能力來分發訊息。

注意現在重點來了,RabbitMQ的分發機制

1、Round-robin dispatch(輪詢分發)

這個是RabbitMQ預設的訊息分發機制,使用任務佇列的優點之一就是可以輕易的並行工作。如果我們有很多要分發的訊息,可以通過增加工作者(消費者)來解決這種狀況,使得系統的伸縮性更加容易擴充套件。

在預設情況下,RabbitMQ不會顧慮訊息者處理訊息的能力,即使其中有的消費者閒置有的消費者高負荷。RabbitMQ會逐個傳送訊息到在序列中的下一個消費者(而不考慮每個任務的時長等等,且是提前一次性分配,並非一個一個分配)。平均每個消費者獲得相同數量的訊息,這種方式分發訊息機制稱為Round-Robin(輪詢)。

2、Fair dispatch (公平分發)

而公平分發,則是根據消費者的處理能力來進行分發處理的。這裡主要是通過設定prefetchCount 引數來實現的。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理規定的數量級個數的Message。換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它。 比如prefetchCount=1,則在同一時間下,每個Consumer在同一個時間點最多處理1個Message,同時在收到Consumer的ack前,它不會將新的Message分發給它。

注意:使用公平分發,必須關閉自動應答,改為手動應答。

說完了概念,我們再來思考一下,前面我們的例項。在使用Spring Boot結合RabbitMQ時,我們並沒有手動去應答,那麼這為啥是採用的公平分發機制?

這個是因為Spring Boot封裝的RabbitMQ方法,預設ACK機制是使用手工應答機制,當@RabbitListener修飾的方法被呼叫且沒有丟擲異常時, Spring Boot會為我們自動應答。

我們可以在@RabbitListener原始碼的註解裡看到,

RabbitListener

如果沒有指定containerFactory,將採用預設的containerFactory。然後我們在RabbitListenerContainerFactory中檢視到這個介面與MessageListenerContainer有關,
RabbitListenerContainerFactory

接著檢視MessageListenerContainer,這是一個介面,我們檢視實現了該介面的類,在原始碼包了提供了一個SimpleMessageListenerContainer的類,在裡面我們找到了DEFAULT_PREFETCH_COUNT,這下就清晰明瞭了。
SimpleMessageListenerContainer

預設情況下,Spring Boot中的RabbitMQ採用手動確認機制,只要如果不是程式設計師程式設計實現應答,框架就會為我們自動去確認。並且prefetchCount=1,這下就可以解釋為啥上面的例項出現的結果了。

下面我們再通過原生的RabbitMQ客戶端java程式碼來講解一下訊息分發機制的情況:

輪詢分發

還是在上面的例項裡,我們在sender包下,新建檔案DistributionSender2.java,程式碼如下:

DistributionSender2.java :

package net.anumbrella.rabbitmq.sender;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.StringUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DistributionSender2 {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        /**
         * 建立連線連線到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();

        // 設定MabbitMQ所在主機ip或者主機名
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        // 建立一個連線
        Connection connection = factory.newConnection();

        // 建立一個頻道
        Channel channel = connection.createChannel();

        // 指定一個佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 8; i++) {
            // 傳送的訊息
            String message = "This is a task, and the complexity is " + i + "。" + StringUtils.repeat(".", i);
            // 往佇列中發出一條訊息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            System.out.println(" DistributionSender2 Sent '" + message + "'");
        }
        // 關閉頻道和連線
        channel.close();
        connection.close();
    }

}

模擬傳送8條訊息,然後再receiver包下,新建DistributionReceiver2.java和DistributionReceiver3.java,程式碼如下:

DistributionReceiver2.java:

package net.anumbrella.rabbitmq.receiver;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DistributionReceiver2 {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        // 開啟連線和建立頻道,與傳送端一樣

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Receiver2 waiting for messages. To exit press CTRL+C");

        // 建立佇列消費者
        final Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" DistributionReceiver2  : " + message);
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
                System.out.println(" Proccessing2... at " + time.format(new Date()));
                try {
                    for (char ch: message.toCharArray()) {
                        if (ch == '.') {
                            doWork(1000);
                        }
                    }
                } catch (InterruptedException e) {
                } finally {
                  System.out.println(" DistributionReceiver2 Done! at " +time.format(new Date()));
                }
              }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
    }


    private static void doWork(long time) throws InterruptedException {
        Thread.sleep(time);
    }

}

DistributionReceiver3.java:

package net.anumbrella.rabbitmq.receiver;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DistributionReceiver3 {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        // 開啟連線和建立頻道,與傳送端一樣

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Receiver3 waiting for messages. To exit press CTRL+C");

        // 建立佇列消費者
        final Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" DistributionReceiver3  : " + message);
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
                System.out.println(" Proccessing3... at " + time.format(new Date()));
                try {
                    for (char ch: message.toCharArray()) {
                        if (ch == '.') {
                            doWork(4000);
                        }
                    }
                } catch (InterruptedException e) {
                } finally {
                  System.out.println(" DistributionReceiver3 Done! at " +time.format(new Date()));
                }
              }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
    }


    private static void doWork(long time) throws InterruptedException {
        Thread.sleep(time);
    }

}

我們可以看到DistributionReceiver2是DistributionReceiver3處理訊息的4倍。

現在我們先啟動DistributionReceiver2.java與DistributionReceiver3.java監聽訊息,然後再啟動DistributionSender2.java傳送訊息,可以檢視結果如下:

傳送訊息:

 DistributionSender2 Sent 'This is a task, and the complexity is 0。'
 DistributionSender2 Sent 'This is a task, and the complexity is 1。.'
 DistributionSender2 Sent 'This is a task, and the complexity is 2。..'
 DistributionSender2 Sent 'This is a task, and the complexity is 3。...'
 DistributionSender2 Sent 'This is a task, and the complexity is 4。....'
 DistributionSender2 Sent 'This is a task, and the complexity is 5。.....'
 DistributionSender2 Sent 'This is a task, and the complexity is 6。......'
 DistributionSender2 Sent 'This is a task, and the complexity is 7。.......'

DistributionReceiver2.java收到的訊息:

Receiver2 waiting for messages. To exit press CTRL+C
 DistributionReceiver2  : This is a task, and the complexity is 0。
 Proccessing2... at 2018-05-23 23:12:50:0874
 DistributionReceiver2 Done! at 2018-05-23 23:12:50:0876
 DistributionReceiver2  : This is a task, and the complexity is 2。..
 Proccessing2... at 2018-05-23 23:12:50:0876
 DistributionReceiver2 Done! at 2018-05-23 23:12:52:0880
 DistributionReceiver2  : This is a task, and the complexity is 4。....
 Proccessing2... at 2018-05-23 23:12:52:0880
 DistributionReceiver2 Done! at 2018-05-23 23:12:56:0890
 DistributionReceiver2  : This is a task, and the complexity is 6。......
 Proccessing2... at 2018-05-23 23:12:56:0890
 DistributionReceiver2 Done! at 2018-05-23 23:13:02:0910

DistributionReceiver3.java收到的訊息:

Receiver3 waiting for messages. To exit press CTRL+C
 DistributionReceiver3  : This is a task, and the complexity is 1。.
 Proccessing3... at 2018-05-23 23:12:50:0879
 DistributionReceiver3 Done! at 2018-05-23 23:12:54:0884
 DistributionReceiver3  : This is a task, and the complexity is 3。...
 Proccessing3... at 2018-05-23 23:12:54:0885
 DistributionReceiver3 Done! at 2018-05-23 23:13:06:0887
 DistributionReceiver3  : This is a task, and the complexity is 5。.....
 Proccessing3... at 2018-05-23 23:13:06:0888
 DistributionReceiver3 Done! at 2018-05-23 23:13:26:0903
 DistributionReceiver3  : This is a task, and the complexity is 7。.......
 Proccessing3... at 2018-05-23 23:13:26:0904
 DistributionReceiver3 Done! at 2018-05-23 23:13:54:0921

我們知道DistributionReceiver2處理速度是DistributionReceiver3處理速度的4倍,但是結果兩者收到的訊息數是相同的。
DistributionReceiver2:0,2,4,6
DistributionReceiver3:1,3,5,7
這裡就是採用的輪詢分發,與前面講解的剛剛符合。

公平分發

為了使用公平分發,我們需要通過basicQos( prefetchCount = 1)方法,來設定prefetchCount引數,同時更改確認方式為手動確認。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

同理我們在sender包下新建DistributionSender3.java,程式碼如下:

package net.anumbrella.rabbitmq.sender;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.StringUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DistributionSender3 {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] args) throws IOException, TimeoutException {
        /**
         * 建立連線連線到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();

        // 設定MabbitMQ所在主機ip或者主機名
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);

        // 建立一個連線
        Connection connection = factory.newConnection();

        // 建立一個頻道
        Channel channel = connection.createChannel();

        // 指定一個佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        int prefetchCount = 1;
        // 限制發給同一個消費者不得超過1條訊息
        channel.basicQos(prefetchCount);

        for (int i = 0; i < 8; i++) {
            // 傳送的訊息
            String message = "This is a task, and the complexity is " + i + "。" + StringUtils.repeat(".", 1);
            // 往佇列中發出一條訊息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            System.out.println(" DistributionSender3 Sent '" + message + "'");
        }
        // 關閉頻道和連線
        channel.close();
        connection.close();
    }

}

更改為手動確認需要新增如下程式碼,basicAck 方法的第二個引數 multiple 取值為 false 時,表示通知 RabbitMQ 當前訊息被確認;如果為 true,則額外將比第一個引數指定的 delivery tag 小的訊息一併確認。(在RabbitMQ的每個通道中,每條訊息的 Delivery Tag 從 1 開始遞增,這裡的批量處理是指同一通道中的訊息):

channel.basicAck(envelope.getDeliveryTag(), false);

通過更改auto為false,即更改自動確認為false。如下:

channel.basicConsume(QUEUE_NAME, false, consumer);

DistributionReceiver4.java:

package net.anumbrella.rabbitmq.receiver;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DistributionReceiver4 {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();


        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        // 開啟連線和建立頻道,與傳送端一樣

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Receiver4 waiting for messages. To exit press CTRL+C");

        //保證一次只分發一個
        channel.basicQos(1);

        // 建立佇列消費者
        final Consumer consumer = new DefaultConsumer(channel) {
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" DistributionReceiver4  : " + message);
                SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
                System.out.println(" Proccessing4... at " + time.format(new Date()));
                try {
                    for (char ch: message.toCharArray()) {
                        if (ch == '.') {
                            doWork(1000);
                        }
                    }
                } catch (InterruptedException e) {
                } finally {
                  System.out.println(" DistributionReceiver4 Done! at " +time.format(new Date()));
                  channel.basicAck(envelope.getDeliveryTag(), false);
                }
              }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
    }


    private static void doWork(long time) throws InterruptedException {
        Thread.sleep(time);
    }

}

DistributionReceiver5.java:

package net.anumbrella.rabbitmq.receiver;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class DistributionReceiver5 {

    private final static String QUEUE_NAME = "test";

    public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();


        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setPort(5672);
        // 開啟連線和建立頻道,與傳送端一樣

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out