1. 程式人生 > 實用技巧 >RabbitMQ的使用(一)_JavaClient實現簡單模式+Work模式

RabbitMQ的使用(一)_JavaClient實現簡單模式+Work模式

RabbitMQ的使用

rabbitmq的官網教程地址:https://www.rabbitmq.com/getstarted.html

1.RabbitMQ定義:是一款開源的訊息代理的佇列伺服器,是一種應用程式之間的通訊方法。RabbitMQ是基於Erlang語言來編寫的,基於AMQP協議的佇列。你可以把RabbitMQ想象成一個郵箱。傳送人投遞訊息到郵箱中。接收者從郵箱中取出訊息的過程

RabbitMQ負責接收、儲存、轉發訊息。

2.RabbitMQ的使用場景:非同步處理、流量削峰、應用解耦、應用日誌

3.RabbitMQ中的幾個概念:

  生產者:傳送訊息的程式就是生產者

  消費者:就是接收訊息的那一方,負責對接收到的訊息進行處理

3.訊息模式的種類:

  1.簡單模式

  2.Work模式(Work queues):

  3.訂閱模式(Publish/Subscribe)

  4.路由模式(Routing)

  5.Topics

  6.RPC

  7.Publisher Confirms

  官網圖片如下:

  

  

4.交換機種類:direct、topic、fanout、headers四種。

訊息只儲存在佇列中,佇列只受主機記憶體和磁碟的限制,它本質上是一個大的訊息緩衝區。許多生產者可以把訊息傳送到一個佇列,許多消費者也可以嘗試從一個佇列接收資料。

5.Java Client使用簡單模式:

   模式圖:一個生產者、一個佇列、一個消費者

  

   5.1:在Rabbitmq伺服器上建立一個/yingxiaocao的虛擬主機,併為這個主機新增一個yingxiaocao的使用者,提供訪問許可權。

   5.2 :匯入依賴包:with the groupIdcom.rabbitmqand the artifactIdamqp-client 

  <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <
groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> </dependencies>
View Code

  5.3 :建立連線工廠

public class RabbitMQConnectionFactory {

    public static Connection getRabbitMQConnections() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(CommonConstant.ADD_RESS);
        connectionFactory.setPort(CommonConstant.PORT);
        connectionFactory.setUsername(CommonConstant.USER_NAME);
        connectionFactory.setPassword(CommonConstant.PASSWORD);
        connectionFactory.setVirtualHost(CommonConstant.VIRTUAL_HOST);
        return connectionFactory.newConnection();
    }
}
View Code

  5.4 建立生產者

public class RabbitSender {
    // 宣告一個佇列的名字
    private static final String SIMPLE_QUEUE_NAME = "hello_world_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.獲取一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.建立一個通道
        Channel channel = rabbitMQConnections.createChannel();
        /*
            3.要傳送訊息,我們必須宣告一個要傳送的佇列;然後我們可以向佇列釋出一條訊息
            引數1:queue:佇列的名字
            引數2:durable: 佇列是否做持久化操作,true表示佇列做持久化操作,該佇列將在伺服器重啟後,繼續存在
            引數3:exclusive:
            引數4:autoDelete:是否宣告自動刪除
            引數5:arguments 佇列引數
         */
        channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
        // 3.宣告要傳送的訊息
        String sendMsg = "小河流水嘩啦1啦";
        /*
         * 4.傳送訊息
         *  引數1:exchange 交換機的名字,簡單模式不需要交換機,預設的就好
         *  引數2:routingKey
         *  引數3:props 屬性
         *  引數4:要傳送的訊息體
         */
        channel.basicPublish("", SIMPLE_QUEUE_NAME, null, sendMsg.getBytes());
        channel.close();
        rabbitMQConnections.close();
    }

}
View Code

  5.5 建立消費者

public class RabbitMQReceiver {

    // 宣告一個佇列的名字
    private static final String SIMPLE_QUEUE_NAME = "hello_world_queue";

    /**
     * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息
     * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.開啟一個通道
        final Channel channel = rabbitMQConnections.createChannel();
        // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。
        channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
        // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + message);
            }
        };
        // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥
        channel.basicConsume(SIMPLE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}
View Code

或者使用

這個來接收訊息        
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("收到的訊息是:===>" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);//告訴伺服器我們收到訊息, true代表拒收訊息
            }
        };
        //如果設定自動應答為false,那麼我們必須手動告訴伺服器我收到訊息了,否則下次消費者重啟會再次收到之前的訊息
        channel.basicConsume(QUEUENAME, false, defaultConsumer);
View Code

  

5.6 測試結果,每傳送一條訊息,消費者就能收到一條

  

6.Java Client使用Work模式

  work模式圖:一個生產者+一個佇列+多個消費者

       

修改生產者程式碼,由於沒有實際的任務,使用Thread.sleep()來模擬一個複雜的任務。

public class WorkRabbitSender {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.獲取一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.建立一個通道
        Channel channel = rabbitMQConnections.createChannel();
        /*
            3.要傳送訊息,我們必須宣告一個要傳送的佇列;然後我們可以向佇列釋出一條訊息
            引數1:queue:佇列的名字
            引數2:durable: 佇列是否做持久化操作,true表示佇列做持久化操作,該佇列將在伺服器重啟後,繼續存在
            引數3:exclusive:
            引數4:autoDelete:是否宣告自動刪除
            引數5:arguments 佇列引數
         */
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        // 3.宣告要傳送的訊息
        /*
         * 4.傳送訊息
         *  引數1:exchange 交換機的名字,簡單模式不需要交換機,預設的就好
         *  引數2:routingKey
         *  引數3:props 屬性
         *  引數4:要傳送的訊息體
         */
        for (int i=1;i<=100;i++) {
            // 由於沒有實際的任務,用Thead.sleep()來表示傳送複雜的字串
            String sendMsg = "小河流水嘩啦1啦========>"+i;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicPublish("", WORK_QUEUE_NAME, null, sendMsg.getBytes());
        }
        System.out.println("訊息傳送完成");
        channel.close();
        rabbitMQConnections.close();
    }

}
View Code

  同樣修改消費者程式碼。每個訊息被消費後,同樣使用Thread.sleep()休息一下。它將處理傳遞的訊息並執行任務,現在使用自動應答的方式,消費者C1

public class WorkRabbitMQReceiver {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "work_queue";

    /**
     * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息
     * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.開啟一個通道
        final Channel channel = rabbitMQConnections.createChannel();
        // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥
        channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}
View Code

建立一個消費者C2

public class WorkRabbitMQReceiver2 {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "work_queue";

    /**
     * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息
     * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.開啟一個通道
        final Channel channel = rabbitMQConnections.createChannel();
        // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + message);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥
        channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
    }
}
View Code

 預設情況下,RabbitMQ將按順序將每條訊息傳送給下一個使用者。平均而言,每個消費者將獲得相同數量的訊息。這種分發訊息的方式稱為輪詢

效果圖如下:

輪詢的方式的弊端。例如上述程式碼。假設執行程式碼消耗時間忽略不計,以執行緒睡眠之間為消費者處理業務需要的時間。那麼對於消費者C1來說,每次執行訊息需要休息1s,而消費者C2每次執行訊息休息1ms。顯然,消費者C2的處理速度遠快於消費者C1,在這種情況下,輪詢的方式,仍然會使兩臺伺服器獲取到相同的訊息數。而我們的RabbitMQ伺服器並不知道,兩臺伺服器的各自處理速度。這就會造成一些效能損失。這是因為RabbitMQ只在訊息進入佇列時傳送訊息。它不檢視使用者未確認訊息的數量。它只是盲目地將第n個訊息傳送給第n個消費者

為了解決這種情況:我們可以使用設定為prefetchCount = 1的basicQos方法。這告訴RabbitMQ一次不要給一個消費者傳送一條以上的訊息。或者,換句話說,在消費者處理並確認之前,不要向它傳送新訊息。相反,它將把它分派到下一個不繁忙的消費者

生產者程式碼 維持不變,消費者程式碼,加上如下兩句話

int prefetchCount = 1;
channel.basicQos(prefetchCount);
View Code
basicQos根據情況設定:消費者C1設定為1,消費者C2設定為3
注意:這時候,不能使用自動應答的方式,而是應改為手動應答的方式。否則還是輪詢的接收方式。自動應答,是訊息被髮送出去之後,不管消費者是否消費成功,都被rabbitmq認為是已消費完成。然後就會發送下一條訊息給消費者
修改消費者C1程式碼:
public class WorkRabbitMQReceiver {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "work_queue";

    /**
     * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息
     * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.開啟一個通道
        final Channel channel = rabbitMQConnections.createChannel();
        // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息
        channel.basicQos(1);
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥
        channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
View Code

修改消費者C2程式碼:

public class WorkRabbitMQReceiver2 {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "work_queue";

    /**
     * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息
     * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.開啟一個通道
        final Channel channel = rabbitMQConnections.createChannel();
        // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        channel.basicQos(5);
        // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + message);
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥
        channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
View Code

效果圖如下:

很顯然,不再是輪詢的方式接收訊息了。


7.訊息確認機制:

如果一個消費者開始了一個很長的任務,但是隻執行了一半,消費者就死掉了。如果使用自動應答的方式,一旦RabbitMQ向消費者傳送了一條訊息,它就會立即將其標記為刪除。在這種情況下,如果消費者死掉了,我們將丟失它正在處理的資訊。我們還將丟失所有傳送給這個特定消費者但尚未處理的訊息。但是這樣的話,程式就會有問題。為了確保訊息不會丟失,RabbitMQ支援訊息確認。一個確認被消費者傳送回來告訴RabbitMQ一個特定的訊息已經被接收,被處理並且RabbitMQ可以自由地刪除它。

如果消費者在沒有傳送ack的應答的情況下死亡(它的通道關閉了,連線關閉了,或者TCP連線丟失了),RabbitMQ將理解訊息沒有被完全處理,並將其重新排隊。如果同時有其他消費者線上,它就會迅速地將其重新發送給其他消費者。這樣你就可以確保沒有資訊丟失。前面的列子中,我將autoAck=true表示自動應答,現在只需要將該引數改為false,表示手動應答。上述程式碼已經演示了

8.訊息持久化操作

我們已經學會了如何確保即使消費者死了,任務也不會丟失。但是如果RabbitMQ伺服器停止,我們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,它將忘記佇列和訊息,除非你告訴它不要這樣做。要確保訊息不丟失,需要做兩件事:我們需要將佇列和訊息標記為持久的。

首先,我們需要確保佇列在RabbitMQ節點重新啟動時能夠存活。為此,我們需要將其宣告為持久的

boolean durable = true;
channel.queueDeclare("work_queue", durable, false, false, null);

這個命令本身是正確的,但是它不能在我們目前的設定中工作。這是因為我們已經定義了一個名為work_queue的佇列,它不是持久的。RabbitMQ不允許你用不同的引數重新定義一個已有的佇列,如果這樣做了都會返回一個錯誤。

錯誤資訊如下:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'work_queue' in vhost '/yingxiaocao': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
View Code

解決辦法:宣告一個具有不同名稱的佇列

boolean durable = true;
channel.queueDeclare("durable_work_queue", durable, false, false, null);

在這一點上,我們確信即使RabbitMQ重啟,durable_work_queue佇列也不會丟失。現在我們需要將訊息標記為持有的。

可通過將MessageProperties設定為PERSISTENT_TEXT_PLAIN

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

修改生產者程式碼,設定佇列持久化操作。訊息持久化操作,如下

public class WorkRabbitSender {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "durable_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.獲取一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.建立一個通道
        Channel channel = rabbitMQConnections.createChannel();
        /*
            3.要傳送訊息,我們必須宣告一個要傳送的佇列;然後我們可以向佇列釋出一條訊息
            引數1:queue:佇列的名字
            引數2:durable: 佇列是否做持久化操作,true表示佇列做持久化操作,該佇列將在伺服器重啟後,繼續存在
            引數3:exclusive:
            引數4:autoDelete:是否宣告自動刪除
            引數5:arguments 佇列引數
         */
        channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
        // 3.宣告要傳送的訊息
        /*
         * 4.傳送訊息
         *  引數1:exchange 交換機的名字,簡單模式不需要交換機,預設的就好
         *  引數2:routingKey
         *  引數3:props 屬性
         *  引數4:要傳送的訊息體
         */
        for (int i=1;i<=100;i++) {
            // 由於沒有實際的任務,用Thead.sleep()來表示傳送複雜的字串
            String sendMsg = "小河流水嘩啦1啦========>"+i;
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicPublish("", WORK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, sendMsg.getBytes());
        }
        System.out.println("訊息傳送完成");
        channel.close();
        rabbitMQConnections.close();
    }

}
View Code

修改消費者。設定佇列持久化操作。生產者和消費者的佇列引數要一樣,不然還是會報上述錯誤

消費者C1程式碼如下:

public class WorkRabbitMQReceiver {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "durable_work_queue";

    /**
     * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息
     * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.開啟一個通道
        final Channel channel = rabbitMQConnections.createChannel();
        // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。
        channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
        // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息
        channel.basicQos(1);
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + message);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥
        channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
View Code

消費者C2程式碼如下:

public class WorkRabbitMQReceiver2 {
    // 宣告一個佇列的名字
    private static final String WORK_QUEUE_NAME = "durable_work_queue";

    /**
     * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息
     * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立一個連結
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        // 2.開啟一個通道
        final Channel channel = rabbitMQConnections.createChannel();
        // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。
        channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
        channel.basicQos(5);
        // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + message);
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥
        channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
View Code

模擬生產者傳送完訊息,rabbitmq死掉。然後rabbitmq重啟。啟動消費者,看能否接收到rabbitmq死掉前生產者傳送的訊息

效果如下:

可見,rabbitmq伺服器死掉前, 傳送訊息的時間為7:31.而rabbitmq重啟後,啟動消費者,接收到 訊息的時間是7:34.可見,rabbitmq死掉了,生產者傳送的訊息,並沒有消失,而是被持久化隊列了