RabbitMQ的訊息確認ACK機制
RabbitMQ的訊息確認ACK機制
1、什麼是訊息確認ACK。
答:如果在處理訊息的過程中,消費者的伺服器在處理訊息的時候出現異常,那麼可能這條正在處理的訊息就沒有完成訊息消費,資料就會丟失。為了確保資料不會丟失,RabbitMQ支援訊息確定-ACK。
2、ACK的訊息確認機制。
答:ACK機制是消費者從RabbitMQ收到訊息並處理完成後,反饋給RabbitMQ,RabbitMQ收到反饋後才將此訊息從佇列中刪除。
如果一個消費者在處理訊息出現了網路不穩定、伺服器異常等現象,那麼就不會有ACK反饋,RabbitMQ會認為這個訊息沒有正常消費,會將訊息重新放入佇列中。
如果在叢集的情況下,RabbitMQ會立即將這個訊息推送給這個線上的其他消費者。這種機制保證了在消費者服務端故障的時候,不丟失任何訊息和任務。
訊息的ACK確認機制預設是開啟的。
3、ACK機制的開發注意事項。
答:如果忘記了ACK,那麼後果很嚴重。當Consumer退出時候,Message會一直重新分發。然後RabbitMQ會佔用越來越多的內容,由於RabbitMQ會長時間執行,因此這個"記憶體洩漏"是致命的。
4、結合專案例項進行,理解一下ACK機制。之前寫過RabbitMQ的交換器Exchange之direct(釋出與訂閱 完全匹配),這裡藉助這個進行訊息持久化測試。生產者的程式碼不發生改變。控制層的觸發生產者生產訊息,這裡只生產一條訊息。方便觀察現象。
1 package com.example.bie.controller; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 import org.springframework.web.bind.annotation.ResponseBody; 7 8 import com.example.bie.provider.RabbitMqLogErrorProduce; 9 import com.example.bie.provider.RabbitMqLogInfoProduce; 10 11 /** 12 * 13 * @author biehl 14 * 15 */ 16 @Controller 17 public class RabbitmqController { 18 19 @Autowired 20 private RabbitMqLogInfoProduce rabbitMqLogInfoProduce; 21 22 @Autowired 23 private RabbitMqLogErrorProduce rabbitMqLogErrorProduce; 24 25 @RequestMapping(value = "/logInfo") 26 @ResponseBody 27 public String rabbitmqSendLogInfoMessage() { 28 String msg = "生產者===>生者的LogInfo訊息message: "; 29 for (int i = 0; i < 1; i++) { 30 rabbitMqLogInfoProduce.producer(msg + i); 31 } 32 return "生產===> LogInfo訊息message ===> success!!!"; 33 } 34 35 @RequestMapping(value = "/logError") 36 @ResponseBody 37 public String rabbitmqSendLogErrorMessage() { 38 String msg = "生產者===>生者的LogError訊息message: "; 39 for (int i = 0; i < 1; i++) { 40 rabbitMqLogErrorProduce.producer(msg + i); 41 } 42 return "生產===> LogError訊息message ===> success!!!"; 43 } 44 45 }
消費者消費訊息,列印輸出後面手動丟擲執行時異常,觀察現象。
1 package com.example.bie.consumer; 2 3 import org.springframework.amqp.core.ExchangeTypes; 4 import org.springframework.amqp.rabbit.annotation.Exchange; 5 import org.springframework.amqp.rabbit.annotation.Queue; 6 import org.springframework.amqp.rabbit.annotation.QueueBinding; 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler; 8 import org.springframework.amqp.rabbit.annotation.RabbitListener; 9 import org.springframework.stereotype.Component; 10 11 /** 12 * 13 * @author biehl 14 * 15 * 訊息接收者 16 * 17 * 1、@RabbitListener bindings:繫結佇列 18 * 19 * 2、@QueueBinding 20 * value:繫結佇列的名稱、exchange:配置交換器、key:路由鍵routing-key繫結佇列和交換器 21 * 22 * 3、@Queue value:配置佇列名稱、autoDelete:是否是一個可刪除的臨時佇列 23 * 24 * 4、@Exchange value:為交換器起個名稱、type:指定具體的交換器型別 25 * 26 * 27 */ 28 @Component 29 @RabbitListener(bindings = @QueueBinding( 30 31 value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"), 32 33 exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT), 34 35 key = "${rabbitmq.config.queue.error.routing.key}")) 36 public class LogErrorConsumer { 37 38 /** 39 * 接收訊息的方法,採用訊息佇列監聽機制. 40 * 41 * @RabbitHandler意思是將註解@RabbitListener配置到類上面 42 * 43 * @RabbitHandler是指定這個方法可以進行訊息的接收並且消費. 44 * 45 * @param msg 46 */ 47 @RabbitHandler 48 public void consumer(String msg) { 49 // 列印訊息 50 System.out.println("ERROR消費者===>消費<===訊息message: " + msg); 51 throw new RuntimeException(); 52 } 53 54 }
觀察現象,如下所示:
在RabbitMQ的瀏覽器介面,可以看到一條訊息未被進行ACK的訊息確認機制,這條訊息被鎖定Unacked,所以一直在控制檯進行報錯。
控制檯效果如下所示,一直進行訊息的傳送,因為消費方一直沒有返回ACK確認,RabbitMQ認為訊息未進行正常的消費,會將訊息再次放入到佇列中,再次讓你消費,但是還是沒有返回ACK確認,依次迴圈,形成了死迴圈。
如何解決問題呢,如果訊息傳送的時候,程式出現異常,後果很嚴重的,會導致記憶體洩漏的,所以在程式處理中可以進行異常捕獲,保證消費者的程式正常執行,這裡不進行介紹了。第二種方式可以使用RabbitMQ的ack確認機制。開啟重試,然後重試次數,預設為3次。這裡設定為5次。
1 # 給當前專案起名稱. 2 spring.application.name=rabbitmq-ack-direct-consumer 3 4 # 配置埠號 5 server.port=8080 6 7 # 配置rabbitmq的引數. 8 # rabbitmq伺服器的ip地址. 9 spring.rabbitmq.host=192.168.110.133 10 # rabbitmq的埠號5672,區別於瀏覽器訪問介面的15672埠號. 11 spring.rabbitmq.port=5672 12 # rabbitmq的賬號. 13 spring.rabbitmq.username=guest 14 # rabbitmq的密碼. 15 spring.rabbitmq.password=guest 16 17 # 設定交換器的名稱,方便修改. 18 # 路由鍵是將交換器和佇列進行繫結的,佇列通過路由鍵繫結到交換器. 19 rabbitmq.config.exchange=log.exchange.direct 20 21 # info級別的佇列名稱. 22 rabbitmq.config.queue.info=log.info.queue 23 # info的路由鍵. 24 rabbitmq.config.queue.info.routing.key=log.info.routing.key 25 26 # error級別的佇列名稱. 27 rabbitmq.config.queue.error=log.error.queue 28 # error的路由鍵. 29 rabbitmq.config.queue.error.routing.key=log.error.routing.key 30 31 # 開啟重試 32 spring.rabbitmq.listener.simple.retry.enabled=true 33 # 重試次數,預設為3次 34 spring.rabbitmq.listener.simple.retry.max-attempts=5
效果如下所示:
可以看到控制檯嘗試了5次以後就不再進行重試了。
RabbitMQ的介面可以看到,開始的效果和上面的一致,但是5次嘗試以後,就變成了0條。RabbitMQ將這條訊息丟棄了。
作者:別先生
部落格園:https://www.cnblogs.com/biehongli/
如果您想及時得到個人撰寫文章以及著作的訊息推送,可以掃描上方二維碼,關注個人公眾號哦。