RabbitMQ 訊息中介軟體如何保證消費者customer能夠成功處理訊息?
阿新 • • 發佈:2019-05-02
一、確保消費者customer處理訊息成功
預設情況下消費者C1接收到訊息1無論是否正常接受和處理都會立即應答rabbit伺服器,然後訊息1就會從佇列中被刪除,假如C1突然出現異常狀況導致訊息1沒有被處理完畢,那麼訊息1就處理失敗了,也不會有其他消費者去處理訊息1。事實上我們希望的是訊息1如果沒有被C1正確處理完畢,那麼就傳送給其他消費者處理,為了達到這個目的,只需要做兩件事情,第一關閉rabbitMq的自動應答機制,第二消費者正確處理完訊息後手動應答。
RabbitMQ應答機制:
- 自動確認,預設是自動確認,即獲取訊息後,直接確認。
- 手動確認,給當前訊息設定狀態,當手動ack後服務端才會刪除該訊息,如果返回nack,重新入隊。
customer在監聽佇列接收訊息的時候,申明取消自動應答,手動返回完成。
channel.basicConsume(QUEUE_NAME, false, consumer);
在完成消費操作時,返回確認狀態。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- delivery.getEnvelope().getDeliveryTag(): 訊息id
- true: 這裡true或者false都代表已經應答
二、存在兩個大問題
問題: 1)重複消費 2) 訊息堆積
1. 訊息重發導致訊息重複消費的問題(訊息中介軟體冪等性)
如果在消費方customer在完成消費了之後,由於網路問題沒有及時應答,就會存在大量訊息堆積在MQ伺服器。
由於RabbitMQ有訊息重新發送的機制,如果沒有及時迴應那麼就會繼續重發,重發就會導致訊息重複消費。
(1)在生產者producer產生訊息的時候可以給訊息一個唯一的id。
(2)在執行完畢之後,利用redis快取訊息id,判斷時候消費過。
單個消費者:
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String value = redis.get("key_"+envelope.getDeliveryTag()); if (value != null){ //之前已經執行過了,所以直接應答 channel.basicAck(deliveryTag, true); return ; } String exchange = envelope.getExchange();//交換 long deliveryTag = envelope.getDeliveryTag();//訊息id String routingKey = envelope.getRoutingKey();//路由key String message = new String(body, "utf-8"); System.out.println(message); //先在redis中放入訊息id,記得加上過期時間 // 返回確認狀態 }
多個消費者: 分散式鎖解決
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try{
//在賦值鎖的時候可以加上過期時間
boolean flag = redisTemplate.opsForValue().setIfAbsent("key_"+envelope.getDeliveryTag(),envelope.getDeliveryTag());
//如果沒有被賦值則返回true
if(flag){
String exchange = envelope.getExchange();//交換
long deliveryTag = envelope.getDeliveryTag();//訊息id
String routingKey = envelope.getRoutingKey();//路由key
String message = new String(body, "utf-8");
System.out.println(message);
// 返回確認狀態
}
}catch(Exception e){
//列印日誌
//刪除redis中的記錄
//直接return;不應答,等待再次重新發送。
}
}
2. 訊息堆積解決
1)加大rabbitMQ的記憶體空間