1. 程式人生 > >訊息佇列RabbitMQ應答模式

訊息佇列RabbitMQ應答模式

為了確保訊息不會丟失,RabbitMQ支援訊息應答。消費者傳送一個訊息應答,告訴RabbitMQ這個訊息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。
如果一個消費者掛掉卻沒有傳送應答,RabbitMQ會理解為這個訊息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何訊息了。
沒有任何訊息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條訊息會花費很長的時間。
訊息應答是預設開啟的。我們通過顯示的設定autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動傳送應答。通知RabbitMQ訊息已被處理,可以從記憶體刪除。如果消費者因宕機或連結失敗等原因沒有傳送ACK(不同於ActiveMQ,在RabbitMQ裡,訊息沒有過期的概念),則RabbitMQ會將訊息重新發送給其他監聽在佇列的下一個消費者。

 

案例:
生產者端程式碼不變,消費者端程式碼這部分就是用於開啟手動應答模式的。
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
注:第二個引數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答。
在處理完訊息時,返回應答狀態,true表示為自動應答模式。
channel.basicAck(envelope.getDeliveryTag(), false);

 

 

自動應答: 不在乎消費者對訊息處理是否成功,都會告訴佇列刪除訊息。如果處理訊息失敗,實現自動補償(佇列投遞過去 重新處理)。

 

手動應答: 消費者處理完業務邏輯,手動返回ack(通知)告訴佇列處理完了,佇列進而刪除訊息。

 

生產者程式碼不變,消費者:

package com.toov5.Consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.toov5.utils.MQConnectionUtils; public class Consumer { //佇列名稱 private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("消費者啟動.........."); //建立新的連線 Connection connection = MQConnectionUtils.newConnection(); //建立Channel Channel channel = connection.createChannel(); // 消費者關聯佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) { //監聽獲取訊息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg =new String(body,"UTF-8"); System.out.println("消費者獲取生產者訊息:"+msg); } }; //牽手模式設定 預設自動應答模式 true:自動應答模式 channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手動應答 // //關閉通道和連線 // channel.close(); // connection.close(); } }

手動應答。此時 訊息佇列的訊息 一直沒有被清除掉

如下修改:

package com.toov5.Consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.toov5.utils.MQConnectionUtils;

public class Consumer {
  
     //佇列名稱
        private static final String QUEUE_NAME = "test_queue";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("消費者啟動..........");
            //建立新的連線
        Connection connection = MQConnectionUtils.newConnection();
           //建立Channel
            final Channel channel = connection.createChannel();
            // 消費者關聯佇列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
              DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
                  //監聽獲取訊息
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg =new String(body,"UTF-8");
                        System.out.println("消費者獲取生產者訊息:"+msg);
                        channel.basicAck(envelope.getDeliveryTag(), false);  //手動應答 告訴訊息佇列伺服器 消費成功
                    }
              };
            //牽手模式設定  預設自動應答模式  true:自動應答模式  
              channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);//    fanse手動應答          
              
//            //關閉通道和連線
//             channel.close();
//             connection.close();
        }
}

 

 這樣就消費完畢了