1. 程式人生 > 實用技巧 >Springboot整合Rabbitmq實現延時佇列之rabbitmq_delayed_message_exchange外掛方式

Springboot整合Rabbitmq實現延時佇列之rabbitmq_delayed_message_exchange外掛方式

很多時候我們想定時去做某件事情的時候我們會首先想到定時任務,quartz是個不錯的選擇,但是也有缺點,假如配置在專案中,叢集部署會有重複執行的問題,如果持久化在mysql中,解決了叢集的問題,但是過於依賴mysql,耦合嚴重,當然還有日誌量龐大、執行時間精度、過於耗費系統資源等等問題。所以這時候使用訊息佇列中介軟體的的延時佇列就是一個很好得解決方案,我們設定要觸發消費的時間和必要的引數入隊mq,到時監聽queue的消費者自然拿到訊息然後去走業務流程,這裡介紹的是基於rabbitmq中介軟體實現的TTL版的延時佇列。

什麼是TTL?
先簡單介紹下rabbitmq執行的流程,和Spring boot整合ActiveMQ不太一樣,除了佇列(queue)之外還引入了交換機(exchange)的概念。
rabbitmq的交換機有4種模式,我不詳細介紹,簡單說下大體執行流程:

①:生產者將訊息(msg)和路由鍵(routekey)傳送指定的交換機(exchange)上
②:交換機(exchange)根據路由鍵(routekey)找到繫結自己的佇列(queue)並把訊息給它
③:佇列(queue)再把訊息傳送給監聽它的消費者(customer)
那麼延時佇列TTL又是什麼呢?這裡引入了一個死信(死亡資訊)的概念,有死信必定有死亡時間,也就是我們希望延時多久的時間:

①:生產者將訊息(msg)和路由鍵(routekey)傳送指定的死信交換機(delayexchange)上
②:死信交換機(delayexchange)根據路由鍵(routekey1)找到繫結自己的死信佇列(delayqueue)並把訊息給它
③:訊息(msg)到期死亡變成死信轉發給死信接收交換機(delayexchange)
④:死信接收交換機(receiveexchange)根據路由鍵(routekey2)找到繫結自己的死信接收佇列(receivequeue)並把訊息給它
⑤:死信接收佇列(receivequeue)再把訊息傳送給監聽它的消費者(customer)
ps:延時佇列也叫死信佇列。基於TTL模式的延時佇列會涉及到2個交換機、2個路由鍵、2個佇列…emmmmm比較麻煩
但是基於TTL的延時佇列存在一個問題,就是同一個佇列裡的訊息延時時間最好一致,比如說佇列裡的延時時間都是1小時,千萬不能佇列裡的訊息延時時間亂七八糟多久的都有,這樣的話先入隊的訊息如果延時時間過長會堵著後入隊延時時間小的訊息,導致後面的訊息到時也無法變成死信轉發出去,很坑!!!
舉個栗子:延時佇列裡先後進入A,B,C三條訊息,存活時間是3h,2h,1h,結果到了1小時C不會死,到了2hB不會死,到了3小時A死了,同時B,C也死了,意味著3h後A,B,C才能消費,很坑!!!
我本來使用時候以為會像redis的存活時間一樣,內部維護一個定時器去掃描死亡時間然後變成死信轉發,結果不是。。。
至於怎麼解決這個問題,一個佇列裡可以放不同死亡時間的訊息,還能夠非同步死亡轉發,請看下面:
TTL方式實現rabbitmq的延時佇列功能,在訊息死亡時間比較靈活複雜的時候我們不可能宣告很多死信佇列去管理,而且宣告一個就要幾個個bean,很蛋疼,所以希望能夠有種方式使其訊息死亡非同步化,到期即死即消費,不會被阻塞,這裡介紹使用外掛的方式,不過需要rabbitmq要是3.6版本以上,也就是說,加入你的rabbitmq版本太老只能用TTL。
基於外掛方式實現流程:
這裡和TTL方式有個很大的不同就是TTL存放訊息在死信佇列(delayqueue)裡,二基於外掛存放訊息在延時交換機裡(x-delayed-message exchange)。 ①:生產者將訊息(msg)和路由鍵(routekey)傳送指定的延時交換機(exchange)上
②:延時交換機(exchange)儲存訊息等待訊息到期根據路由鍵(routekey)找到繫結自己的佇列(queue)並把訊息給它
③:佇列(queue)再把訊息傳送給監聽它的消費者(customer) 外掛可以自行去官網下載:

下載的外掛放到rabbitmq的plugins裡,執行命令安裝外掛:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

流程介紹完了,看下具體程式碼吧!

1.首先pom依賴:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置檔案配置rabbitmq的資訊

# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 手動ACK 不開啟自動ACK模式,目的是防止報錯後未正確處理訊息丟失 預設 為 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3.編寫rabbitmq配置類,宣告幾個bean

/**
 * rabbitmq配置類
 * 員工系統配置延時佇列
 * @author 47
 * @date 2020/1/7
 */
@Configuration
public class RabbitUserConfig {

    /**
     * 延時佇列交換機
     * 注意這裡的交換機型別:CustomExchange 
     * @return
     */
    @Bean
    public CustomExchange delayExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delay_exchange","x-delayed-message",true, false,args);
    }

    /**
     * 延時佇列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        return new Queue("delay_queue",true);
    }

    /**
     * 給延時佇列繫結交換機
     * @return
     */
    @Bean
    public Binding cfgDelayBinding(Queue cfgDelayQueue,CustomExchange cfgUserDelayExchange){
        return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with("delay_key").noargs();
    }
}

4.編寫rabbitmq生產者:

/**
 * rabbitMq生產者類
 * @author 47
 * @date 2020/1/17
 */
@Component
@Slf4j
public class RabbitProduct{

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendDelayMessage(List<Integer> list) {
    	 //這裡的訊息可以是任意物件,無需額外配置,直接傳即可
         log.info("===============延時佇列生產訊息====================");
         log.info("傳送時間:{},傳送內容:{}", LocalDateTime.now(), list.toString());
         this.rabbitTemplate.convertAndSend(
                 "delay_exchange",
                 "delay_key",
                 list,
                 message -> {
                 	 //注意這裡時間可以使long,而且是設定header
                     message.getMessageProperties().setHeader("x-delay",60000);
                     return message;
                 }
         );
     	 log.info("{}ms後執行", 60000);
    }

5.編寫rabbitmq消費者:

/**
 * activeMq消費者類
 * @author 47
 * @date 2020/1/7
 */
@Component
@Slf4j
public class RabbitConsumer {
    @Autowired
    private CcqCustomerCfgService ccqCustomerCfgService;

    /**
     * 預設情況下,如果沒有配置手動ACK, 那麼Spring Data AMQP 會在訊息消費完畢後自動幫我們去ACK
     * 存在問題:如果報錯了,訊息不會丟失,但是會無限迴圈消費,一直報錯,如果開啟了錯誤日誌很容易就吧磁碟空間耗完
     * 解決方案:手動ACK,或者try-catch 然後在 catch 裡面將錯誤的訊息轉移到其它的系列中去
     * spring.rabbitmq.listener.simple.acknowledge-mode = manual
     * @param list 監聽的內容
     */
    @RabbitListener(queues = "delay_queue")
    public void cfgUserReceiveDealy(List<Integer> list, Message message, Channel channel) throws IOException {
        log.info("===============接收佇列接收訊息====================");
        log.info("接收時間:{},接受內容:{}", LocalDateTime.now(), list.toString());
        //通知 MQ 訊息已被接收,可以ACK(從佇列中刪除)了
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        try {
            dosomething.....
        } catch (Exception e) {
            log.error("============消費失敗,嘗試訊息補發再次消費!==============");
            log.error(e.getMessage());
            /**
             * basicRecover方法是進行補發操作,
             * 其中的引數如果為true是把訊息退回到queue但是有可能被其它的consumer(叢集)接收到,
             * 設定為false是隻補發給當前的consumer
             */
            channel.basicRecover(false);
        }
    }
}

6.編寫測試類:

/**
 * @author 47
 * @date 2020/1/7
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RabbitProduct rabbitProduct;
    
    @GetMapping("/sendMessage")
    public void sendMessage(){
    	List<Integer> list = new ArrayList<>();
    	list.add(1);
    	list.add(2);
        rabbitProduct.sendDelayMessage(list);
    }
}