RabbitMQ 死信佇列是什麼鬼?
阿新 • • 發佈:2021-01-01
作者:海向
來源:https://www.cnblogs.com/haixiang/p/10905189.html
死信佇列
死信佇列:沒有被及時消費的訊息存放的佇列。
訊息沒有被及時消費的原因:
- a.訊息被拒絕(basic.reject/ basic.nack)並且不再重新投遞 requeue=false
- b.TTL(time-to-live) 訊息超時未消費
- c.達到最大佇列長度
實現死信佇列步驟
首先需要設定死信佇列的 exchange 和 queue,然後進行繫結:
Exchange: dlx.exchange Queue: dlx.queue RoutingKey: # 代表接收所有路由 key
然後我們進行正常宣告交換機、佇列、繫結,只不過我們需要在普通佇列加上一個引數即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )
這樣訊息在過期、requeue失敗、 佇列在達到最大長度時,訊息就可以直接路由到死信佇列!
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class DlxProducer { public static void main(String[] args) throws Exception { //設定連線以及建立 channel 湖綠 String exchangeName = "test_dlx_exchange"; String routingKey = "item.update"; String msg = "this is dlx msg"; //我們設定訊息過期時間,10秒後再消費 讓訊息進入死信佇列 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes()); System.out.println("Send message : " + msg); channel.close(); connection.close(); } } import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class DlxConsumer { public static void main(String[] args) throws Exception { //建立連線、建立channel忽略 內容可以在上面程式碼中獲取 String exchangeName = "test_dlx_exchange"; String queueName = "test_dlx_queue"; String routingKey = "item.#"; //必須設定引數到 arguments 中 Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "dlx.exchange"); channel.exchangeDeclare(exchangeName, "topic", true, false, null); //將 arguments 放入佇列的宣告中 channel.queueDeclare(queueName, true, false, false, arguments); //一般不用程式碼繫結,在管理介面手動繫結 channel.queueBind(queueName, exchangeName, routingKey); //宣告死信佇列 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); //路由鍵為 # 代表可以路由到所有訊息 channel.queueBind("dlx.queue", "dlx.exchange", "#"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6. 設定 Channel 消費者繫結佇列 channel.basicConsume(queueName, true, consumer); } }
總結
DLX也是一個正常的 Exchange,和一般的 Exchange 沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。
當這個佇列中有死信時,RabbitMQ 就會自動的將這個訊息重新發布到設定的 Exchange 上去,進而被路由到另一個佇列。可以監聽這個佇列中訊息做相應的處理。
近期熱文推薦:
1.Java 15 正式釋出, 14 個新特性,重新整理你的認知!!
2.終於靠開源專案弄到 IntelliJ IDEA 啟用碼了,真香!
3.我用 Java 8 寫了一段邏輯,同事直呼看不懂,你試試看。。
覺得不錯,別忘了隨手點贊+轉發哦!