Java搭建RabbitMq訊息中介軟體過程詳解
阿新 • • 發佈:2020-01-07
這篇文章主要介紹了Java搭建RabbitMq訊息中介軟體過程詳解,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
前言
當系統中出現“生產“和“消費“的速度或穩定性等因素不一致的時候,就需要訊息佇列。
名詞
- exchange: 交換機
- routingkey: 路由key
- queue:佇列
控制檯埠:15672
exchange和queue是需要繫結在一起的,然後訊息傳送到exchange再由exchange通過routingkey傳送到對應的佇列中。
使用場景
1.技能訂單3分鐘自動取消,改變狀態
2.直播開始前15分鐘提醒
3.直播狀態自動結束
流程
生產者傳送訊息 —> order_pre_exchange交換機 —> order_per_ttl_delay_queue佇列
—> 時間到期 —> order_delay_exchange交換機 —> order_delay_process_queue佇列 —> 消費者
第一步:在pom檔案中新增
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步:在application.properties檔案中新增
spring.rabbitmq.host=172.xx.xx.xxx spring.rabbitmq.port=5672 spring.rabbitmq.username=rabbit spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
第三步:配置 OrderQueueConfig
package com.tuohang.platform.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitMQ的佇列設定(生產者傳送的訊息,永遠是先進入exchange,再通過路由,轉發到佇列) * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */ @Configuration public class OrderQueueConfig { /** * 訂單緩衝交換機名稱 */ public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange"; /** * 傳送到該佇列的message會在一段時間後過期進入到order_delay_process_queue 【佇列裡所有的message都有統一的失效時間】 */ public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue"; /** * 訂單的交換機DLX 名字 */ final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange"; /** * 訂單message時間過期後進入的佇列,也就是訂單實際的消費佇列 */ public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue"; /** * 訂單在緩衝佇列過期時間(毫秒)30分鐘 */ public final static int ORDER_QUEUE_EXPIRATION = 1800000; /** * 訂單緩衝交換機 * * @return */ @Bean public DirectExchange preOrderExange() { return new DirectExchange(ORDER_PRE_EXCHANGE_NAME); } /** * 建立order_per_ttl_delay_queue佇列,訂單訊息經過緩衝交換機,會進入該佇列 * * @return */ @Bean public Queue delayQueuePerOrderTTLQueue() { return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME) .withArgument("x-dead-letter-exchange",ORDER_DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key",ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .withArgument("x-message-ttl",ORDER_QUEUE_EXPIRATION) // 設定訂單佇列的過期時間 .build(); } /** * 將order_pre_exchange繫結到order_pre_ttl_delay_queue佇列 * * @param delayQueuePerOrderTTLQueue * @param preOrderExange * @return */ @Bean public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue,DirectExchange preOrderExange) { return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME); } /** * 建立訂單的DLX exchange * * @return */ @Bean public DirectExchange delayOrderExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME); } /** * 建立order_delay_process_queue佇列,也就是訂單實際消費佇列 * * @return */ @Bean public Queue delayProcessOrderQueue() { return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build(); } /** * 將DLX繫結到實際消費佇列 * * @param delayProcessOrderQueue * @param delayExchange * @return */ @Bean public Binding dlxOrderBinding(Queue delayProcessOrderQueue,DirectExchange delayOrderExchange) { return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME); } /** * 監聽訂單實際消費者佇列order_delay_process_queue * * @param connectionFactory * @param processReceiver * @return */ @Bean public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,OrderProcessReceiver processReceiver) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 監聽order_delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container; } }
消費者 OrderProcessReceiver :
package com.tuohang.platform.config; import java.util.Objects; import org.apache.tools.ant.types.resources.selectors.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; /** * 訂單延遲處理消費者 * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */ @Component public class OrderProcessReceiver implements ChannelAwareMessageListener { private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @Override public void onMessage(Message message,Channel channel) throws Exception { try { processMessage(message); } catch (Exception e) { // 如果發生了異常,則將該訊息重定向到緩衝佇列,會在一定延遲之後自動重做 channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME,OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME,null,msg.getBytes()); } } /** * 處理訂單訊息,如果訂單未支付,取消訂單(如果當訊息內容為FAIL_MESSAGE的話,則需要丟擲異常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received <" + realMessage + ">"); // 取消訂單 if(!Objects.equals(realMessage,msg)) { // SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage)); System.out.println("測試111111-----------"+new Date()); System.out.println(message); } } }
或者
/** * 測試 rabbit 消費者 * * * @author Administrator * @version 1.0 * @Date 2018年9月25日 */ @Component @RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME) public class TestProcessReceiver { private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @RabbitHandler public void onMessage(Message message,Channel channel) throws Exception { try { processMessage(message); //告訴伺服器收到這條訊息 已經被我消費了 可以在佇列刪掉;否則訊息伺服器以為這條訊息沒處理掉 後續還會在發 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 如果發生了異常,則將該訊息重定向到緩衝佇列,會在一定延遲之後自動重做 channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME,TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME,msg.getBytes()); } } /** * 處理訂單訊息,如果訂單未支付,取消訂單(如果當訊息內容為FAIL_MESSAGE的話,則需要丟擲異常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received < " + realMessage + " >"); // 取消訂單 if(!Objects.equals(realMessage,msg)) { System.out.println("測試111111-----------"+new Date()); }else { System.out.println("rabbit else..."); } } }
生產者
/** * 測試rabbitmq * * @return */ @RequestMapping(value = "/testrab") public String testraa() { GenericResult gr = null; try { String name = "test_pre_ttl_delay_queue"; long expiration = 10000;//10s 過期時間 rabbitTemplate.convertAndSend(name,String.valueOf(123456)); // 在單個訊息上設定過期時間 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456),new ExpirationMessagePostProcessor(expiration)); } catch (ServiceException e) { e.printStackTrace(); gr = new GenericResult(StateCode.ERROR,languageMap.get("network_error"),e.getMessage()); } return getWrite(gr); }
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。