1. 程式人生 > 其它 >【rabbitmq】單獨配置某一個消費者手動ack,其他消費者自動ack

【rabbitmq】單獨配置某一個消費者手動ack,其他消費者自動ack

前言:博主才疏學淺,此方案僅供參考,如有更優方案請大佬評論區告知,十分感謝✿✿ヽ(°▽°)ノ✿

問題背景:同一個服務中存在多個不同業務的rabbitmq的消費者,其中一個推送業務的消費者需要加死信佇列作為推送失敗補償機制,並且需要根據推送成功與否來判斷該訊息是否進死信佇列,這就需要手動ACK控制,但由於專案配置檔案中配置了retry,所以預設是全域性自動ack。如果只是在該推送消費者中寫手動ack,其他消費者不做改動,會導致其他消費者因沒有ack而訊息堵塞。

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=2
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms

 

 

 

 

處理方案:

不同消費者使用不同配置 SimpleRabbitListenerContainerFactory

@Configuration
public class ConsumerConfig {
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactoryManual(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
}


/**
* 需要手動ack的佇列消費
*/
@RabbitListener(containerFactory = "rabbitListenerContainerFactoryManual",queues ="send.subscribe",concurrency="${spring.rabbitmq.listener.custom.concurrency}")
public void handleSendSubscribe(Channel channel, Message message, SendSubscribeMsg msg)  throws IOException {
  //推送
boolean sendResult = this.sendHandler.prepareSend(msg);
  //根據推送結果判斷是否進死信佇列
if (!sendResult) {
log.warn("send to dead letter .");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}else{
log.info("success send");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}
}


/**
*需要自動ack的佇列消費
*/
  @RabbitListener(containerFactory = "rabbitListenerContainerFactory",queues ="sys.property.post",concurrency="${spring.rabbitmq.listener.custom.concurrency}")
  public void handleDevicePropsPost(DevicePropertiesPostMsg attr) {
//業務邏輯處理
    ......

}