1. 程式人生 > 程式設計 >RabbitMQ 高階特性-生產端Confirm訊息確認機制

RabbitMQ 高階特性-生產端Confirm訊息確認機制

生產端Confirm訊息確認機制

什麼是訊息確認

訊息確認,是指生產者投遞訊息後,如果Broker收到訊息,則會給我們生產者一個應答。生產者進行接受應答,用來確認訊息這條是否正常傳送到Broker,這種方式也是訊息的可靠性投遞的核心保障!

Confirm訊息確認

如何實現Confirm確認訊息

  1. 在channel上開啟確認模式:channel.confirmSelect();
  2. 在channel上新增監聽: addConfirmListener,監聽成功和失敗返回的結果,根據結果對訊息重傳送或記錄日誌等後續處理。

程式碼實現

producer

package com.wyg.rabbitmq.javaclient.confirm;

import
java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * confirm訊息確認機制 * * @author [email protected] * @date 2019-11-22 13:25 * @since
JDK1.8 * @version V1.0 */
public class Producer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guest"; private static final String PASSWORD = "guest"; private static final String EXCHANGE = "test_confirm_exchange"
; public static void main(String[] args) throws IOException,TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE,"topic"); // 開啟confirm確認模式 channel.confirmSelect(); // 新增監聽 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag,boolean multiple) throws IOException { System.out.println("---訊息投遞成功,deliveryTag:" + deliveryTag); } @Override public void handleNack(long deliveryTag,boolean multiple) throws IOException { System.out.println("---訊息投遞失敗,deliveryTag:" + deliveryTag); } }); String msg1 = "這是一條confirm確認訊息save"; channel.basicPublish(EXCHANGE,"confirm.save",null,msg1.getBytes("UTF-8")); String msg2 = "這是一條confirm確認訊息abc"; channel.basicPublish(EXCHANGE,"confirm.abc",msg2.getBytes("UTF-8")); } // 注意,因為要等待broker的confirm訊息,暫時不關閉channel和connection } 複製程式碼

執行結果

producer執行結果

注意

broker在接受訊息成功後,ConfirmListener進入handleAck;失敗進入handleNack,失敗原因有很多,比如MQ異常,queue容量達到上限,磁碟寫滿了等。

還有極端情況,ack和nack都沒獲取到,比如網路閃斷,這時候需要用定時任務去抓取做後續處理。

Consumer

package com.wyg.rabbitmq.javaclient.confirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * 訊息確認
 * 
 * @author [email protected]
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */

public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    public static void main(String[] args) throws IOException,TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String queueName = "test_confirm_queue";
        String routingKey = "confirm.#";
        String exchangeName = "test_confirm_exchange";
        // 申明佇列
        channel.queueDeclare(queueName,true,false,null);
        // 佇列繫結到exchange
        channel.queueBind(queueName,exchangeName,routingKey,null);

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag,Delivery message) throws IOException {
                try {
                    System.out.println("---消費者:" + new String(message.getBody(),"UTF-8"));
                } finally {
                    // consumer手動 ack 給broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者:cancelCallback");
            }
        };

        // 消費訊息
        channel.basicConsume(queueName,deliverCallback,cancelCallback);
    }
}

複製程式碼

執行結果

consumer執行結果