1. 程式人生 > 程式設計 >SpringBoot使用RabbitMQ延時佇列(小白必備)

SpringBoot使用RabbitMQ延時佇列(小白必備)

1.什麼是MQ

MQ,是一種跨程序的通訊機制,用於上下游傳遞訊息。

在網際網路架構中,MQ是一種非常常見的上下游“邏輯解耦+物理解耦”的訊息通訊服務。

使用了MQ之後,訊息傳送上游只需要依賴MQ,不用依賴其他服務。

為什麼會產生訊息列隊?

  1. 不同程序(process)之間傳遞訊息時,兩個程序之間耦合程度過高,改動一個程序,引發必須修改另一個程序,為了隔離這兩個程序,在兩程序間抽離出一層(一個模組),所有兩程序之間傳遞的訊息,都必須通過訊息佇列來傳遞,單獨修改某一個程序,不會影響另一個;
  2. 不同程序(process)之間傳遞訊息時,為了實現標準化,將訊息的格式規範化了,並且,某一個程序接受的訊息太多,一下子無法處理完,並且也有先後順序,必須對收到的訊息進行排隊,因此誕生了事實上的訊息佇列;

延時列隊的使用場景?

  1. 訂單業務:在淘寶或者京東購買東西,使用者下單後未付款則30分鐘後取消訂單。
  2. 簡訊通知:手機使用者交完話費後,幾分鐘之內將會收到繳費資訊

2.什麼是RabbitMQ(這裡就做了一下簡單介紹)

RabbitMQ是一種訊息佇列 ,用於常見的程序通訊。支援點對點,請求應答和釋出訂閱模式 並且提供多種語言的支援。常見的java,c#,php都支援。

常被用在非同步處理,應用解耦。流量消鋒等複雜的業務場景中。和java的kafka一樣都屬於訊息中介軟體。

下載地址:

https://www.rabbitmq.com/download.html

進入RabbitMQ官網

1.第一步

在這裡插入圖片描述

第二步

在這裡插入圖片描述

下載好後不要著急安裝RabbitMQ,我們這裡還需要安裝Erlang

下載地址:http://www.erlang.org/download/otp_win64_17.3.exe

安裝步驟

步驟一

在這裡插入圖片描述

步驟二

在這裡插入圖片描述

步驟三

在這裡插入圖片描述

步驟四

在這裡插入圖片描述

安裝完成

現在安裝RabbitMQ

步驟一

在這裡插入圖片描述

步驟二

在這裡插入圖片描述

步驟三

在這裡插入圖片描述

安裝完成

啟動RabbitMQ管理工具

開始選單 — 最新新增 — 展開 — 選中雙擊

在這裡插入圖片描述

輸入命令:rabbitmq-plugins enable rabbitmq_management

效果如果圖

在這裡插入圖片描述

在瀏覽器中輸入地址檢視:http://127.0.0.1:15672/

在這裡插入圖片描述

出現次頁面代表成功,預設使用者和密碼都是guest/ guest

若不出現此頁面,就是安裝失敗了,不要慌,多半問題在系統使用者名稱必須是中文(放心有解決辦法):

Windows下安裝RabbitMQ後,按正常RabbitMQ會自動註冊服務並自動啟動,但是如果有的道友不注意中英文目錄就會出現服務啟動後幾秒鐘自動停止,而且反反覆覆。

出現這種情況一般都是由我們的使用者名稱是中文,而導致預設的DB和log訪問出現問。所以我建議以後大家在使用windows作業系統的時候儘量用英文來命名檔案或目錄,這樣會極大的減小以後安裝軟體出現莫名其妙的問題的bug。

接下來我們先解除安裝我們的RabbitMQ,然後在我們的系統變數裡設定一個RABBITMQ_BASE 的變數路徑為一個不含英文的路徑 比如 E:\rabbit,最後我們重新安裝RabbitMQ即可,然後就會看到RabbitMQ服務自動註冊了,並且不會自動停止。

SpringBoot整合RabbitMQ

1.新增依賴

pom.xml中新增 spring-boot-starter-amqp的依賴

 <!-- spring-boot-starter-amqp的依賴 -->
      <dependency>
   <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

其他依賴

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
  
      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
      </dependency>
  
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
          <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
          </exclusion>
        </exclusions>
      </dependency>
      
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
      </dependency>

application.yml檔案中配置rabbitmq相關內容

spring:
  rabbitmq:
   host: localhost
   port: 5672
   username: guest
   password: guest

這裡我們環境就搭建起來了

2.具體編碼實現

配置列隊

 package com.example.spring_boot_rabbitmq;
  
  
  
  import lombok.extern.slf4j.Slf4j;
  import org.springframework.amqp.core.*;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  
  import java.util.HashMap;
  import java.util.Map;
  
  /**
   * @author:zq
   * @date: Greated in 2019/12/19 11:46
   * 配置佇列
   */
  
  @Configuration
  @Slf4j
  public class DelayRabbitConfig {
  
    /**
     * 延遲佇列 TTL 名稱
     */
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    /**
     * DLX,dead letter傳送到的 exchange
     * 延時訊息就是傳送到該交換機的
     */
    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    /**
     * routing key 名稱
     * 具體訊息傳送在該 routingKey 的
     */
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
  
    public static final String ORDER_QUEUE_NAME = "user.order.queue";
    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    public static final String ORDER_ROUTING_KEY = "order";
  
    /**
     * 延遲佇列配置
     * <p>
     * 1、params.put("x-message-ttl",5 * 1000);
     * 第一種方式是直接設定 Queue 延遲時間 但如果直接給佇列設定過期時間,這種做法不是很靈活,(當然二者是相容的,預設是時間小的優先)
     * 2、rabbitTemplate.convertAndSend(book,message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * 第二種就是每次傳送訊息動態設定延遲時間,這樣我們可以靈活控制
     **/
    @Bean
    public Queue delayOrderQueue() {
      Map<String,Object> params = new HashMap<>();
      // x-dead-letter-exchange 聲明瞭佇列裡的死信轉發到的DLX名稱,
      params.put("x-dead-letter-exchange",ORDER_EXCHANGE_NAME);
      // x-dead-letter-routing-key 聲明瞭這些死信在轉發時攜帶的 routing-key 名稱。
      params.put("x-dead-letter-routing-key",ORDER_ROUTING_KEY);
      return new Queue(ORDER_DELAY_QUEUE,true,false,params);
    }
    /**
     * 需要將一個佇列繫結到交換機上,要求該訊息與一個特定的路由鍵完全匹配。
     * 這是一個完整的匹配。如果一個佇列繫結到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的訊息才被轉發,
     * 不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderDelayExchange() {
      return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }
    @Bean
    public Binding dlxBinding() {
      return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }
  
    @Bean
    public Queue orderQueue() {
      return new Queue(ORDER_QUEUE_NAME,true);
    }
    /**
     * 將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。
     * 符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
     **/
    @Bean
    public TopicExchange orderTopicExchange() {
      return new TopicExchange(ORDER_EXCHANGE_NAME);
    }
  
    @Bean
    public Binding orderBinding() {
      // TODO 如果要讓延遲佇列之間有關聯,這裡的 routingKey 和 繫結的交換機很關鍵
      return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }
  
  }

建立一個Order實體類

package com.example.spring_boot_rabbitmq.pojo;
 
 import lombok.Data;
 
 import java.io.Serializable;
 
 /**
  * @author:zq
  * @date: Greated in 2019/12/19 11:49
  */
 @Data
 public class Order implements Serializable {
   private static final long serialVersionUID = -2221214252163879885L;
 
   private String orderId; // 訂單id
 
   private Integer orderStatus; // 訂單狀態 0:未支付,1:已支付,2:訂單已取消
 
   private String orderName; // 訂單名字
 
 }

接收者

package com.example.spring_boot_rabbitmq;
 
 import com.example.spring_boot_rabbitmq.pojo.Order;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
 
 /**
  * @author:zq
  * @date: Greated in 2019/12/19 11:53
  * 接收者
  */
 
 @Component
 @Slf4j
 public class DelayReceiver {
   @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
   public void orderDelayQueue(Order order,Message message,Channel channel) {
     log.info("###########################################");
     log.info("【orderDelayQueue 監聽的訊息】 - 【消費時間】 - [{}]- 【訂單內容】 - [{}]",new Date(),order.toString());
     if(order.getOrderStatus() == 0) {
       order.setOrderStatus(2);
       log.info("【該訂單未支付,取消訂單】" + order.toString());
     } else if(order.getOrderStatus() == 1) {
       log.info("【該訂單已完成支付】");
     } else if(order.getOrderStatus() == 2) {
       log.info("【該訂單已取消】");
     }
     log.info("###########################################");
   }
 
 }

傳送者

 package com.example.spring_boot_rabbitmq;
 
 
 import com.example.spring_boot_rabbitmq.pojo.Order;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
 
 /**
 * @author:zq
 * @date: Greated in 2019/12/19 11:55
 * 傳送者
 */
 @Component
 @Slf4j
 public class DelaySender {
   @Autowired
   private AmqpTemplate amqpTemplate;
 
   public void sendDelay(Order order) {
     log.info("【訂單生成時間】" + new Date().toString() +"【1分鐘後檢查訂單是否已經支付】" + order.toString() );
     this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE,DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY,order,message -> {
       // 如果配置了 params.put("x-message-ttl",5 * 1000); 那麼這一句也可以省略,具體根據業務需要是宣告 Queue 的時候就指定好延遲時間還是在傳送自己控制時間
       message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
       return message;
     });
   }
 
 }

測試,訪問http://localhost:8080/sendDelay檢視日誌輸出

package com.example.spring_boot_rabbitmq;

import com.example.spring_boot_rabbitmq.pojo.Order;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;


/**
 * @author:zq
 * @date: Greated in 2019/12/19 11:57
 * 測試
 */

@RestController
public class TestController {
  @Autowired
  private DelaySender delaySender;

  @GetMapping("/sendDelay")
  public Object sendDelay() {
    Order order1 = new Order();
    order1.setOrderStatus(0);
    order1.setOrderId("123456");
    order1.setOrderName("小米6");

    Order order2 = new Order();
    order2.setOrderStatus(1);
    order2.setOrderId("456789");
    order2.setOrderName("小米8");

    delaySender.sendDelay(order1);
    delaySender.sendDelay(order2);
    return "ok";
  }

}

輸出

在這裡插入圖片描述

到此已經SpringBoot使用RabbitMQ延時佇列已經完成,希望對你有所幫助,若有地方不理解或者有更好的辦法請留言,謝謝。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。