Rabbitmq接收方訊息確認
阿新 • • 發佈:2020-09-03
所謂的消費方訊息確認就是簽收模式ack,Rabbitmq預設開啟的是自動簽收模式,也就是消費者監聽到訊息到達,就會自動傳送ack給佇列,告訴佇列這條訊息可以刪除了,這種自動簽收的模式存在訊息丟失的可能,出現異常的話這條訊息就丟了,要保證訊息不會丟失,還是建議開啟手動簽收的模式。
一、三種簽收模式
public enum AcknowledgeMode { //自動確認 NONE, //手動確認 MANUAL, //根據情況確認 AUTO; private AcknowledgeMode() { } public boolean isTransactionAllowed() { return this == AUTO || this == MANUAL; } public boolean isAutoAck() { return this == NONE; } public boolean isManual() { return this == MANUAL; } }
二、配置檔案開啟手動簽收模式
spring: rabbitmq: host: 192.168.31.70 port: 5672 username: guest password: guest # 傳送確認 publisher-confirms: true # 路由失敗回撥 publisher-returns: true template: # 必須設定成true 訊息路由失敗通知監聽者,false 將訊息丟棄 mandatory: true #消費端 listener: simple: # 每次從RabbitMQ獲取的訊息數量 prefetch: 1 default-requeue-rejected: false # 每個佇列啟動的消費者數量 concurrency: 1 # 每個佇列最大的消費者數量 max-concurrency: 1 # 簽收模式為手動簽收-那麼需要在程式碼中手動ACK acknowledge-mode: manual
三、消費方手動簽收
@Component @Slf4j public class MessageHandler { /** * 郵件傳送 * @param message * @param channel * @param headers * @throws IOException */ @RabbitListener(queues ="demo.email") @RabbitHandler public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException { try { String msg=new String(message.getBody(), CharEncoding.UTF_8); JSONObject jsonObject = JSON.parseObject(msg); jsonObject.put("messageId",headers.get("spring_returned_message_correlation")); log.info("---接受到訊息---{}",jsonObject); //主動異常 int m=1/0; //手動簽收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { //異常,ture 重新入隊,或者false,進入死信佇列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } } }
四、channel介面的實現類
裡面有三個手動簽收的方法
public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
private static final MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
private final Log logger;
private final Channel delegate;
private final ConcurrentMap<String, Listener> listeners;
private final Map<Listener, SortedMap<Long, PendingConfirm>> pendingConfirms;
private final Map<String, PendingConfirm> pendingReturns;
private final SortedMap<Long, Listener> listenerForSeq;
private final ExecutorService executor;
private volatile Consumer<Channel> afterAckCallback;
//......省略
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
this.delegate.basicAck(deliveryTag, multiple);
}
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException {
this.delegate.basicNack(deliveryTag, multiple, requeue);
}
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
this.delegate.basicReject(deliveryTag, requeue);
}
//.......省略
}
4.1、三個方法區別
- basicAck 同意簽收 支援批量,設定入參mutiple為true
- basicReject 拒絕簽收,不支援批量,支援是否重新入隊,設定入參requeue為true
- basicNack 拒絕簽收,支援批量,支援是否重新入隊,設定入參requeue為true