1. 程式人生 > >RabbitMQ高階特性-Confirm確認訊息

RabbitMQ高階特性-Confirm確認訊息

Confirm確認訊息

  • 訊息的確認, 是指生產者投遞訊息後, 如果Broker收到訊息, 則會給我們產生一個應答

  • 生產者進行接收應答, 用來確定這條訊息是否正常傳送到Broker, 這種方式也是訊息的可靠性投遞的核心保障
    Confirm確認訊息

如何實現Confirm確認訊息

  1. 在channel上開啟確認模式 : channel.confirmSelect()
  2. 在channel上新增監聽 : addConfirmListener, 監聽成功和失敗的返回結果, 根據具體的結果對訊息進行重新發送, 或記錄日誌等後續處理

Producer程式碼

package
com.qiyexue.confirm; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Confirm確認訊息模式生產者 * * @author 七夜雪 * @create 2018-12-15 16:12 */ public class ProducerByConfirm { public
static void main(String[] args) throws Exception { // 1. 建立ConnectionFactory, 並設定屬性 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.72.138"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 建立連線 Connection connection =
factory.newConnection(); // 3. 建立channel Channel channel = connection.createChannel(); // 4. 開啟Confirm模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.qiye"; // 5. 傳送訊息 String msg = "Send Msg By Confirm ..."; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); // 6. 設定監聽 channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("-------ACK Success--------"); } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ACK Failed--------"); } }); // 因為設定了監聽, 這裡就不關閉channel和connection了 } }

Consumer程式碼

package com.qiyexue.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Confirm訊息確認的生產者
 *
 * @author 七夜雪
 * @create 2018-12-15 16:19
 */
public class ConsumerByConfirm {

    public static void main(String[] args) throws Exception {
        // 1. 建立連線工廠並設定屬性
        ConnectionFactory factory = new ConnectionFactory();;
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 建立連線
        Connection connection = factory.newConnection();

        // 3. 建立channel
        Channel channel = connection.createChannel();

        // 4. 宣告Exchange
        String exchangeName = "test_confirm_exchange";
        String exchangeType = "topic";
        String routingKey = "confirm.*";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);

        // 5. 宣告訊息佇列
        String queueName = "test_confirm_queue";
        channel.queueDeclare(queueName, true, false, false, null);

        // 6. 繫結佇列和Exchange
        channel.queueBind(queueName, exchangeName, routingKey);

        // 7. 建立一個消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 8. 設定消費者從哪個佇列開始消費, 設定自動ACK
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }

    }

}