程式人生 > rabbitmq訊息傳送確認和消費訊息手動刪除訊息

rabbitmq訊息傳送確認和消費訊息手動刪除訊息

0.application.properties新增如下配置

# Callback when a message is sent to the exchange
spring.rabbitmq.publisher-confirms=true
# Callback only when delivery to a queue fails
spring.rabbitmq.publisher-returns=true
# Consumed messages are removed manually (manual acknowledge mode)
spring.rabbitmq.listener.simple.acknowledge-mode=manual

1.自定義RabbitmqConfig

/**
 * Declares the queue, the direct exchange and the binding between them,
 * and exposes their names as constants for producers and consumers.
 */
@Configuration
public class RabbitmqConfig {
    /** Name of the queue declared by {@link #queue()}. */
    public static final String QUEUE = "queue";
    /** Name of the exchange declared by {@link #directExchange()}. */
    public static final String DIRECT_EXCHANGE = "direct_exchange";
    /** Routing key used by {@link #binding(Queue, DirectExchange)}. */
    public static final String ROUTING_KEY = "routing";

    /**
     * @return a queue named {@link #QUEUE}
     */
    @Bean
    public Queue queue() {
        Queue declaredQueue = new Queue(QUEUE);
        return declaredQueue;
    }

    /**
     * @return a direct exchange named {@link #DIRECT_EXCHANGE}
     */
    @Bean
    public DirectExchange directExchange() {
        DirectExchange declaredExchange = new DirectExchange(DIRECT_EXCHANGE);
        return declaredExchange;
    }

    /**
     * Binds the given queue to the given direct exchange under {@link #ROUTING_KEY}.
     *
     * @param queue          the queue to bind
     * @param directExchange the exchange the queue is bound to
     * @return the resulting binding
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange directExchange) {
        Binding routingBinding = BindingBuilder
                .bind(queue)
                .to(directExchange)
                .with(ROUTING_KEY);
        return routingBinding;
    }
}

2.自定義生產者

/**
 * Publishes {@code User} messages to {@link RabbitmqConfig#DIRECT_EXCHANGE} and logs the
 * confirm and return callbacks it registers on the injected {@link RabbitTemplate}.
 */
@Service
public class ProducerService implements ReturnCallback, ConfirmCallback {
    private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Registers this service as both the confirm and the return callback of the template. */
    @PostConstruct
    private void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Replaces the template used for publishing.
     * The callbacks registered in {@link #init()} are not re-registered on the new template.
     *
     * @param rabbitTemplate the template to publish with
     */
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends {@code user} to {@link RabbitmqConfig#DIRECT_EXCHANGE} with the given routing key.
     * A freshly generated correlation id is attached to the publish so that the
     * {@link #confirm(CorrelationData, boolean, String)} callback can be matched to this send.
     *
     * @param routingKey routing key to publish with
     * @param user       payload to send
     */
    public void sendMessage(String routingKey, User user) {
        // Without CorrelationData the confirm callback receives null and cannot identify the message.
        CorrelationData correlationData = new CorrelationData(java.util.UUID.randomUUID().toString());
        logger.warn("[Send Message] ===============> {}", user);
        logger.warn("[correlationId]  {}", correlationData.getId());
        rabbitTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, routingKey, user, correlationData);
    }

    /**
     * Confirm callback: invoked whether or not the message reached the exchange.
     *
     * @param correlationData correlation data attached at publish time
     * @param ack             the confirm result
     * @param cause           failure reason, as supplied by the template
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.warn("[唯一標識]  {}", correlationData);
        logger.warn("[結果]  {}", ack);
        logger.warn("[失敗原因]  {}", cause);
    }

    /**
     * Return callback: invoked when the message did not reach a queue.
     * Original author's note: for ordinary send failures the return callback alone is enough.
     * NOTE(review): a publish to a non-existent exchange is presumably reported through
     * {@link #confirm(CorrelationData, boolean, String)} rather than here — confirm before
     * relying on this callback alone.
     *
     * @param message    the returned message
     * @param replyCode  reply code supplied with the return
     * @param replyText  reply description supplied with the return
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.warn("[主體]  {}", message);
        logger.warn("[replyCode]  {}", replyCode);
        logger.warn("[描述]  {}", replyText);
        logger.warn("[exchange]  {}", exchange);
        logger.warn("[routingKey]  {}", routingKey);
    }
}

3.自定義消費者

/**
 * Consumes {@code User} messages from {@link RabbitmqConfig#QUEUE} and acknowledges
 * each one manually on the delivery channel.
 */
@Component
public class ConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    /**
     * Handles one delivery: logs the payload, then acknowledges the delivery tag.
     *
     * @param user    the converted payload
     * @param message the raw message, used for its delivery tag
     * @param channel the channel the acknowledgement is sent on
     * @throws RuntimeException if handling or acknowledging fails; the original exception is the cause
     */
    @RabbitListener(queues = RabbitmqConfig.QUEUE)
    public void message(User user, Message message, Channel channel) {
        try {
            // Handle the message first so that a failure never follows an already-sent ack.
            logger.warn("[Consumer Message 01] ===============> {}", user);
            // Acknowledge this single delivery (multiple = false).
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // Chain the cause so the real failure is not lost.
            throw new RuntimeException("處理訊息失敗", e);
        }
    }
}

4.測試

/**
 * Sends a user with the routing key {@code RabbitmqConfig.ROUTING_KEY + "01"}, which differs
 * from the key the binding in {@code RabbitmqConfig} is declared with.
 */
@Test
public void testToExchange() {
    final String unboundRoutingKey = RabbitmqConfig.ROUTING_KEY + "01";

    User tom = new User();
    tom.setId(123L);
    tom.setUsername("tom");

    producerService.sendMessage(unboundRoutingKey, tom);
}

/**
 * Sends a user with {@code RabbitmqConfig.ROUTING_KEY}, the same key the binding in
 * {@code RabbitmqConfig} is declared with.
 */
@Test
public void testToQueue() {
    final String boundRoutingKey = RabbitmqConfig.ROUTING_KEY;

    User jack = new User();
    jack.setId(456L);
    jack.setUsername("jack");

    producerService.sendMessage(boundRoutingKey, jack);
}

原始碼:https://gitee.com/jsjack_wang/springboot-demo (dev-rabbitmq002 分支)