1. 程式人生 > >RabbitMQ(四)RabbitMQ死信郵箱(DLX)

RabbitMQ(四)RabbitMQ死信郵箱(DLX)

DLX,Dead-Letter-Exchange(死信郵箱)

利用DLX,當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange,這個Exchange就是DLX。訊息變成死信一向有以下幾種情況:

  • 訊息被拒絕(basic.reject or basic.nack)並且requeue=false
  • 訊息TTL過期
  • 佇列達到最大長度

DLX也是一下正常的Exchange同一般的Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性,當這個佇列中有死信時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange中去,進而被路由到另一個佇列,publish可以監聽這個佇列中訊息做相應的處理,

這個特性可以彌補RabbitMQ 3.0.0以前支援的immediate引數中的向publish確認的功能。

JAVA的SDK設定

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

你也可以為這個DLX指定routing key,如果沒有特殊指定,則使用原佇列的routing key 
args.put("x-dead-letter-routing-key", "some-routing-key");

還可以使用policy來配置:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

“死信”被republish時,會在訊息的header中增加一個叫做“x-death"的陣列內容,包含了以下欄位內容:

  • queue - the name of the queue the message was in before it was dead-lettered,
  • reason - see below,
  • time - the date and time the message was dead lettered as a 64-bit AMQP format timestamp,
  • exchange - the exchange the message was published to (note that this will be a dead letter exchange if the message is dead lettered multiple times), and
  • routing-keys - the routing keys (including CC keys but excluding BCC ones) the message was published with.
  • original-expiration (if the message was dead-letterered due to per-message TTL) - the originalexpiration property of the message. The expiration property is removed from the message on dead-lettering in order to prevent it from expiring again in any queues it is routed to.

The reason is a name describing why the message was dead-lettered and is one of the following:

  • rejected - the message was rejected with requeue=false,
  • expired - the TTL of the message expired; or
  • maxlen - the maximum allowed queue length was exceeded.

在使用immediate引數時,如果消費者拒收訊息這個應答是沒有辦法通知到publish的,但是利用DLX,拒收的訊息會變成“死信”,這個死信會被republish到DLX路由的佇列中去,分析這個新的資訊的header中的x-death內容,可以判斷出訊息變成死信的原因,這樣publish端可以做更靈活的處理。

TTL和DLX結合使用的一種場景

假如我需要一個這樣的應用場景,consumer可以無條件退出,退出後我希望它監聽的佇列也一併刪除掉,但是如果佇列如果有訊息,要求訊息能被轉發到DLX佇列。

如果用queue的AUTO_DELETE功能,則consumer退出後,不管佇列中是否有訊息,佇列都會被刪除,這個方法不行。

我們可以這樣的變通實現:

1、將佇列的"x-expires"引數為300S,即佇列在沒有消費者空閒300S後將被自動刪除 )

2、將佇列的"x-message-ttl"引數為30S,即訊息只存活30S

3、在生產者publish訊息時,設定 immediate屬性,即要求一publish時一定有一個消費者存在

這樣子生產者不停的將訊息成功放進了佇列,由於publish時判斷了immediate屬性,也就是說publish訊息時,消費者是一定存在的,假如由於消費者處理的不及時導致訊息有所堆積,在30S內還沒有處理的話,這些訊息將會被republish到DLX路由的佇列中儲存,即使在這時消費者異常退出了,也沒關係,因為佇列至少也將在300S後才自動刪除,有足夠長的時間讓所有消費都republish至DLX佇列

這個應用場景是我自己亂想的,純粹是為了學習RabbitMQ而想的,實際應用中可以有其它代替方案來實現,就當學習了。