1. 程式人生 > 其它 >SpringAMQP Work Queue訊息傳送和接收

SpringAMQP Work Queue訊息傳送和接收

Work queues,也被稱為(Task queues),任務模型。簡單來說就是讓多個消費者繫結到一個佇列,共同消費佇列中的訊息

當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大於訊息的消費速度。長此以往,訊息就會堆積越來越多,無法及時處理。

此時就可以使用work 模型,多個消費者共同處理訊息處理,速度就能大大提高了。

1.訊息傳送

這次我們迴圈傳送,模擬大量訊息堆積現象。

在publisher服務中的SpringAmqpTest類中新增一個測試方法:

/**
     * workQueue
     * 向佇列中不停傳送訊息,模擬訊息堆積。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 佇列名稱
    String queueName = "simple.queue";
    // 訊息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 傳送訊息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

  

2.訊息接收

要模擬多個消費者繫結同一個佇列,我們在consumer服務的SpringRabbitListener中新增2個新的方法:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消費者1接收到訊息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消費者2........接收到訊息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

  

3.測試

啟動ConsumerApplication後,在執行publisher服務中剛剛編寫的傳送測試方法testWorkQueue。

可以看到消費者1很快完成了自己的25條訊息。消費者2卻在緩慢的處理自己的25條訊息。

也就是說訊息是平均分配給每個消費者,並沒有考慮到消費者的處理能力。這樣顯然是有問題的。

4.能者多勞

在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml檔案,新增配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能獲取一條訊息,處理完成才能獲取下一個訊息

  

5.總結

Work模型的使用:

  • 多個消費者繫結到一個佇列,同一條訊息只會被一個消費者處理

  • 通過設定prefetch來控制消費者預取的訊息數量