1. 程式人生 > >RabbitMQ學習筆記八:RabbitMQ的訊息確認

RabbitMQ學習筆記八:RabbitMQ的訊息確認

來源:

https://blog.csdn.net/chenxyt/article/details/79259838

一、概述
    前文說到RabbitMQ的交換機、佇列、訊息的持久化並不能100%的保證訊息不會丟失。首先從生產者端,持久化的訊息在RabbitMQ同步到磁碟之前,還需要一段時間,這個時間很短,但是不容忽視。假如此時伺服器宕機了,那麼訊息就丟失了。這種發生在生產者上的訊息丟失我們可以使用映象佇列和事務機制來保證資料的完整性。其次是消費者端,假如消費者拿到訊息還未處理,發生異常而崩潰,此時這條訊息佇列中已經沒有了,而我們的業務還需要這條訊息,那麼這種情況也算是訊息丟失。在消費者端發生的訊息丟失可以通過消費者的訊息確認機制來解決。當然無論哪種方式對RabbitMQ的效能都有一定的影響。本文主要對RabbitMQ對於生產者和消費者不同的訊息確認方式做一個瞭解,並解決在訊息確認中出現的阻塞問題。
二、事務管理(生產者)
    事務管理的操作是針對於生產者向RabbitMQ伺服器傳送訊息這一過程的。RabbitMQ對事務的管理有如下兩個層面的方式:
    1、AMQP協議層面,基於AMQP的事務機制
    2、通道層面,將channel設定成confirm
2.1事務機制
    RabbitMQ提供了txSelect()、txCommit()和txRollback()三個方法對訊息傳送進行事務管理,txSelect用於將通道channel開啟事務模式,txCommit用於提交事務,txRollback使用者進行事務回滾操作。
    示例程式碼:
try{
   //channel開啟事務模式
   channel.txSelect();
   //傳送訊息
   channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
   //模擬異常
   int n = 1/0;
   //提交事務
   channel.txCommit();
}catch(Exception e){
   e.printStackTrace();
   channel.txRollback();
}
    假如在txCommit之前發生了異常,那麼就可以通過Rollback進行回滾操作。
    以上是基於AMQP協議層的事務機制,確保了資料在生產者與RabbitMQ伺服器之間的可靠性,但是效能開銷較大。
2.2Confirm模式
    RabbitMQ提供了一種低消耗的事務管理方式,將channel設定成confirm模式。confirm模式的channel,通過該channel發出的訊息會生成一個唯一的有序ID(從1開始),一旦訊息成功傳送到相應的佇列之後,RabbitMQ服務端會發送給生產者一個確認標誌,包含訊息的ID,這樣生產者就知道該訊息已經發送成功了。如果訊息和佇列是持久化的,那麼當訊息成功寫入磁碟之後,生產者會收到確認訊息。此外服務端也可以設定basic.ack的mutiple域,表明是否是批量確認的訊息,即該序號之前的所有訊息都已經收到了。
    confirm的機制是非同步的,生產者可以在等待的同時繼續傳送下一條訊息,並且非同步等待回撥處理,如果訊息成功傳送,會返回ack訊息供非同步處理,如果訊息傳送失敗發生異常,也會返回nack訊息。confirm的時間沒有明確說明,並且同一個訊息只會被confirm一次。
    我們在生產者使用如下程式碼開啟channel的confirm模式,並且已經開啟事務機制的channel是不能開啟confirm模式的。

channel.confirmSelect();
    處理ack或者nack的方式有三種:
    1、序列confirm:每傳送一條訊息就呼叫waitForConfirms()方法等待服務端confirm

//開啟confirm模式
channel.confirmSelect();
String message = "Hello World";
//傳送訊息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//判斷是否回覆
if(channel.waitForConfirms()){
    System.out.println("Message send success."); 
 }
    其中waitForConfirms可以換成帶有時間引數的方法waitForConfirms(Long mills)指定等待響應時間

    2、批量confirm:每傳送一批次訊息就呼叫waitForConfirms()方法等待服務端confirm

//開啟confirm模式
channel.confirmSelect();
for(int i =0;i<1000;i++){
    String message = "Hello World";
    //傳送訊息
    channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    if(i%100==0){
        //每傳送100條判斷一次是否回覆
        if(channel.waitForConfirms()){
              System.out.println("Message send success."); 
          }
     }
}
    批量的方法從數量級上降低了confirm的效能消耗,提高了效率,但是有個致命的缺陷,一旦回覆確認失敗,當前確認批次的訊息會全部重新發送,導致訊息重複傳送。所以批量的confirm雖然效能提高了,但是訊息的重複率也提高了。

    3、非同步confirm:使用監聽方法,當服務端confirm了一條或多條訊息後,呼叫回撥方法

//宣告一個用來記錄訊息唯一ID的有序集合SortedSet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//開啟confirm模式
channel.confirmSelect();
//非同步監聽方法 處理ack與nack方法
channel.addConfirmListener(new ConfirmListener() {
    //處理ack multiple 是否批量 如果是批量 則將比該條小的所有資料都移除 否則只移除該條
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
    //處理nack 與ack相同
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
});
while (true) {
    //獲取訊息confirm的唯一ID
    long nextSeqNo = channel.getNextPublishSeqNo();
    String message = "Hello World.";
    //傳送訊息
    channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    //將ID加入到有序集合中
    confirmSet.add(nextSeqNo);
}
    每一個comfirm的通道維護一個集合,每傳送一條資料,集合增加一個元素,每非同步響應一條ack或者nack的資料,集合刪除一條。SortedSet是一個有序的集合,它的有序是值大小的有序,不是插入時間的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合

    以上就是RabbitMQ基於訊息傳送方的事務機制和訊息確認,接下來了解訊息接收方的訊息確認機制。
三、訊息確認ack(消費者)
    為了保證RabbitMQ能夠感知消費者正確取到了訊息,RabbitMQ提供了訊息確認機制,與給生產者回復ACK的方式類似,當佇列傳送一條訊息給消費者時,會記錄一個unack標誌,當消費者拿到訊息之後,會回覆一個ack標誌,從而抵消了原來的unack標誌。一般情況下,我們預設是開啟了自動回覆ack的標誌,即當消費者拿到訊息之後立即回覆ack而不管訊息是否正確被處理,這個時間很快,以至於基本看不到unack的狀態。如開篇說到,這裡存在一個嚴重的問題,假如訊息在業務處理的過程中發生異常crash了,那麼這條訊息就消失了,持久化也不會解決這個問題。這裡就需要我們在日常的業務處理中,消費者要手動的確認訊息。確認訊息包括兩種,一種是ack,另一種是unack,unack是表明我這條訊息處理異常了,可以設定引數告訴MQ伺服器是否需要將訊息重新放入到佇列中。同時,如果開啟了手動回覆確認的消費者,當消費者異常斷開時,沒有回覆的訊息會被重新放入佇列供給其他消費者使用。所以程式設計師必須一定要記得回覆訊息確認,不然會導致訊息重複或者大量的訊息堆積。
    下面將通過一個簡單的示例,演示手動回覆訊息確認和忘記回覆訊息確認的場景。示例場景:一個佇列下有兩個手動回覆訊息確認的消費者,兩個消費者會按照系統自帶的輪訓機制獲取訊息,即一個獲取奇數的訊息,一個獲取偶數的訊息。
    1、消費者1和2手動回覆訊息(正常情況)
    2、消費者1和2手動回覆訊息,且消費者1忘了手動回覆並且讀取一部分資料之後發生異常(異常情況)
    編寫生產者程式碼,生產者傳送1000條訊息,並且沒有訊息間隔。

package com.cn.chenxyt.mq;
 
import java.io.IOException;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
 
import com.rabbitmq.client.AMQP.BasicProperties;  
import com.rabbitmq.client.AMQP.BasicProperties.Builder; 
import com.rabbitmq.client.AMQP.Confirm.SelectOk;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
 
 
public class MqProducer {
     public final static String EXCHANGE_NAME="EXCHANGE_MQ";
     public final static String QUEUE_NAME="queue";
     public void sendMessage() throws IOException, InterruptedException {
         //建立連線工廠
         ConnectionFactory factory = new ConnectionFactory();
         //設定主機、使用者名稱、密碼和客戶端埠號
         factory.setHost("localhost");
         factory.setUsername("guest");
         factory.setPassword("guest");
         factory.setPort(5672);
         //建立一個新的連線 即TCP連線
         Connection connection = factory.newConnection();
         //建立一個通道
         Channel channel = connection.createChannel();
         //建立一個交換機
        // channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
         channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
         //建立一個佇列
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
         for(int i =1;i<1000;i++){
             String message = "Hello World" + (i);
             //傳送訊息
             channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
             System.out.println("Message send success:" + message); 
         }
 
    }
}
    接下來編寫消費者程式碼,與之前相同,有兩個消費者,消費者1和消費者2
package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
     private static String EXCHANGE_NAME="EXCHANGE_MQ";
     private final static String QUEUE_NAME="queue";
     public static void main(String[] args) throws IOException {
         //建立連線工廠
         ConnectionFactory factory = new ConnectionFactory();
         //設定主機
         factory.setHost("localhost");
         //建立一個新的連線 即TCP連線
         Connection connection = factory.newConnection();
         //建立一個通道
         final Channel channel = connection.createChannel();
         //宣告佇列
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
         //建立一個交換機
         channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);         
         //繫結佇列到交換機
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
         System.out.println("Consumer1 Waiting Received messages");
         //DefaultConsumer類實現了Consumer介面,通過傳入一個channel,
         //告訴伺服器我們需要哪個channel的訊息並監聽channel,如果channel中有訊息,就會執行回撥函式handleDelivery
         Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Consumer1 Received '" + message + "'");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }finally{
                            channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制
            //false 不自動回覆應答 
            channel.basicConsume(QUEUE_NAME,false, consumer);
    }
}

package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
     private static String EXCHANGE_NAME="EXCHANGE_MQ";
     private final static String QUEUE_NAME="queue";
     public static void main(String[] args) throws IOException {
         //建立連線工廠
         ConnectionFactory factory = new ConnectionFactory();
         //設定主機
         factory.setHost("localhost");
         //建立一個新的連線 即TCP連線
         Connection connection = factory.newConnection();
         //建立一個通道
         final Channel channel = connection.createChannel();
         //宣告佇列
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
         //建立一個交換機
         channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);         
         //繫結佇列到交換機
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
         System.out.println("Consumer2 Waiting Received messages");
         //DefaultConsumer類實現了Consumer介面,通過傳入一個channel,
         //告訴伺服器我們需要哪個channel的訊息並監聽channel,如果channel中有訊息,就會執行回撥函式handleDelivery
         Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Consumer2 Received '" + message + "'");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }finally{
                            channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制
            //false 不自動回覆應答 
            channel.basicConsume(QUEUE_NAME,false, consumer);
    }
}
    這裡basicConsume設定為false為不自動應答,同時為了保證業務正常執行完,回覆確認要寫在finally程式碼塊裡。channel.basicAck()回覆處理正確,channel.basicNAck()回覆處理失敗,引數設定為true為重新加入佇列。
   啟動消費者1和2再啟動生產者,因為兩個消費者對訊息延遲2s才回復,所以佇列中積累了大量的unack訊息
    
    
    
    
    接下來修改消費者1程式碼,看一下如果程式沒有回覆ack確認是什麼樣子,註釋掉消費者1的ack確認,並把生產者傳送資料條數改成10條(這裡如果在上邊的例子改,需要保證佇列裡沒有資料,可以在管理臺把佇列刪掉,也可以停掉消費者把sleep時間改短然後啟動把之前的訊息接收完畢,當然也可以在上邊測試的時候就把傳送訊息的數目改小一些)

     try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }finally{
                            //channel.basicAck(envelope.getDeliveryTag(), false);
                    }
    接下來啟動消費者1和消費者2以及生產者,可以看到10條訊息,有五條傳送到消費者1,五條傳送到消費者2,同時在訊息接收完畢的時候,由於消費者1沒有ack,所以管理臺上一直有5個unack狀態
    
    
    
    這時我們停掉消費者1,模擬消費者1crash斷開的狀態,可以看到消費者2收到了消費者1沒有ack的訊息,並且管理臺佇列裡的unack狀態也沒有了
    
    
    以上就是關於消費者自動回覆訊息確認的相關內容。
四、阻塞的問題解決
    這裡思考一個問題,就是當消費者1和消費者2都開啟手動回覆並且在業務執行完成之後都進行了回覆,如果生產者傳送了大量訊息,而消費者處理業務的時間(我們用sleep時間模擬)又過長,就會導致訊息佇列中阻塞大量未unack的訊息,會降低系統性能,即便我們把消費者2的sleep時間調低,消費者1仍然是2s處理一條訊息,消費者2迅速處理完,佇列中仍然積累一半unack的訊息,這是為什麼呢?這是因為每個消費者會有一個緩衝池prefetch的概念,prefetch是消費者一次能處理的最大unack的數量,消費者獲取訊息時,實際上是mq先放到了這個緩衝池中,當ack一個之後,mq從緩衝池中拿掉一個。而MQ的輪訓機制恰好是按順序分發,因為我們這裡沒有設定緩衝池的大小,也就是消費者一次最多能拿多少個訊息沒有設定,所以MQ預設你的處理能力很好,會按照順序將訊息全部分發完。所以這裡就會看到消費者1剛好列印的都是奇數的訊息,消費者2剛好列印的是偶數的訊息。
    所以阻塞的問題的解決方案就是我們合理的設定prefetch大小,這樣處理快的消費者就能夠處理更多的訊息,處理慢的消費者也不會發生長時間的阻塞。更詳細的描述,假設有兩個消費者,都設定prefetch大小為10,消費者1處理業務時間是2s,消費者2處理業務時間是2ms,那麼就不會出現上邊的情況消費者1積累大量的unack,這裡最多的unack數目就是兩個prefetch的大小之和20,同時,MQ分發訊息是先塞滿10個到消費者1,再塞滿10個到消費者2,塞第21個的時候,先看消費者1的緩衝池有沒有空位,沒有的話去看消費者2,因為消費者2的處理速度比1快1000倍,所以1000條資料前10條塞給消費者1之後,後邊的資料就都塞給消費者2了。
    設定prefetch大小的方法,在消費者中加入如下程式碼
    
         channel.basicQos(10);
    為了更好的說明上邊的詳細描述,我把程式碼貼出來,變化就是生產者一次發1000條資訊,消費者1和消費者2設定最大prefetch值為10,同時消費者1的處理業務時間(sleep時間)2s,消費者2處理業務時間2ms
    
package com.cn.chenxyt.mq;
 
import java.io.IOException;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
 
import com.rabbitmq.client.AMQP.BasicProperties;  
import com.rabbitmq.client.AMQP.BasicProperties.Builder; 
import com.rabbitmq.client.AMQP.Confirm.SelectOk;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
 
 
public class MqProducer {
     private final static String EXCHANGE_NAME="EXCHANGE_MQ";
     private final static String QUEUE_NAME="queue";
     
     public static void main(String[] args) throws IOException, InterruptedException {
         //建立連線工廠
         ConnectionFactory factory = new ConnectionFactory();
         //設定主機、使用者名稱、密碼和客戶端埠號
         factory.setHost("localhost");
         factory.setUsername("guest");
         factory.setPassword("guest");
         factory.setPort(5672);
         //建立一個新的連線 即TCP連線
         Connection connection = factory.newConnection();
         //建立一個通道
         Channel channel = connection.createChannel();
         //建立一個交換機
        // channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
         channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
         //建立一個佇列
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
         for(int i =1;i<=1000;i++){
             String message = "Hello World" + (i);
             //傳送訊息
             channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
             System.out.println("Message send success:" + message); 
         }
 
    }
}
package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer1 {
     private static String EXCHANGE_NAME="EXCHANGE_MQ";
     private final static String QUEUE_NAME="queue";
     public static void main(String[] args) throws IOException {
         //建立連線工廠
         ConnectionFactory factory = new ConnectionFactory();
         //設定主機
         factory.setHost("localhost");
         //建立一個新的連線 即TCP連線
         Connection connection = factory.newConnection();
         //建立一個通道
         final Channel channel = connection.createChannel();
         //宣告佇列
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
         //建立一個交換機
         channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);         
         //繫結佇列到交換機
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
         channel.basicQos(10);
         System.out.println("Consumer1 Waiting Received messages");
         //DefaultConsumer類實現了Consumer介面,通過傳入一個channel,
         //告訴伺服器我們需要哪個channel的訊息並監聽channel,如果channel中有訊息,就會執行回撥函式handleDelivery
         Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Consumer1 Received '" + message + "'");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }finally{
                            //channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制
            //false 不自動回覆應答 
            channel.basicConsume(QUEUE_NAME,false, consumer);
    }
}

package com.cn.chenxyt.mq;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class MqConsumer2 {
     private static String EXCHANGE_NAME="EXCHANGE_MQ";
     private final static String QUEUE_NAME="queue";
     public static void main(String[] args) throws IOException {
         //建立連線工廠
         ConnectionFactory factory = new ConnectionFactory();
         //設定主機
         factory.setHost("localhost");
         //建立一個新的連線 即TCP連線
         Connection connection = factory.newConnection();
         //建立一個通道
         final Channel channel = connection.createChannel();
         //宣告佇列
         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
         //建立一個交換機
         channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);         
         //繫結佇列到交換機
         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "",null);
         channel.basicQos(10);
         System.out.println("Consumer2 Waiting Received messages");
         //DefaultConsumer類實現了Consumer介面,通過傳入一個channel,
         //告訴伺服器我們需要哪個channel的訊息並監聽channel,如果channel中有訊息,就會執行回撥函式handleDelivery
         Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Consumer2 Received '" + message + "'");
                    try {
                        Thread.sleep(2);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }finally{
                            channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //自動回覆佇列應答 -- RabbitMQ中的訊息確認機制
            //false 不自動回覆應答 
            channel.basicConsume(QUEUE_NAME,false, consumer);
    }
}
    這裡為了驗證unak的數目與prefetch的關係,我們消費者1注掉回覆確認訊息的程式碼,啟動消費者1和消費者2以及生產者
    
    
    
    如圖可見,消費者1只處理了10條訊息,消費者2把其他的訊息處理了。合理的利用了有限的資源。
五、總結
    本文主要講述了RabbitMQ與生產者和消費者之間的訊息確認以及消費者手動確認訊息帶來的阻塞問題的解決之道。生產者的訊息確認有事務機制和confirm模式兩種,消費者通過自動回覆ack和手動回覆ack的方式確認,手動ack切記有ack和nack兩種,合理安排使用。消費者手動確認帶來的阻塞問題是由於沒有設定緩衝池的大小,可以通過設定prefetch的大小來限制每個消費者能持有的最大unack的數量,合理的分配資源。
    
--------------------- 
作者:Crayoncxy 
來源:CSDN 
原文:https://blog.csdn.net/chenxyt/article/details/79259838 
版權宣告:本文為博主原創文章,轉載請附上博文連結!