rabbitmq訊息傳送確認和消費訊息手動刪除訊息
阿新 • • 發佈:2018-11-06
0.application.properties新增如下配置
# 訊息傳送至exchange callback
spring.rabbitmq.publisher-confirms=true
# 訊息傳送至queue 失敗才callback
spring.rabbitmq.publisher-returns=true
# 消費訊息手動刪除
spring.rabbitmq.listener.simple.acknowledge-mode=manual
1.自定義RabbitmqConfig
@Configuration public class RabbitmqConfig { public static final String QUEUE = "queue"; public static final String DIRECT_EXCHANGE = "direct_exchange"; public static final String ROUTING_KEY = "routing"; @Bean public Queue queue() { return new Queue(QUEUE); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } @Bean public Binding binding(Queue queue, DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY); } }
2.自定義生產者
@Service public class ProducerService implements ReturnCallback, ConfirmCallback { private static final Logger logger = LoggerFactory.getLogger(ProducerService.class); @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct private void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(String routingKey, User user) { logger.warn("[Send Message] ===============> " + user.toString()); rabbitTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, routingKey, user); } /** * 訊息是否到交換機中都有callback */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.warn("[唯一標識] " + correlationData); logger.warn("[結果] " + ack); logger.warn("[失敗原因] " + cause); } /** * 訊息沒有到佇列會呼叫該回調 (一般訊息傳送失敗, 使用ReturnCallback就足夠啦) */ @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.warn("[主體] " + message); logger.warn("[replyCode] " + replyCode); logger.warn("[描述] " + replyText); logger.warn("[exchange] " + exchange); logger.warn("[routingKey] " + routingKey); } }
3.自定義消費者
@Component public class ConsumerService { private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class); @RabbitListener(queues = RabbitmqConfig.QUEUE) public void message(User user, Message message, Channel channel) { try { // 訊息刪除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.warn("[Consumer Message 01] ===============> " + user.toString()); } catch (Exception e) { throw new RuntimeException("處理訊息失敗"); } } }
4.測試
@Test
public void testToExchange() {
User user = new User();
user.setId(123L);
user.setUsername("tom");
producerService.sendMessage(RabbitmqConfig.ROUTING_KEY + "01", user);
}
@Test
public void testToQueue() {
User user = new User();
user.setId(456L);
user.setUsername("jack");
producerService.sendMessage(RabbitmqConfig.ROUTING_KEY, user);
}
原始碼 https://gitee.com/jsjack_wang/springboot-demo dev-rabbitmq002分支