1. 程式人生 > 其它 >穀粒商城高階篇—RabbitMQ

穀粒商城高階篇—RabbitMQ

延時佇列(實現定時任務)

未付款訂單,超時自動取消並釋放佔有。

常用解決方案:定時任務輪詢

缺點:消耗記憶體,增加資料庫壓力,時間誤差大

解決:RabbitMQ 訊息TTL和死信Exchange結合

  • 訊息TTL:訊息存活時間,RabbitMQ可以對佇列和訊息分別設定TTL,同時設定取小的。

  • 下列條件,訊息會進入死信路由

    1. 訊息被Consumer拒收,並reject方法的引數requeue是false。
    2. 訊息TTL到了,訊息過期
    3. 佇列的長度限制滿了,前面的訊息會被丟棄或扔到死信路由
  • Dead letter Exchange其實是一種普通的exchange,和建立其他exchange沒有兩樣,只是

控制訊息在一段時間變成死信,控制死信的訊息被路由到某個指定交換機,實驗延時佇列。

延時佇列實現-1

延時佇列實現-2

推薦方式一給佇列設定過期時間,因為RabbitMQ是惰性檢查。

SpringBoot整合RabbitMQ

  1. 引入依賴

    <!--RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 自動配置了 RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate

    #配置RabbitMQ
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
    

    測試 amqpAdmin、rabbitTemplate

    @Autowired
    AmqpAdmin amqpAdmin;
    
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    /* *
    * 如何建立Exchange、Queue、Binding
    * 如何收發訊息
    */
    @Test
    public void createExchange() {
        //Exchange
        amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange",true,false));
        log.info("Exchange[{}]建立成功","hello-java-exchange");
    }
    @Test
    public void createQueue(){
        amqpAdmin.declareQueue(new Queue("hello-java-queue",true,false,false));
        log.info("Queue[{}]建立成功","hello-java-queue");
    }
    @Test
    public void createBinding(){
        amqpAdmin.declareBinding(new Binding("hello-java-queue",
                                             Binding.DestinationType.QUEUE,
                                             "hello-java-exchange",
                                             "hello.java",
                                             null));
        log.info("Binding[{}]建立成功","hello-java-binding");
    }
    @Test
    public void sendMessageTest(){
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("哈哈");
        // 如果傳送訊息是個物件,會使用序列化機制,將物件寫出去,物件必須實現Serializable
        String msg = "hello world!";
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
        log.info("訊息傳送完成{}",reasonEntity);
    }
    
  3. @EnableRabbit

  4. 監聽訊息 @RabbitListener

     @RabbitListener(queues = {"hello-java-queue"})
        public void recieverMessage(Message message,
                                    OrderReturnReasonEntity content,
                                    Channel channel){
            //訊息體
            byte[] body = message.getBody();
            //訊息頭
            MessageProperties messageProperties = message.getMessageProperties();
            System.out.println("接收到訊息。。。內容:"+message+"==>型別:"+message.getClass());
        }
    
    • @RabbitListener 類+方法上
    • @RabbitHandler 標在方法上(過載區分不同的訊息)

Queue可以很多人監聽,同一個訊息只能一個客戶端收到

只要一個訊息完全處理完,方法執行結束,可以接收下一個訊息

RabbitMQ訊息確認機制-可靠抵達

保證可靠抵達,可以使用事務訊息,效能下降250倍,為此引入確認機制。

  • publisher confirmCallBack 確認模式

    #開啟發送端確認
    #spring.rabbitmq.publisher-confirms=true 已被棄用
    # NONE CORRELATED SIMPLE
    spring.rabbitmq.publisher-confirm-type=correlated
    
    //定製RabbitTemplate
    @PostConstruct  //MyRabbitConfig物件建立完成以後,執行這個方法
    public void initRabbitTemplate(){
        //設定確認回撥
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
    
            /* *
                 * @param correlationData 當前訊息的唯一關聯資料
                 * @param ack 訊息是否成功收到
                 * @param cause 失敗的原因
                 */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...correlationData["+correlationData+"]==>ack["+ack+"]==>cause["+cause+"]");
            }
        });
    }
    
  • publischer returnCallBack 未投遞到 queue退回模式

    #開啟發送端訊息抵達確認
    spring.rabbitmq.publisher-returns=true
    #只要抵達佇列,以非同步方式優先回調我們這個returnconfirm
    spring.rabbitmq.template.mandatory=true
    
    //rabbitTemplate.setReturnsCallback();已被棄用
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println(returnedMessage);
        }
    });
    
  • consumer ack機制(保證每個訊息被正確消費,此時broker刪除這個訊息)

    1、預設自動確認,只要訊息接收到,客戶端自動確認,服務端就會移除這個訊息

    ​ 問題:收到多個訊息,自動回覆ack,只有一個訊息處理成功,宕機,訊息丟失

    ​ 解決:消費者手動確認

    #手動ack訊息
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    

    2、如何簽收?

    @RabbitHandler
    public void recieverMessage(Message message,
                                OrderReturnReasonEntity content,
                                Channel channel){
       	//...訊息處理完成
        //channel內按順序自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
       	// 簽收,非批量模式
        try{
            if(deliveryTag%2 == 0){
                //簽收貨物
                channel.basicAck(deliveryTag,false);
            } else {
                //退貨
                channel.basicNack(deliveryTag,false,false);
                //channel.basicReject(); 同上,只是不可批量
             //沒有簽收貨物   
            }
        } catch (Exception e) {
            //網路中斷
        }
    }
    

延時佇列關閉訂單模擬

  1. 建立Exchange Queue Binding

    @Configuration
    public class MyMQConfig {
    
        //死信佇列
        @Bean
        public Queue orderDelayQueue(){
            /*
                    Queue(String name,  佇列名字
                    boolean durable,  是否持久化
                    boolean exclusive,  是否排他
                    boolean autoDelete, 是否自動刪除
                    Map<String, Object> arguments) 屬性
                 */
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", "order-event-exchange");
            arguments.put("x-dead-letter-routing-key", "order.release.order");
            arguments.put("x-message-ttl", 60000); // 訊息過期時間 1分鐘
            Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
    
            return queue;
        }
    
        //普通佇列
        @Bean
        public Queue orderReleaseOrderQueue(){
            Queue queue = new Queue("order.release.order.queue", true, false, false);
    
            return queue;
        }
    
        //TopicExchange
        @Bean
        public Exchange orderEventExchange(){
            /*
                 *   String name,
                 *   boolean durable,
                 *   boolean autoDelete,
                 *   Map<String, Object> arguments
                 * */
            return new TopicExchange("order-event-exchange", true, false);
    
        }
    
        @Bean
        public Binding orderCreateOrderBinding(){
            /*
                 * String destination, 目的地(佇列名或者交換機名字)
                 * DestinationType destinationType, 目的地型別(Queue、Exhcange)
                 * String exchange,
                 * String routingKey,
                 * Map<String, Object> arguments
                 * */
            return new Binding("order.delay.queue",
                               Binding.DestinationType.QUEUE,
                               "order-event-exchange",
                               "order.create.order",
                               null);
    
        }
    
        @Bean
        public Binding orderReleaseOrderBinding(){
            return new Binding("order.release.order.queue",
                               Binding.DestinationType.QUEUE,
                               "order-event-exchange",
                               "order.release.order",
                               null);
        }
    
    }
    

    建立訂單

    @GetMapping("/test/createOrder")
    public String creatOrderTest(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
    
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
        return "ok";
    }
    

    消費者,接收過期訂單

    @RabbitListener(queues = "order.release.order.queue")
    public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到過期的訂單資訊:準備關閉訂單"+entity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    

如何保證訊息可靠性

丟失

  • 網路問題,訊息沒到伺服器

    1. try-catch
    2. 日誌記錄
    3. 定期傳送
  • 訊息抵達broker,未持久化宕機

    ​ publisher也加入確認回撥機制

  • 自動ACK狀態,消費者收到沒來及處理宕機

    ​ 手動ACK

重複

  • 消費成功,事務提交,ack時機器宕機,Broker訊息重新變為ready

  • 消費失敗,重試機制 允許

    業務邏輯設計冪等性

    防重表

    訊息redelivered屬性

積壓

  • 消費者宕機

  • 消費能力不足

  • 傳送流量太大

    上線更多消費者

    上線專門的佇列消費服務,批量取出訊息,記錄資料庫,離線慢慢處理