訊息佇列RabbitMQ應答模式
為了確保訊息不會丟失,RabbitMQ支援訊息應答。消費者傳送一個訊息應答,告訴RabbitMQ這個訊息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。
如果一個消費者掛掉卻沒有傳送應答,RabbitMQ會理解為這個訊息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何訊息了。
沒有任何訊息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條訊息會花費很長的時間。
訊息應答是預設開啟的。我們通過顯示的設定autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動傳送應答。通知RabbitMQ訊息已被處理,可以從記憶體刪除。如果消費者因宕機或連結失敗等原因沒有傳送ACK(不同於ActiveMQ,在RabbitMQ裡,訊息沒有過期的概念),則RabbitMQ會將訊息重新發送給其他監聽在佇列的下一個消費者。
案例:
生產者端程式碼不變,消費者端程式碼這部分就是用於開啟手動應答模式的。
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
注:第二個引數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答。
在處理完訊息時,返回應答狀態,true表示為自動應答模式。
channel.basicAck(envelope.getDeliveryTag(), false);
自動應答: 不在乎消費者對訊息處理是否成功,都會告訴佇列刪除訊息。如果處理訊息失敗,實現自動補償(佇列投遞過去 重新處理)。
手動應答: 消費者處理完業務邏輯,手動返回ack(通知)告訴佇列處理完了,佇列進而刪除訊息。
生產者程式碼不變,消費者:
package com.toov5.Consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.toov5.utils.MQConnectionUtils; public class Consumer { //佇列名稱 private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("消費者啟動.........."); //建立新的連線 Connection connection = MQConnectionUtils.newConnection(); //建立Channel Channel channel = connection.createChannel(); // 消費者關聯佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) { //監聽獲取訊息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg =new String(body,"UTF-8"); System.out.println("消費者獲取生產者訊息:"+msg); } }; //牽手模式設定 預設自動應答模式 true:自動應答模式 channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手動應答 // //關閉通道和連線 // channel.close(); // connection.close(); } }
手動應答。此時 訊息佇列的訊息 一直沒有被清除掉
如下修改:
package com.toov5.Consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.toov5.utils.MQConnectionUtils; public class Consumer { //佇列名稱 private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("消費者啟動.........."); //建立新的連線 Connection connection = MQConnectionUtils.newConnection(); //建立Channel final Channel channel = connection.createChannel(); // 消費者關聯佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) { //監聽獲取訊息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg =new String(body,"UTF-8"); System.out.println("消費者獲取生產者訊息:"+msg); channel.basicAck(envelope.getDeliveryTag(), false); //手動應答 告訴訊息佇列伺服器 消費成功 } }; //牽手模式設定 預設自動應答模式 true:自動應答模式 channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手動應答 // //關閉通道和連線 // channel.close(); // connection.close(); } }
這樣就消費完畢了