SpringAMQP Work Queue訊息傳送和接收
阿新 • • 發佈:2021-11-02
當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大於訊息的消費速度。長此以往,訊息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,多個消費者共同處理訊息處理,速度就能大大提高了。
1.訊息傳送
這次我們迴圈傳送,模擬大量訊息堆積現象。
在publisher服務中的SpringAmqpTest類中新增一個測試方法:
/** * workQueue * 向佇列中不停傳送訊息,模擬訊息堆積。 */ @Test public void testWorkQueue() throws InterruptedException { // 佇列名稱 String queueName = "simple.queue"; // 訊息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 傳送訊息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
2.訊息接收
要模擬多個消費者繫結同一個佇列,我們在consumer服務的SpringRabbitListener中新增2個新的方法:
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消費者1接收到訊息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消費者2........接收到訊息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
3.測試
啟動ConsumerApplication後,在執行publisher服務中剛剛編寫的傳送測試方法testWorkQueue。
可以看到消費者1很快完成了自己的25條訊息。消費者2卻在緩慢的處理自己的25條訊息。
也就是說訊息是平均分配給每個消費者,並沒有考慮到消費者的處理能力。這樣顯然是有問題的。
4.能者多勞
在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml檔案,新增配置:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能獲取一條訊息,處理完成才能獲取下一個訊息
5.總結
Work模型的使用:
-
多個消費者繫結到一個佇列,同一條訊息只會被一個消費者處理
-
通過設定prefetch來控制消費者預取的訊息數量