穀粒商城高階篇—RabbitMQ
延時佇列(實現定時任務)
未付款訂單,超時自動取消並釋放佔有。
常用解決方案:定時任務輪詢
缺點:消耗記憶體,增加資料庫壓力,時間誤差大
解決:RabbitMQ 訊息TTL和死信Exchange結合
-
訊息TTL:訊息存活時間,RabbitMQ可以對佇列和訊息分別設定TTL,同時設定取小的。
-
下列條件,訊息會進入死信路由
- 訊息被Consumer拒收,並reject方法的引數requeue是false。
- 訊息TTL到了,訊息過期
- 佇列的長度限制滿了,前面的訊息會被丟棄或扔到死信路由
-
Dead letter Exchange其實是一種普通的exchange,和建立其他exchange沒有兩樣,只是
控制訊息在一段時間變成死信,控制死信的訊息被路由到某個指定交換機,實驗延時佇列。
延時佇列實現-1
延時佇列實現-2
推薦方式一給佇列設定過期時間,因為RabbitMQ是惰性檢查。
SpringBoot整合RabbitMQ
-
引入依賴
<!--RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
自動配置了 RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
#配置RabbitMQ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/
測試 amqpAdmin、rabbitTemplate
@Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; /* * * 如何建立Exchange、Queue、Binding * 如何收發訊息 */ @Test public void createExchange() { //Exchange amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange",true,false)); log.info("Exchange[{}]建立成功","hello-java-exchange"); } @Test public void createQueue(){ amqpAdmin.declareQueue(new Queue("hello-java-queue",true,false,false)); log.info("Queue[{}]建立成功","hello-java-queue"); } @Test public void createBinding(){ amqpAdmin.declareBinding(new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null)); log.info("Binding[{}]建立成功","hello-java-binding"); } @Test public void sendMessageTest(){ OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity(); reasonEntity.setId(1L); reasonEntity.setCreateTime(new Date()); reasonEntity.setName("哈哈"); // 如果傳送訊息是個物件,會使用序列化機制,將物件寫出去,物件必須實現Serializable String msg = "hello world!"; rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity); log.info("訊息傳送完成{}",reasonEntity); }
-
@EnableRabbit
-
監聽訊息 @RabbitListener
@RabbitListener(queues = {"hello-java-queue"}) public void recieverMessage(Message message, OrderReturnReasonEntity content, Channel channel){ //訊息體 byte[] body = message.getBody(); //訊息頭 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("接收到訊息。。。內容:"+message+"==>型別:"+message.getClass()); }
- @RabbitListener 類+方法上
- @RabbitHandler 標在方法上(過載區分不同的訊息)
Queue可以很多人監聽,同一個訊息只能一個客戶端收到
只要一個訊息完全處理完,方法執行結束,可以接收下一個訊息
RabbitMQ訊息確認機制-可靠抵達
保證可靠抵達,可以使用事務訊息,效能下降250倍,為此引入確認機制。
-
publisher confirmCallBack 確認模式
#開啟發送端確認 #spring.rabbitmq.publisher-confirms=true 已被棄用 # NONE CORRELATED SIMPLE spring.rabbitmq.publisher-confirm-type=correlated
//定製RabbitTemplate @PostConstruct //MyRabbitConfig物件建立完成以後,執行這個方法 public void initRabbitTemplate(){ //設定確認回撥 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){ /* * * @param correlationData 當前訊息的唯一關聯資料 * @param ack 訊息是否成功收到 * @param cause 失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]"); } }); }
-
publischer returnCallBack 未投遞到 queue退回模式
#開啟發送端訊息抵達確認 spring.rabbitmq.publisher-returns=true #只要抵達佇列,以非同步方式優先回調我們這個returnconfirm spring.rabbitmq.template.mandatory=true
//rabbitTemplate.setReturnsCallback();已被棄用 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println(returnedMessage); } });
-
consumer ack機制(保證每個訊息被正確消費,此時broker刪除這個訊息)
1、預設自動確認,只要訊息接收到,客戶端自動確認,服務端就會移除這個訊息
問題:收到多個訊息,自動回覆ack,只有一個訊息處理成功,宕機,訊息丟失
解決:消費者手動確認
#手動ack訊息 spring.rabbitmq.listener.simple.acknowledge-mode=manual
2、如何簽收?
@RabbitHandler public void recieverMessage(Message message, OrderReturnReasonEntity content, Channel channel){ //...訊息處理完成 //channel內按順序自增的 long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 簽收,非批量模式 try{ if(deliveryTag%2 == 0){ //簽收貨物 channel.basicAck(deliveryTag,false); } else { //退貨 channel.basicNack(deliveryTag,false,false); //channel.basicReject(); 同上,只是不可批量 //沒有簽收貨物 } } catch (Exception e) { //網路中斷 } }
延時佇列關閉訂單模擬
-
建立Exchange Queue Binding
@Configuration public class MyMQConfig { //死信佇列 @Bean public Queue orderDelayQueue(){ /* Queue(String name, 佇列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自動刪除 Map<String, Object> arguments) 屬性 */ HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release.order"); arguments.put("x-message-ttl", 60000); // 訊息過期時間 1分鐘 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } //普通佇列 @Bean public Queue orderReleaseOrderQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } //TopicExchange @Bean public Exchange orderEventExchange(){ /* * String name, * boolean durable, * boolean autoDelete, * Map<String, Object> arguments * */ return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateOrderBinding(){ /* * String destination, 目的地(佇列名或者交換機名字) * DestinationType destinationType, 目的地型別(Queue、Exhcange) * String exchange, * String routingKey, * Map<String, Object> arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null); } @Bean public Binding orderReleaseOrderBinding(){ return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } }
建立訂單
@GetMapping("/test/createOrder") public String creatOrderTest(){ OrderEntity entity = new OrderEntity(); entity.setOrderSn(UUID.randomUUID().toString()); entity.setModifyTime(new Date()); rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity); return "ok"; }
消費者,接收過期訂單
@RabbitListener(queues = "order.release.order.queue") public void listener(OrderEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到過期的訂單資訊:準備關閉訂單"+entity.getOrderSn()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
如何保證訊息可靠性
丟失
-
網路問題,訊息沒到伺服器
- try-catch
- 日誌記錄
- 定期傳送
-
訊息抵達broker,未持久化宕機
publisher也加入確認回撥機制
-
自動ACK狀態,消費者收到沒來及處理宕機
手動ACK
重複
-
消費成功,事務提交,ack時機器宕機,Broker訊息重新變為ready
-
消費失敗,重試機制 允許
業務邏輯設計冪等性
防重表
訊息redelivered屬性
積壓
-
消費者宕機
-
消費能力不足
-
傳送流量太大
上線更多消費者
上線專門的佇列消費服務,批量取出訊息,記錄資料庫,離線慢慢處理