1. 程式人生 > >RabbitMQ死信機制實現延遲佇列

RabbitMQ死信機制實現延遲佇列

延遲佇列

延遲佇列儲存的物件肯定是對應的延時訊息,所謂”延時訊息”是指當訊息被髮送以後,並不想讓消費者立即拿到訊息,而是等待指定時間後,消費者才拿到這個訊息進行消費。

應用場景

三方支付,掃碼支付呼叫上游的掃碼介面,當掃碼有效期過後去呼叫查詢介面查詢結果。實現方式:每當一筆掃碼支付請求後,立即將此訂單號放入延遲佇列中(RabbitMQ),佇列過期時間為二維碼有效期,此佇列沒有設定消費者,過了有效期後訊息會重新路由到指定的的佇列,有消費者去執行訂單查詢。

RabbitMQ本身沒有直接支援延遲佇列功能,但是可以通過以下特性模擬出延遲佇列的功能。 但是我們可以通過RabbitMQ的兩個特性來曲線實現延遲佇列:

Time To Live(TTL)   和   Dead Letter Exchanges(DLX)

Time To Live(TTL)

RabbitMQ可以針對Queue和Message設定 x-message-tt,來控制訊息的生存時間,如果超時,則訊息變為dead letter(死信)RabbitMQ針對佇列中的訊息過期時間有兩種方法可以設定。

A: 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。

<!-- 將訊息放入此佇列裡,此佇列設定過期時間,不製造消費者讓其過期,過期後變成死信,訊息會放入指定的新佇列裡,實現訊息的延遲消費 -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
           <value type="java.lang.Long">${qrcode.expire.icbc}</value>
        </entry>
        <!--訊息過期根據重新路由 -->
        <entry key="x-dead-letter-exchange" value="directExchange"/>
        <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
    </rabbit:queue-arguments>
</rabbit:queue>

B: 對訊息進行單獨設定,每條訊息TTL可以不同。

<!-- 將訊息放入此佇列裡,次佇列設定過期時間,不製造消費者讓其過期,過期後變成死信,訊息會放入指定的新佇列裡,實現訊息的延遲消費 -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
    <rabbit:queue-arguments>
        <!--訊息過期根據重新路由 -->
        <entry key="x-dead-letter-exchange" value="directExchange"/>
        <entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
    </rabbit:queue-arguments>
</rabbit:queue>
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
package com.emax.paycenter.mq.pruducer;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

public class ExpirationMessagePostProcessor implements MessagePostProcessor {
	private final Long ttl;

	public ExpirationMessagePostProcessor(Long ttl) {
		this.ttl = ttl;
	}

	@Override
	public Message postProcessMessage(Message message) throws AmqpException {
		message.getMessageProperties().setExpiration(ttl.toString());
		return message;
	}
}

如果同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead letter

 Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個引數,如果佇列內出現了dead letter,則按照這兩個引數重新路由。
x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange
x-dead-letter-routing-key:指定routing-key傳送
隊列出現dead letter的情況有:
訊息或者佇列的TTL過期
佇列達到最大長度
訊息被消費端拒絕(basic.reject or basic.nack)並且requeue=false
利用DLX,當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。這時候訊息就可以重新被消費。
 

注意一:ttl設定之後,下次修改時間,會報錯,這時候,需要先刪除該佇列,重啟專案。否則會報錯。

注意二:消費者中,拋異常了沒處理,會一直重複消費

注意三:下面的程式碼我模擬了1-10號訊息,訊息的內容裡面是1-10。過期的時間是10-1秒。這裡要注意,雖然10是第一個傳送,但是它過期的時間最長。

過了10s以後,消費者開始收到資料,但是它是一次性收到如下結果:10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一個收到的還是10。10是第一個放進佇列,但是它的過期時間最長。所以由此可見,即使一個訊息比在同一佇列中的其他訊息提前過期,提前過期的也不會優先進入死信佇列,它們還是按照入庫的順序讓消費者消費。如果第一進去的訊息過期時間是1小時,那麼死信佇列的消費者也許等1小時才能收到第一個訊息。參考官方文件發現“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有當過期的訊息到了佇列的頂端(隊首),才會被真正的丟棄或者進入死信佇列。


所以在考慮使用RabbitMQ來實現延遲任務佇列的時候,需要確保業務上每個任務的延遲時間是一致的。如果遇到不同的任務型別需要不同的延時的話,需要為每一種不同延遲時間的訊息建立單獨的訊息佇列。(也可以考慮快取佇列,DelayQueue實現定時延遲執行任務,但是也有缺點:就是專案重啟快取裡的資料就會丟失,DelayQueue的使用詳見其他博文)

for(int i = 10; i>0; i-- ){
	amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
}