RabbitMQ死信機制實現延遲佇列
延遲佇列
延遲佇列儲存的物件肯定是對應的延時訊息,所謂”延時訊息”是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。
應用場景
三方支付,掃碼支付呼叫上游的掃碼介面,當掃碼有效期過後去呼叫查詢介面查詢結果。實現方式:每當一筆掃碼支付請求後,立即將此訂單號放入延遲佇列中(RabbitMQ),佇列過期時間為二維碼有效期,此佇列沒有設定消費者,過了有效期後訊息會重新路由到指定的的佇列,有消費者去執行訂單查詢。
RabbitMQ本身沒有直接支援延遲佇列功能,但是可以通過以下特性模擬出延遲佇列的功能。 但是我們可以通過RabbitMQ的兩個特性來曲線實現延遲佇列:
Time To Live(TTL)
RabbitMQ可以針對Queue和Message設定 x-message-tt,來控制訊息的生存時間,如果超時,則訊息變為dead letter(死信)RabbitMQ針對佇列中的訊息過期時間有兩種方法可以設定。
A: 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。
<!-- 將訊息放入此佇列裡,此佇列設定過期時間,不製造消費者讓其過期,過期後變成死信,訊息會放入指定的新佇列裡,實現訊息的延遲消費 --> <rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" > <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">${qrcode.expire.icbc}</value> </entry> <!--訊息過期根據重新路由 --> <entry key="x-dead-letter-exchange" value="directExchange"/> <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/> </rabbit:queue-arguments> </rabbit:queue>
B: 對訊息進行單獨設定,每條訊息TTL可以不同。
<!-- 將訊息放入此佇列裡,次佇列設定過期時間,不製造消費者讓其過期,過期後變成死信,訊息會放入指定的新佇列裡,實現訊息的延遲消費 --> <rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" > <rabbit:queue-arguments> <!--訊息過期根據重新路由 --> <entry key="x-dead-letter-exchange" value="directExchange"/> <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/> </rabbit:queue-arguments> </rabbit:queue>
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
package com.emax.paycenter.mq.pruducer;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
private final Long ttl;
public ExpirationMessagePostProcessor(Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(ttl.toString());
return message;
}
}
如果同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead letter
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由。
x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange
x-dead-letter-routing-key:指定routing-key傳送
隊列出現dead letter的情況有:
訊息或者佇列的TTL過期
佇列達到最大長度
訊息被消費端拒絕(basic.reject or basic.nack)並且requeue=false
利用DLX,當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。這時候訊息就可以重新被消費。
注意一:ttl設定之後,下次修改時間,會報錯,這時候,需要先刪除該佇列,重啟專案。否則會報錯。
注意二:消費者中,拋異常了沒處理,會一直重複消費
注意三:下面的程式碼我模擬了1-10號訊息,訊息的內容裡面是1-10。過期的時間是10-1秒。這裡要注意,雖然10是第一個傳送,但是它過期的時間最長。
過了10s以後,消費者開始收到資料,但是它是一次性收到如下結果:10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一個收到的還是10。10是第一個放進佇列,但是它的過期時間最長。所以由此可見,即使一個訊息比在同一佇列中的其他訊息提前過期,提前過期的也不會優先進入死信佇列,它們還是按照入庫的順序讓消費者消費。如果第一進去的訊息過期時間是1小時,那麼死信佇列的消費者也許等1小時才能收到第一個訊息。參考官方文件發現“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當過期的訊息到了佇列的頂端(隊首),才會被真正的丟棄或者進入死信佇列。
所以在考慮使用RabbitMQ來實現延遲任務佇列的時候,需要確保業務上每個任務的延遲時間是一致的。如果遇到不同的任務型別需要不同的延時的話,需要為每一種不同延遲時間的訊息建立單獨的訊息佇列。(也可以考慮快取佇列,DelayQueue實現定時延遲執行任務,但是也有缺點:就是專案重啟快取裡的資料就會丟失,DelayQueue的使用詳見其他博文)
for(int i = 10; i>0; i-- ){
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
}