HM-SpringCloud微服務系列12.3【惰性佇列】
阿新 • • 發佈:2022-05-10
1. 訊息堆積問題
當生產者傳送訊息的速度超過了消費者處理訊息的速度,就會導致佇列中的訊息堆積,直到佇列儲存訊息達到上限。之後傳送的訊息就會成為死信,可能會被丟棄,這就是訊息堆積問題。
解決訊息堆積有3種思路:
-
從消費者角度
- 增加更多消費者,提高消費速度。也就是我們之前說的work queue模式
- 在消費者內開啟執行緒池加快訊息處理速度
-
從佇列角度
- 擴大佇列容積,提高堆積上限
要提升佇列容積,把訊息儲存在記憶體中顯然是不行的。
2. 惰性佇列
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的概念,也就是惰性佇列。
惰性佇列的特徵如下:
- 接收到訊息後直接存入磁碟而非記憶體
- 消費者要消費訊息時才會從磁碟中讀取並載入到記憶體
- 支援數百萬條的訊息儲存
2.1 基於命令列設定lazy-queue
而要設定一個佇列為惰性佇列,只需要在宣告佇列時,指定x-queue-mode屬性為lazy即可。
可以通過命令列將一個執行中的佇列修改為惰性佇列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
-
rabbitmqctl
:RabbitMQ的命令列工具 -
set_policy
:新增一個策略 -
Lazy
:策略名稱,可以自定義 -
"^lazy-queue$"
-
'{"queue-mode":"lazy"}'
:設定佇列模式為lazy模式 -
--apply-to queues
:策略的作用物件,是所有的佇列
2.2 基於SpringAMQP宣告lazy-queue
2.2.1 基於@Bean的方式
2.2.2 基於@RabbitListener註解的方式
2.2.3 測試
在consumer服務的config包下新建LazyConfig配置類
package cn.itcast.mq.config; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class LazyConfig { @Bean public Queue lazyQueue() { return QueueBuilder .durable("lazy.queue") .lazy() .build(); } @Bean public Queue normalQueue() { return QueueBuilder .durable("normal.queue") .build(); } }
啟動consumer服務,檢視mq管理平臺
在publisher服務的SpringAmqpTest測試類下新添測試方法testLazyQueue()和testNormalQueue()
@Test
public void testLazyQueue() throws InterruptedException {
for (int i=0; i<100000; i++) { //累計傳送10W條訊息
Message message = MessageBuilder
.withBody("hello lazy".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 非持久化
.build();
rabbitTemplate.convertAndSend("lazy.queue", message);
}
}
@Test
public void testNormalQueue() throws InterruptedException {
for (int i=0; i<100000; i++) { //累計傳送10W條訊息
Message message = MessageBuilder
.withBody("hello nromal".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) // 非持久化
.build();
rabbitTemplate.convertAndSend("normal.queue", message);
}
}
先執行testLazyQueue(),再執行testNormalQueue()
執行中,及時檢視mq管理平臺中兩個queue的訊息傳送狀態
3. 總結
訊息堆積問題的解決方案?
- 佇列上繫結多個消費者,提高消費速度
- 給消費者開啟執行緒池,提高消費速度
- 使用惰性佇列,可以再mq中儲存更多訊息
惰性佇列的優點有哪些?
- 基於磁碟儲存,訊息上限高
- 沒有間歇性的page-out,效能比較穩定
惰性佇列的缺點有哪些?
- 基於磁碟儲存,訊息時效性會降低
- 效能受限於磁碟的IO