RabbitMQ高階特性-死信佇列
阿新 • • 發佈:2020-06-24
死信
什麼是死信
訊息沒有任何消費者去消費就變為死信
訊息變為死信有以下幾種情況
-
訊息被拒絕(basic.reject/basic.nack),並且requeue=false
-
訊息TTL過期
-
佇列達到最大長度
死信佇列 Dead Letter Exchange(DLX)
DLX
利用DLX,當訊息在一個佇列中變成死信之後,它能被重新publish到另外一個exchange,這個exchange就是DLX。
DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何佇列上被指定,實際就是設定某個佇列的屬性。當佇列中有死信時,RabbitMQ會自動將死信訊息傳送到設定的DLX,進而被路由到另外一個佇列,可以監聽這個佇列,做後續處理。
死信佇列設定
- 申明死信佇列的Exchange和queue,然後進行繫結
- 申明正常佇列Exchange和queue繫結,只不過要在佇列上加引數
arguments.put(x-dead-letter-exchange","you dlx");
程式碼實現
producer
package com.wyg.rabbitmq.javaclient.dlx;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消費者手工ack和nack
*
* @author [email protected]
* @date 2019-11-22 13:25
* @since JDK1.8
* @version V1.0
*/
public class Producer {
private static final String HOST = "localhost";
private static final int PORT = 5672 ;
private static final String USERNAME = "guset";
private static final String PASSWORD = "guset";
public static void main(String[] args) throws IOException,TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setVirtualHost("/");
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.abc";
// 申明exchange
channel.exchangeDeclare(exchangeName,"topic");
String msg = "正常訊息1,routingKey:" + routingKey;
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build();
channel.basicPublish(exchangeName,routingKey,false,null,msg.getBytes("UTF-8"));
// 該訊息無消費者消費
String msg2 = "過期死信訊息2,routingKey:" + routingKey;
channel.basicPublish(exchangeName,props,msg2.getBytes("UTF-8"));
String msg3 = "過期死信訊息3,routingKey:" + routingKey;
channel.basicPublish(exchangeName,msg3.getBytes("UTF-8"));
String msg4 = "過期死信訊息4,routingKey:" + routingKey;
channel.basicPublish(exchangeName,msg4.getBytes("UTF-8"));
channel.close();
connection.close();
}
}
複製程式碼
producer可以採用訊息過期產生死信
正常consumer
package com.wyg.rabbitmq.javaclient.dlx;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
* 消費者手工ack和nack
*
* @author [email protected]
* @date 2019-11-22 14:07
* @since JDK1.8
* @version V1.0
*/
public class Consumer {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guset";
private static final String PASSWORD = "guset";
public static void main(String[] args) throws IOException,TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setVirtualHost("/");
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定義死信隊的Exchange
String dlxExchange = "dlx.exchange";
channel.exchangeDeclare(dlxExchange,"topic");
// 死信佇列名
String dlxQueue = "dlx.queue";
channel.queueDeclare(dlxQueue,true,null);
// # 表示所有的key都可以路由到s死信佇列
String dlxRoutingKey = "#";
// 繫結死信佇列和exchange
channel.queueBind(dlxQueue,dlxExchange,dlxRoutingKey,null);
// 定義正常的消費者j監聽佇列
String queueName = "test_dlx_queue";
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
// 申明exchange
channel.exchangeDeclare(exchangeName,"topic");
// 申明佇列
Map<String,Object> arguments = new HashMap<>();
// 設定死信佇列,arguments要設定到申明的佇列上
arguments.put("x-dead-letter-exchange",dlxExchange);
channel.queueDeclare(queueName,arguments);
// 佇列繫結到exchange
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicQos(1);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag,Delivery message) throws IOException {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("---消費者-- " + new String(message.getBody(),"UTF-8"));
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("---消費者--:cancelCallback ");
}
};
// 消費訊息,autoAck一定要設為false,手工ack
channel.basicConsume(queueName,deliverCallback,cancelCallback);
}
}
複製程式碼
執行結果:只消費一條正常訊息,其他過期的未消費
DLXConusmer,監聽消費死信佇列中的訊息
package com.wyg.rabbitmq.javaclient.dlx;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
* 監聽私信佇列
*
* @author [email protected]
* @date 2019-11-22 21:52
* @since JDK1.8
* @version V1.0
*/
public class DLXConusmer {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guset";
private static final String PASSWORD = "guset";
public static void main(String[] args) throws IOException,TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setVirtualHost("/");
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "dlx.queue";
String exchangeName = "dlx.exchange";
String routingKey = "#";
// 申明exchange
channel.exchangeDeclare(exchangeName,"topic");
// 申明佇列
channel.queueDeclare(queueName,null);
// 佇列繫結到exchange
channel.queueBind(queueName,null);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag,Delivery message) throws IOException {
try {
System.out.println("---死信佇列消費者---");
System.out.println(new String(message.getBody(),"UTF-8"));
} finally {
// consumer手動 ack 給broker
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("---消費者:cancelCallback");
}
};
// 消費訊息,autoAck一定要設定為false
channel.basicConsume(queueName,cancelCallback);
}
}
複製程式碼
執行結果:3條過期的訊息進入死信佇列,並被消費