1. 程式人生 > 其它 >RabbitMQ訊息確認機制 RabbitMQ持久化

RabbitMQ訊息確認機制 RabbitMQ持久化

RabbitMQ訊息確認的本質也就是為了解決RabbitMQ訊息丟失問題,因為哪怕我們做了RabbitMQ持久化,其實也並不能保證解決我們的資料丟失問題


RabbitMQ的訊息確認有兩種

  • 第一種是訊息傳送確認。這種是用來確認生產者將訊息傳送給交換器,交換器傳遞給佇列的過程中,訊息是否成功投遞。傳送確認分為兩步,一是確認是否到達交換器,二是確認是否到達佇列。
  • 第二種是消費接收確認。這種是確認消費者是否成功消費了佇列中的訊息。

1.訊息傳送確認(生產者)

當生產者傳送訊息給rabbitmq伺服器時,訊息是否真正的到達了伺服器?為了保證生產者傳送的訊息能夠可靠的傳送到伺服器(即訊息落地),rabbitmq提供了兩種方式:

  • 通過事務實現
  • 通過傳送方確認機制(publisher confirm)實現

事務實現

  • channel.txSelect(): 將當前通道設定成事務模式
  • channel.txCommit(): 用於提交事務
  • channel.txRollback(): 用於回滾事務

通過事務實現機制,只有訊息成功被rabbitmq伺服器接收,事務才能提交成功,否則便可在捕獲異常之後進行回滾,然後進行訊息重發,但是事務非常影響rabbitmq的效能。還有就是事務機制是阻塞的過程,只有等待伺服器迴應之後才會處理下一條訊息

 待更新。。。

2.訊息接收確認(消費者)

訊息接收確認機制,分為訊息自動確認模式和訊息手動確認模式,當訊息確認後,我們佇列中的訊息將會移除

那這兩種模式要如何選擇呢?

  • 如果訊息不太重要,丟失也沒有影響,那麼自動ACK會比較方便。好處就是可以提高吞吐量,缺點就是會丟失訊息
  • 如果訊息非常重要,不容丟失,則建議手動ACK,正常情況都是更建議使用手動ACK。雖然可以解決訊息不會丟失的問題,但是可能會造成消費者過載

訊息自動確認模式的實現

注:自動確認模式,消費者不會判斷消費者是否成功接收到訊息,也就是當我們程式程式碼有問題,我們的訊息都會被自動確認,訊息被自動確認了,我們佇列就會移除該訊息,這就會造成我們的訊息丟失

/**
 * 消費者
 */
public class Recv {
    //設定佇列名稱(已存在的佇列)
    private
static final String QUEUE_NAME = "queue1"; public static void main(String[] args) throws IOException, TimeoutException { //從mq工具類獲取連線資訊 Connection connection = MqConnectionUtils.getConnection(); //獲取一個通道 Channel channel = connection.createChannel(); //監聽該佇列,true代表自動確認 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{ System.out.println("接收到的訊息:"+ new String(body,"UTF-8")); } }); } }

實現效果,消費者會將我們佇列中的訊息全部接收然後確認,並移除佇列

訊息手動確認模式的實現

/**
 * 消費者
 */
public class Recv {
    //設定佇列名稱(已存在的佇列)
    private static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //從mq工具類獲取連線資訊
        Connection connection = MqConnectionUtils.getConnection();
        //獲取一個通道
        Channel channel = connection.createChannel();
        //監聽該佇列,false代表手動確認
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
                System.out.println("接收到的訊息:"+ new String(body,"UTF-8"));
            }
        });
    }
}

手動確認模式下,當我們消費者成功接收到訊息後,在佇列中訊息會進入Unacked項,也就是待確認模式

所以我們還需要加上下列程式碼,來實現訊息者在成功接收到訊息後,手動確認

#新增紅色欄位

/**
 * 消費者
 */
public class Recv {
    //設定佇列名稱(已存在的佇列)
    private static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //從mq工具類獲取連線資訊
        Connection connection = MqConnectionUtils.getConnection();
        //獲取一個通道
        Channel channel = connection.createChannel();
        //監聽該佇列
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
                System.out.println("接收到的訊息:"+ new String(body,"UTF-8"));

                //獲取訊息的編號,我們需要根據訊息的編號來確認訊息
                long tag = envelope.getDeliveryTag();
                //獲取當前內部類中的通道
                Channel c = this.getChannel();
                //手動確認訊息,確認以後,則表示訊息已經成功處理,訊息就會從佇列中移除
                c.basicAck(tag,true);            }
        });
    }
}

此時,我們的訊息才會成功被確認,並移除佇列