訊息confirm機制
阿新 • • 發佈:2020-12-21
我們在訊息生產端的可靠性投遞方案中的方案一,producer就是通過訊息confirm機制來確保訊息能能投遞到MQ,失敗的做重投。
三種confirm方式:
- 普通confirm模式:每傳送一條訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。實際上是一種序列confirm了。
- 批量confirm模式:每傳送一批訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。
- 非同步confirm模式:提供一個回撥方法,服務端confirm了一條或者多條訊息後Client端會回撥這個方法。
實現非同步監聽的訊息的confirm:
- 在channel上開啟確認模式:channel.confirmSelect()
- 在channel上新增監聽:addConfirmListener,監聽成功和失敗的返回結果(實際業務中就可以根據結果對詳細進行重發、記錄日誌等)
測試程式碼:
不需要消費者,直接在生產端宣告佇列,交換機做測試就好
package com.vivo.demo1.confirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author:luzaichun
* @Date:2020/12/20
* @Time:22:39
**/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.7");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//開啟確認模式
channel.confirmSelect();
channel.exchangeDeclare("confirm_exchange","topic",false,false,null);
channel.queueDeclare("confirm_queue",false,false,false,null);
channel.queueBind("confirm_queue","confirm_exchange","topic.#");
channel.basicPublish("confirm_exchange","topic.confirm",null,"測試訊息確認機制".getBytes());
channel.addConfirmListener(new ConfirmListener() {
//只要是[交換機]確認收到了訊息就會回撥ConfirmListener的handleAck方法。(即使交換機沒有正確路由到佇列中)
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack...訊息投遞成功");
}
//只要是[交換機]沒有確認收到了訊息就會回撥ConfirmListener的handleNack方法。
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("nack。。。訊息投遞失敗,例如佇列滿了");
}
});
}
}
啟動生產者可以看到控制檯,回撥成功。然後我們觀察控制檯發現訊息已經到達confirm_queue佇列,訊息投遞成功。