RabbitMQ 訊息應答機制
一、概述
消費者處理一個任務是需要一段時間的,如果有一個消費者正在處理一個比較耗時的任務並且只處理了一部分,突然這個時候消費者宕機了,那麼會出現什麼情況呢?
要回答這個問題,我們先了解一下 RabbitMQ 的訊息應答機制
為了保證訊息從佇列可靠地達到消費者並且被消費者消費處理,RabbitMQ 提供了訊息應答機制,RabbitMQ 有兩種應答機制,自動應答和手動應答
1、自動應答、RabbitMQ 只要將訊息分發給消費者就被認為訊息傳遞成功,就會將記憶體中的訊息刪除,而不管消費者有沒有處理完訊息
2、手動應答、RabbitMQ 將訊息分發給了消費者,並且只有當消費者處理完成了整個訊息之後才會被認為訊息傳遞成功了
可以看出,如果是自動應答模式,消費者在處理任務的過程中宕機了,那麼訊息將會丟失,而手動應答則能夠保證訊息不會被丟失,所以在實際的應用當中絕大多數都採用手動應答
二、手動應答常用 API
// 該訊息已經處理完成了,RabbitMQ 記憶體可以刪除該訊息了 void basicAck(long deliveryTag, boolean multiple) // 不處理該訊息,直接拒絕,然後將該訊息丟棄 void basicReject(long deliveryTag, boolean requeue) void basicNack(long deliveryTag, boolean multiple, boolean requeue)
三、原理圖
Producer 生產訊息傳送給訊息佇列,Consumer01 消費訊息1、Consumer02 消費訊息2、Consumer01 接收到了訊息之後,在處理完部分邏輯的時候突然宕機了,Consumer01 未傳送 ACK,此時訊息1 不會丟失,而是重新進入佇列,由狀態正常的 Consumer02 消費掉
四、編碼
1、RabbitmqUtils(工具類)
public class RabbitmqUtils { private static final String HOST_ADDRESS = "192.168.59.130"; private static final String USER_NAME = "admin"; private static final String PASSWORD = "admin123"; public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_ADDRESS); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
2、Producer
public class Producer {
private static final String QUEUE_NAME = "ackDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "有意思的訊息--->";
for (int i = 1; i < 11; i++) {
channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("Producer send message successfully");
}
}
3、Consumer01
public class Consumer01 {
private static final String QUEUE_NAME = "ackDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
try {
// 休眠 10 s
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 引數一、deliveryTag:訊息應答標記
// 引數二、multiple:(false、只應答接收到的那個訊息 true、應答所有傳遞過來的訊息)
// 處理完邏輯之後應答 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag);
};
// 設定手動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
4、Consumer02
public class Consumer02 {
private static final String QUEUE_NAME = "ackDemo";
public static void main(String[] args) throws Exception {
Channel channel = RabbitmqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
// 引數一、deliveryTag:訊息應答標記
// 引數二、multiple:(false、只應答接收到的那個訊息 true、應答所有傳遞過來的訊息)
// 處理完邏輯之後應答 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(message);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag);
};
// 設定手動應答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
五、測試過程及結果
1、先啟動 Cousumer01、Consumer02
2、生產者傳送 10 條訊息,根據預設的輪詢規則,一個消費者(假設此時為 Consumer01)消費第 1、3、5、7、9 條訊息,另外一個消費者(假設此時為 Consumer02)消費第 2、4、6、8、10 條訊息
3、當 Consumer01 消費第 1、3 條訊息的時候手動強制關閉 Consumer01,那麼原先本應該由 Consumer01 消費的第 5、7、9 條訊息不會丟失,它們將重新進入佇列由 Consumer02 消費掉
4、Consumer01、Consumer02 消費的訊息如下: