1. 程式人生 > 其它 >訊息confirm機制

訊息confirm機制

技術標籤:RabbitMQrabbitmq

我們在訊息生產端的可靠性投遞方案中的方案一,producer就是通過訊息confirm機制來確保訊息能能投遞到MQ,失敗的做重投。

三種confirm方式:

  • 普通confirm模式:每傳送一條訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。實際上是一種序列confirm了。
  • 批量confirm模式:每傳送一批訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。
  • 非同步confirm模式:提供一個回撥方法,服務端confirm了一條或者多條訊息後Client端會回撥這個方法。
實現非同步監聽的訊息的confirm:
  1. 在channel上開啟確認模式:channel.confirmSelect()
  2. 在channel上新增監聽:addConfirmListener,監聽成功和失敗的返回結果(實際業務中就可以根據結果對詳細進行重發、記錄日誌等)

測試程式碼:

不需要消費者,直接在生產端宣告佇列,交換機做測試就好

package com.vivo.demo1.confirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author:luzaichun
 * @Date:2020/12/20
 * @Time:22:39
 **/
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.3.7"); factory.setPort(5672); factory.setVirtualHost("/"); Connection connection =
factory.newConnection(); Channel channel = connection.createChannel(); //開啟確認模式 channel.confirmSelect(); channel.exchangeDeclare("confirm_exchange","topic",false,false,null); channel.queueDeclare("confirm_queue",false,false,false,null); channel.queueBind("confirm_queue","confirm_exchange","topic.#"); channel.basicPublish("confirm_exchange","topic.confirm",null,"測試訊息確認機制".getBytes()); channel.addConfirmListener(new ConfirmListener() { //只要是[交換機]確認收到了訊息就會回撥ConfirmListener的handleAck方法。(即使交換機沒有正確路由到佇列中) @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("ack...訊息投遞成功"); } //只要是[交換機]沒有確認收到了訊息就會回撥ConfirmListener的handleNack方法。 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("nack。。。訊息投遞失敗,例如佇列滿了"); } }); } }

啟動生產者可以看到控制檯,回撥成功。然後我們觀察控制檯發現訊息已經到達confirm_queue佇列,訊息投遞成功。在這裡插入圖片描述
在這裡插入圖片描述