1. 程式人生 > 其它 >HM-SpringCloud微服務系列12.3【惰性佇列】

HM-SpringCloud微服務系列12.3【惰性佇列】

1. 訊息堆積問題

當生產者傳送訊息的速度超過了消費者處理訊息的速度,就會導致佇列中的訊息堆積,直到佇列儲存訊息達到上限。之後傳送的訊息就會成為死信,可能會被丟棄,這就是訊息堆積問題。

解決訊息堆積有3種思路:

  • 從消費者角度

    • 增加更多消費者,提高消費速度。也就是我們之前說的work queue模式
    • 在消費者內開啟執行緒池加快訊息處理速度
  • 從佇列角度

    • 擴大佇列容積,提高堆積上限

要提升佇列容積,把訊息儲存在記憶體中顯然是不行的。

2. 惰性佇列

從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性佇列。

惰性佇列的特徵如下:

  • 接收到訊息後直接存入磁碟而非記憶體
  • 消費者要消費訊息時才會從磁碟中讀取並載入到記憶體
  • 支援數百萬條的訊息儲存

2.1 基於命令列設定lazy-queue

而要設定一個佇列為惰性佇列,只需要在宣告佇列時,指定x-queue-mode屬性為lazy即可。

可以通過命令列將一個執行中的佇列修改為惰性佇列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令列工具
  • set_policy :新增一個策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$"
    :用正則表示式匹配佇列的名字
  • '{"queue-mode":"lazy"}' :設定佇列模式為lazy模式
  • --apply-to queues :策略的作用物件,是所有的佇列

2.2 基於SpringAMQP宣告lazy-queue

2.2.1 基於@Bean的方式

2.2.2 基於@RabbitListener註解的方式

2.2.3 測試

在consumer服務的config包下新建LazyConfig配置類

package cn.itcast.mq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LazyConfig {

    @Bean
    public Queue lazyQueue() {
        return QueueBuilder
                .durable("lazy.queue")
                .lazy()
                .build();
    }

    @Bean
    public Queue normalQueue() {
        return QueueBuilder
                .durable("normal.queue")
                .build();
    }

}

啟動consumer服務,檢視mq管理平臺

在publisher服務的SpringAmqpTest測試類下新添測試方法testLazyQueue()和testNormalQueue()

@Test
public void testLazyQueue() throws InterruptedException {
    for (int i=0; i<100000; i++) { //累計傳送10W條訊息
        Message message = MessageBuilder
            .withBody("hello lazy".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 非持久化
            .build();
        rabbitTemplate.convertAndSend("lazy.queue", message);
    }
}

@Test
public void testNormalQueue() throws InterruptedException {
    for (int i=0; i<100000; i++) { //累計傳送10W條訊息
        Message message = MessageBuilder
            .withBody("hello nromal".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 非持久化
            .build();
        rabbitTemplate.convertAndSend("normal.queue", message);
    }
}

先執行testLazyQueue(),再執行testNormalQueue()

執行中,及時檢視mq管理平臺中兩個queue的訊息傳送狀態

3. 總結

訊息堆積問題的解決方案?

  • 佇列上繫結多個消費者,提高消費速度
  • 給消費者開啟執行緒池,提高消費速度
  • 使用惰性佇列,可以再mq中儲存更多訊息

惰性佇列的優點有哪些?

  • 基於磁碟儲存,訊息上限高
  • 沒有間歇性的page-out,效能比較穩定

惰性佇列的缺點有哪些?

  • 基於磁碟儲存,訊息時效性會降低
  • 效能受限於磁碟的IO