mq.xml //rabbitmq config spring.rabbitmq.host=ip:port spring.rabbitmq.username= spring.rabbitmq.password= spring.rabbitmq.virtual-host= //發送隊列 send.exchange.name= send.queue.name= //接收 listen.queue.name.system= @Configuration public class AmqpConfig { @Value("${spring.rabbitmq.host}") private String address; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(address); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); //必須要設置、消息發送確認 return connectionFactory; } /** * 常用spring為singleton單例模式,此處mq消息需將其改為非單例模式 */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必須是prototype類型 public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } //消息發送 @Component public class SystemMqMessageSender { private static final Logger logger = LoggerFactory.getLogger(SystemMqMessageSender.class); @Autowired private AmqpTemplate rabbitTemplate; @Value("${send.exchange.name}") private String exchangeSystem; @Value("${send.queue.name}") private String queueSystem; @Resource private RabbitAdmin rabbitAdmin; public void sendMessage(EventModel eventModel) { String message = JsonUtils.json(eventModel); logger.info("發送消息:{}", message); rabbitTemplate.convertAndSend(exchangeSystem, queueSystem, message); } //聲明持久化隊列,並綁定到exchange上 @Bean public Binding bindingExchangeSystem() { Queue queue = QueueBuilder.durable(queueSystem).build();//隊列持久化 rabbitAdmin.declareQueue(queue);//聲明隊列 DirectExchange exchange = (DirectExchange) 
ExchangeBuilder.directExchange(exchangeSystem).build(); rabbitAdmin.declareExchange(exchange);//創建路由 Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();//綁定路由 rabbitAdmin.declareBinding(binding); return binding; } } //消息接收 @Component @RabbitListener(queues = "${listen.queue.name.system}") public class SystemMessageListener extends BaseListener implements EventModelConsumer,InitializingBean { private static final Logger logger = LoggerFactory.getLogger(SystemMessageListener.class); @Value("${listen.queue.name.system}") private String queueName; @RabbitHandler public void process(String message) {//監聽消息 logger.info("接收到消息:{}", message); processMessage(message, queueName); } public void processMessage(String content, String queueName) { //業務處理 } }
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the autoAck=true flag. It's time to set this flag to false and send a proper acknowledgment from the worker, once we're done with a task.