RabbitMQ的使用(一)_JavaClient實現簡單模式+Work模式
RabbitMQ的使用
rabbitmq的官網教程地址:https://www.rabbitmq.com/getstarted.html
1.RabbitMQ定義:是一款開源的訊息代理的佇列伺服器,是一種應用程式之間的通訊方法。RabbitMQ是基於Erlang語言來編寫的,基於AMQP協議的佇列。你可以把RabbitMQ想象成一個郵箱。傳送人投遞訊息到郵箱中。接收者從郵箱中取出訊息的過程
RabbitMQ負責接收、儲存、轉發訊息。
2.RabbitMQ的使用場景:非同步處理、流量削峰、應用解耦、應用日誌
3.RabbitMQ中的幾個概念:
生產者:傳送訊息的程式就是生產者
消費者:就是接收訊息的那一方,負責對接收到的訊息進行處理
3.訊息模式的種類:
1.簡單模式
2.Work模式(Work queues):
3.訂閱模式(Publish/Subscribe)
4.路由模式(Routing)
5.Topics
6.RPC
7.Publisher Confirms
官網圖片如下:
4.交換機種類:direct、topic、fanout、headers四種。
5.1:在Rabbitmq伺服器上建立一個/yingxiaocao的虛擬主機,併為這個主機新增一個yingxiaocao的使用者,提供訪問許可權。
5.2 :匯入依賴包:with the groupIdcom.rabbitmqand the artifactIdamqp-client
<dependencies> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <View CodegroupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> </dependencies>
5.3 :建立連線工廠
public class RabbitMQConnectionFactory { public static Connection getRabbitMQConnections() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(CommonConstant.ADD_RESS); connectionFactory.setPort(CommonConstant.PORT); connectionFactory.setUsername(CommonConstant.USER_NAME); connectionFactory.setPassword(CommonConstant.PASSWORD); connectionFactory.setVirtualHost(CommonConstant.VIRTUAL_HOST); return connectionFactory.newConnection(); } }View Code
5.4 建立生產者
public class RabbitSender { // 宣告一個佇列的名字 private static final String SIMPLE_QUEUE_NAME = "hello_world_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.獲取一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.建立一個通道 Channel channel = rabbitMQConnections.createChannel(); /* 3.要傳送訊息,我們必須宣告一個要傳送的佇列;然後我們可以向佇列釋出一條訊息 引數1:queue:佇列的名字 引數2:durable: 佇列是否做持久化操作,true表示佇列做持久化操作,該佇列將在伺服器重啟後,繼續存在 引數3:exclusive: 引數4:autoDelete:是否宣告自動刪除 引數5:arguments 佇列引數 */ channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null); // 3.宣告要傳送的訊息 String sendMsg = "小河流水嘩啦1啦"; /* * 4.傳送訊息 * 引數1:exchange 交換機的名字,簡單模式不需要交換機,預設的就好 * 引數2:routingKey * 引數3:props 屬性 * 引數4:要傳送的訊息體 */ channel.basicPublish("", SIMPLE_QUEUE_NAME, null, sendMsg.getBytes()); channel.close(); rabbitMQConnections.close(); } }View Code
5.5 建立消費者
public class RabbitMQReceiver { // 宣告一個佇列的名字 private static final String SIMPLE_QUEUE_NAME = "hello_world_queue"; /** * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息 * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。 */ public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.開啟一個通道 final Channel channel = rabbitMQConnections.createChannel(); // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。 channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null); // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + message); } }; // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥 channel.basicConsume(SIMPLE_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }View Code
或者使用
這個來接收訊息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到的訊息是:===>" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false);//告訴伺服器我們收到訊息, true代表拒收訊息 } }; //如果設定自動應答為false,那麼我們必須手動告訴伺服器我收到訊息了,否則下次消費者重啟會再次收到之前的訊息 channel.basicConsume(QUEUENAME, false, defaultConsumer);View Code
5.6 測試結果,每傳送一條訊息,消費者就能收到一條
public class WorkRabbitSender { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.獲取一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.建立一個通道 Channel channel = rabbitMQConnections.createChannel(); /* 3.要傳送訊息,我們必須宣告一個要傳送的佇列;然後我們可以向佇列釋出一條訊息 引數1:queue:佇列的名字 引數2:durable: 佇列是否做持久化操作,true表示佇列做持久化操作,該佇列將在伺服器重啟後,繼續存在 引數3:exclusive: 引數4:autoDelete:是否宣告自動刪除 引數5:arguments 佇列引數 */ channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); // 3.宣告要傳送的訊息 /* * 4.傳送訊息 * 引數1:exchange 交換機的名字,簡單模式不需要交換機,預設的就好 * 引數2:routingKey * 引數3:props 屬性 * 引數4:要傳送的訊息體 */ for (int i=1;i<=100;i++) { // 由於沒有實際的任務,用Thead.sleep()來表示傳送複雜的字串 String sendMsg = "小河流水嘩啦1啦========>"+i; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicPublish("", WORK_QUEUE_NAME, null, sendMsg.getBytes()); } System.out.println("訊息傳送完成"); channel.close(); rabbitMQConnections.close(); } }View Code
public class WorkRabbitMQReceiver { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "work_queue"; /** * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息 * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。 */ public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.開啟一個通道 final Channel channel = rabbitMQConnections.createChannel(); // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。 channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥 channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }View Code
建立一個消費者C2
public class WorkRabbitMQReceiver2 { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "work_queue"; /** * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息 * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。 */ public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.開啟一個通道 final Channel channel = rabbitMQConnections.createChannel(); // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。 channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + message); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥 channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }View Code
預設情況下,RabbitMQ將按順序將每條訊息傳送給下一個使用者。平均而言,每個消費者將獲得相同數量的訊息。這種分發訊息的方式稱為輪詢
效果圖如下:
輪詢的方式的弊端。例如上述程式碼。假設執行程式碼消耗時間忽略不計,以執行緒睡眠之間為消費者處理業務需要的時間。那麼對於消費者C1來說,每次執行訊息需要休息1s,而消費者C2每次執行訊息休息1ms。顯然,消費者C2的處理速度遠快於消費者C1,在這種情況下,輪詢的方式,仍然會使兩臺伺服器獲取到相同的訊息數。而我們的RabbitMQ伺服器並不知道,兩臺伺服器的各自處理速度。這就會造成一些效能損失。這是因為RabbitMQ只在訊息進入佇列時傳送訊息。它不檢視使用者未確認訊息的數量。它只是盲目地將第n個訊息傳送給第n個消費者
為了解決這種情況:我們可以使用設定為prefetchCount = 1的basicQos方法。這告訴RabbitMQ一次不要給一個消費者傳送一條以上的訊息。或者,換句話說,在消費者處理並確認之前,不要向它傳送新訊息。相反,它將把它分派到下一個不繁忙的消費者
生產者程式碼 維持不變,消費者程式碼,加上如下兩句話
int prefetchCount = 1; channel.basicQos(prefetchCount);View Code
basicQos根據情況設定:消費者C1設定為1,消費者C2設定為3
注意:這時候,不能使用自動應答的方式,而是應改為手動應答的方式。否則還是輪詢的接收方式。自動應答,是訊息被髮送出去之後,不管消費者是否消費成功,都被rabbitmq認為是已消費完成。然後就會發送下一條訊息給消費者
修改消費者C1程式碼:
public class WorkRabbitMQReceiver { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "work_queue"; /** * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息 * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。 */ public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.開啟一個通道 final Channel channel = rabbitMQConnections.createChannel(); // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。 channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息 channel.basicQos(1); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥 channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }View Code
修改消費者C2程式碼:
public class WorkRabbitMQReceiver2 { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "work_queue"; /** * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息 * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。 */ public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.開啟一個通道 final Channel channel = rabbitMQConnections.createChannel(); // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。 channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null); channel.basicQos(5); // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + message); try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥 channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }View Code
效果圖如下:
很顯然,不再是輪詢的方式接收訊息了。
7.訊息確認機制:
如果一個消費者開始了一個很長的任務,但是隻執行了一半,消費者就死掉了。如果使用自動應答的方式,一旦RabbitMQ向消費者傳送了一條訊息,它就會立即將其標記為刪除。在這種情況下,如果消費者死掉了,我們將丟失它正在處理的資訊。我們還將丟失所有傳送給這個特定消費者但尚未處理的訊息。但是這樣的話,程式就會有問題。為了確保訊息不會丟失,RabbitMQ支援訊息確認。一個確認被消費者傳送回來告訴RabbitMQ一個特定的訊息已經被接收,被處理並且RabbitMQ可以自由地刪除它。
如果消費者在沒有傳送ack的應答的情況下死亡(它的通道關閉了,連線關閉了,或者TCP連線丟失了),RabbitMQ將理解訊息沒有被完全處理,並將其重新排隊。如果同時有其他消費者線上,它就會迅速地將其重新發送給其他消費者。這樣你就可以確保沒有資訊丟失。前面的列子中,我將autoAck=true表示自動應答,現在只需要將該引數改為false,表示手動應答。上述程式碼已經演示了
8.訊息持久化操作
我們已經學會了如何確保即使消費者死了,任務也不會丟失。但是如果RabbitMQ伺服器停止,我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它將忘記佇列和訊息,除非你告訴它不要這樣做。要確保訊息不丟失,需要做兩件事:我們需要將佇列和訊息標記為持久的。
首先,我們需要確保佇列在RabbitMQ節點重新啟動時能夠存活。為此,我們需要將其宣告為持久的
boolean durable = true; channel.queueDeclare("work_queue", durable, false, false, null);
這個命令本身是正確的,但是它不能在我們目前的設定中工作。這是因為我們已經定義了一個名為work_queue的佇列,它不是持久的。RabbitMQ不允許你用不同的引數重新定義一個已有的佇列,如果這樣做了都會返回一個錯誤。
錯誤資訊如下:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'work_queue' in vhost '/yingxiaocao': received 'true' but current is 'false', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)View Code
解決辦法:宣告一個具有不同名稱的佇列
boolean durable = true; channel.queueDeclare("durable_work_queue", durable, false, false, null);
在這一點上,我們確信即使RabbitMQ重啟,durable_work_queue佇列也不會丟失。現在我們需要將訊息標記為持有的。
可通過將MessageProperties設定為PERSISTENT_TEXT_PLAIN
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
修改生產者程式碼,設定佇列持久化操作。訊息持久化操作,如下
public class WorkRabbitSender { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "durable_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.獲取一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.建立一個通道 Channel channel = rabbitMQConnections.createChannel(); /* 3.要傳送訊息,我們必須宣告一個要傳送的佇列;然後我們可以向佇列釋出一條訊息 引數1:queue:佇列的名字 引數2:durable: 佇列是否做持久化操作,true表示佇列做持久化操作,該佇列將在伺服器重啟後,繼續存在 引數3:exclusive: 引數4:autoDelete:是否宣告自動刪除 引數5:arguments 佇列引數 */ channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null); // 3.宣告要傳送的訊息 /* * 4.傳送訊息 * 引數1:exchange 交換機的名字,簡單模式不需要交換機,預設的就好 * 引數2:routingKey * 引數3:props 屬性 * 引數4:要傳送的訊息體 */ for (int i=1;i<=100;i++) { // 由於沒有實際的任務,用Thead.sleep()來表示傳送複雜的字串 String sendMsg = "小河流水嘩啦1啦========>"+i; try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicPublish("", WORK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, sendMsg.getBytes()); } System.out.println("訊息傳送完成"); channel.close(); rabbitMQConnections.close(); } }View Code
修改消費者。設定佇列持久化操作。生產者和消費者的佇列引數要一樣,不然還是會報上述錯誤
消費者C1程式碼如下:
public class WorkRabbitMQReceiver { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "durable_work_queue"; /** * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息 * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。 */ public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.開啟一個通道 final Channel channel = rabbitMQConnections.createChannel(); // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。 channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null); // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息 channel.basicQos(1); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥 channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }View Code
消費者C2程式碼如下:
public class WorkRabbitMQReceiver2 { // 宣告一個佇列的名字 private static final String WORK_QUEUE_NAME = "durable_work_queue"; /** * 消費者這一方的使用,和生產者一樣,都是建立一個連結,開啟一個通道。然後從佇列中獲取訊息 * DeliverCallback介面來緩衝由伺服器推送給我們的訊息。 */ public static void main(String[] args) throws IOException, TimeoutException { //1.建立一個連結 Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections(); // 2.開啟一個通道 final Channel channel = rabbitMQConnections.createChannel(); // 3.宣告一個佇列:我們在這裡也聲明瞭佇列。因為我們可能在啟動生產者之前啟動消費者,所以我們希望在嘗試使用來自該佇列的訊息之前確保該佇列存在。 channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null); channel.basicQos(5); // 4.伺服器的訊息是非同步傳送給我們的,所以需要提供一個回撥。這個回撥將緩衝訊息,直到我們使用收到的訊息 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery delivery) throws IOException { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(new Date() + message); try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 消費訊息,引數1:佇列名稱 引數2:true標識自動應答,如果設定為false,rabbitmq還會繼續推送訊息給消費者 引數三:標識回撥 channel.basicConsume(WORK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }View Code
模擬生產者傳送完訊息,rabbitmq死掉。然後rabbitmq重啟。啟動消費者,看能否接收到rabbitmq死掉前生產者傳送的訊息
效果如下:
可見,rabbitmq伺服器死掉前, 傳送訊息的時間為7:31.而rabbitmq重啟後,啟動消費者,接收到 訊息的時間是7:34.可見,rabbitmq死掉了,生產者傳送的訊息,並沒有消失,而是被持久化隊列了