rabbitmq簡單收發服務搭建
消息發送、接收簡單代碼示例
mq.xml //rabbitmq config spring.rabbitmq.host=ip:port spring.rabbitmq.username= spring.rabbitmq.password= spring.rabbitmq.virtual-host= //發送隊列 send.exchange.name= send.queue.name= //接收 listen.queue.name.system= @Configuration public class AmqpConfig { @Value("${spring.rabbitmq.host}") private String address; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(address); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); //必須要設置、消息發送確認 return connectionFactory; } /** * 常用spring為singleton單例模式,此處mq消息需將其改為非單例模式 */ @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必須是prototype類型 public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } //消息發送 @Component public class SystemMqMessageSender { private static final Logger logger = LoggerFactory.getLogger(SystemMqMessageSender.class); @Autowired private AmqpTemplate rabbitTemplate; @Value("${send.exchange.name}") private String exchangeSystem; @Value("${send.queue.name}") private String queueSystem; @Resource private RabbitAdmin rabbitAdmin; public void sendMessage(EventModel eventModel) { String message = JsonUtils.json(eventModel); logger.info("發送消息:{}", message); rabbitTemplate.convertAndSend(exchangeSystem, queueSystem, message); } //聲明持久化隊列,並綁定到exchange上 @Bean public Binding bindingExchangeSystem() { Queue queue = QueueBuilder.durable(queueSystem).build();//隊列持久化 rabbitAdmin.declareQueue(queue);//聲明隊列 DirectExchange exchange = (DirectExchange) ExchangeBuilder.directExchange(exchangeSystem).build(); rabbitAdmin.declareExchange(exchange);//創建路由 Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();//綁定路由 rabbitAdmin.declareBinding(binding); return binding; } } //消息接收 @Component @RabbitListener(queues = "${listen.queue.name.system}") public class SystemMessageListener extends BaseListener implements EventModelConsumer,InitializingBean { private static final Logger logger = LoggerFactory.getLogger(SystemMessageListener.class); @Value("${listen.queue.name.system}") private String queueName; @RabbitHandler public void process(String message) {//監聽消息 logger.info("接收到消息:{}", message); processMessage(message, queueName); } public void processMessage(String content, String queueName) { //業務處理 } }
rabbitmq如何保證高可用呢
答案是消息應答機制,一下是rabbitmq消息應答機制的原文:
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We‘ll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don‘t want to lose any tasks. If a worker dies, we‘d like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn‘t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren‘t any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It‘s fine even if processing a message takes a very, very long time.
Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the autoAck=true flag. It‘s time to set this flag to false and send a proper acknowledgment from the worker, once we‘re done with a task.
執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中掛掉了。一旦RabbitMQ將消息分發給了消費者,就會從內存中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的消息和分發給這個消費者但尚未處理的消息。
但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那麽我們應該將分發給它的任務交付給另一個消費者去處理。
為了確保消息不會丟失,RabbitMQ支持消息應答。消費者發送一個消息應答,告訴RabbitMQ這個消息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。
如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個消息沒有處理完全,然後交給另一個消費者去重新處理。這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何消息了。
沒有任何消息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞。即使處理一條消息會花費很長的時間。
消息應答是默認打開的。我們通過顯示的設置autoAsk=true關閉這種機制。現即自動應答開,一旦我們完成任務,消費者會自動發送應答。通知RabbitMQ消息已被處理,可以從內存刪除。如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同於ActiveMQ,在RabbitMQ裏,消息沒有過期的概念),則RabbitMQ會將消息重新發送給其他監聽在隊列的下一個消費者。
rabbitmq簡單收發服務搭建