1. 程式人生 > 實用技巧 >RabbitMQ的訊息確認ACK機制

RabbitMQ的訊息確認ACK機制

RabbitMQ的訊息確認ACK機制

1、什麼是訊息確認ACK。

  答:如果在處理訊息的過程中,消費者的伺服器在處理訊息的時候出現異常,那麼可能這條正在處理的訊息就沒有完成訊息消費,資料就會丟失。為了確保資料不會丟失,RabbitMQ支援訊息確定-ACK。

2、ACK的訊息確認機制。

  答:ACK機制是消費者從RabbitMQ收到訊息並處理完成後,反饋給RabbitMQ,RabbitMQ收到反饋後才將此訊息從佇列中刪除。

    如果一個消費者在處理訊息出現了網路不穩定、伺服器異常等現象,那麼就不會有ACK反饋,RabbitMQ會認為這個訊息沒有正常消費,會將訊息重新放入佇列中。
    如果在叢集的情況下,RabbitMQ會立即將這個訊息推送給這個線上的其他消費者。這種機制保證了在消費者服務端故障的時候,不丟失任何訊息和任務。

    訊息永遠不會從RabbitMQ中刪除,只有當消費者正確傳送ACK反饋,RabbitMQ確認收到後,訊息才會從RabbitMQ伺服器的資料中刪除。
    訊息的ACK確認機制預設是開啟的。

3、ACK機制的開發注意事項。

  答:如果忘記了ACK,那麼後果很嚴重。當Consumer退出時候,Message會一直重新分發。然後RabbitMQ會佔用越來越多的內容,由於RabbitMQ會長時間執行,因此這個"記憶體洩漏"是致命的。

4、結合專案例項進行,理解一下ACK機制。之前寫過RabbitMQ的交換器Exchange之direct(釋出與訂閱 完全匹配),這裡藉助這個進行訊息持久化測試。生產者的程式碼不發生改變。控制層的觸發生產者生產訊息,這裡只生產一條訊息。方便觀察現象。

複製程式碼

 1 package com.example.bie.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.example.bie.provider.RabbitMqLogErrorProduce;
 9 import com.example.bie.provider.RabbitMqLogInfoProduce;
10 
11 /**
12  * 
13  * @author biehl
14  *
15  */
16 @Controller
17 public class RabbitmqController {
18 
19     @Autowired
20     private RabbitMqLogInfoProduce rabbitMqLogInfoProduce;
21 
22     @Autowired
23     private RabbitMqLogErrorProduce rabbitMqLogErrorProduce;
24 
25     @RequestMapping(value = "/logInfo")
26     @ResponseBody
27     public String rabbitmqSendLogInfoMessage() {
28         String msg = "生產者===>生者的LogInfo訊息message: ";
29         for (int i = 0; i < 1; i++) {
30             rabbitMqLogInfoProduce.producer(msg + i);
31         }
32         return "生產===>  LogInfo訊息message  ===> success!!!";
33     }
34 
35     @RequestMapping(value = "/logError")
36     @ResponseBody
37     public String rabbitmqSendLogErrorMessage() {
38         String msg = "生產者===>生者的LogError訊息message: ";
39         for (int i = 0; i < 1; i++) {
40             rabbitMqLogErrorProduce.producer(msg + i);
41         }
42         return "生產===>  LogError訊息message  ===> success!!!";
43     }
44 
45 }

複製程式碼

消費者消費訊息,列印輸出後面手動丟擲執行時異常,觀察現象。

複製程式碼

 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         訊息接收者
16  * 
17  *         1、@RabbitListener bindings:繫結佇列
18  * 
19  *         2、@QueueBinding
20  *         value:繫結佇列的名稱、exchange:配置交換器、key:路由鍵routing-key繫結佇列和交換器
21  * 
22  *         3、@Queue value:配置佇列名稱、autoDelete:是否是一個可刪除的臨時佇列
23  * 
24  *         4、@Exchange value:為交換器起個名稱、type:指定具體的交換器型別
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
34 
35         key = "${rabbitmq.config.queue.error.routing.key}"))
36 public class LogErrorConsumer {
37 
38     /**
39      * 接收訊息的方法,採用訊息佇列監聽機制.
40      * 
41      * @RabbitHandler意思是將註解@RabbitListener配置到類上面
42      * 
43      * @RabbitHandler是指定這個方法可以進行訊息的接收並且消費.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 列印訊息
50         System.out.println("ERROR消費者===>消費<===訊息message: " + msg);
51         throw new RuntimeException();
52     }
53 
54 }

複製程式碼

觀察現象,如下所示:

在RabbitMQ的瀏覽器介面,可以看到一條訊息未被進行ACK的訊息確認機制,這條訊息被鎖定Unacked,所以一直在控制檯進行報錯。

控制檯效果如下所示,一直進行訊息的傳送,因為消費方一直沒有返回ACK確認,RabbitMQ認為訊息未進行正常的消費,會將訊息再次放入到佇列中,再次讓你消費,但是還是沒有返回ACK確認,依次迴圈,形成了死迴圈。

如何解決問題呢,如果訊息傳送的時候,程式出現異常,後果很嚴重的,會導致記憶體洩漏的,所以在程式處理中可以進行異常捕獲,保證消費者的程式正常執行,這裡不進行介紹了。第二種方式可以使用RabbitMQ的ack確認機制。開啟重試,然後重試次數,預設為3次。這裡設定為5次。

複製程式碼

 1 # 給當前專案起名稱.
 2 spring.application.name=rabbitmq-ack-direct-consumer
 3 
 4 # 配置埠號
 5 server.port=8080
 6 
 7 # 配置rabbitmq的引數.
 8 # rabbitmq伺服器的ip地址.
 9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的埠號5672,區別於瀏覽器訪問介面的15672埠號.
11 spring.rabbitmq.port=5672
12 # rabbitmq的賬號.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密碼.
15 spring.rabbitmq.password=guest
16 
17 # 設定交換器的名稱,方便修改.
18 # 路由鍵是將交換器和佇列進行繫結的,佇列通過路由鍵繫結到交換器.
19 rabbitmq.config.exchange=log.exchange.direct
20 
21 # info級別的佇列名稱.
22 rabbitmq.config.queue.info=log.info.queue
23 # info的路由鍵.
24 rabbitmq.config.queue.info.routing.key=log.info.routing.key
25 
26 # error級別的佇列名稱.
27 rabbitmq.config.queue.error=log.error.queue
28 # error的路由鍵.
29 rabbitmq.config.queue.error.routing.key=log.error.routing.key
30 
31 # 開啟重試
32 spring.rabbitmq.listener.simple.retry.enabled=true
33 # 重試次數,預設為3次
34 spring.rabbitmq.listener.simple.retry.max-attempts=5

複製程式碼

效果如下所示:

可以看到控制檯嘗試了5次以後就不再進行重試了。

RabbitMQ的介面可以看到,開始的效果和上面的一致,但是5次嘗試以後,就變成了0條。RabbitMQ將這條訊息丟棄了。

作者:別先生

部落格園:https://www.cnblogs.com/biehongli/

如果您想及時得到個人撰寫文章以及著作的訊息推送,可以掃描上方二維碼,關注個人公眾號哦。