Setting message expiration (TTL) and handling expired messages in RabbitMQ
阿新 • Published: 2018-11-30
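Use cases
- In e-commerce flash sales, a user places an order at one time and pays at another. Give each order an expiration time; once it has passed, payment is no longer allowed.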
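The flash-sale use case above might look roughly like this on the consumer side. This is a minimal sketch, assuming the expired message carries an order ID and that order state lives in an in-memory map. `OrderStatus`, `OrderExpiryHandler` and all method names are hypothetical and do not come from the original article; a real system would keep this state in a database.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical order states, for illustration only.
enum OrderStatus { UNPAID, PAID, CLOSED }

// Hypothetical handler that reacts to an "order expired" message, i.e. a
// message that was dead-lettered after its TTL elapsed.
final class OrderExpiryHandler {
    private final Map<String, OrderStatus> orders = new ConcurrentHashMap<>();

    void placeOrder(String orderId) {
        orders.put(orderId, OrderStatus.UNPAID);
    }

    // Payment succeeds only while the order is still UNPAID.
    boolean pay(String orderId) {
        return orders.replace(orderId, OrderStatus.UNPAID, OrderStatus.PAID);
    }

    // Called when the expired message arrives: close the order unless it was paid.
    void onExpired(String orderId) {
        orders.replace(orderId, OrderStatus.UNPAID, OrderStatus.CLOSED);
    }

    OrderStatus status(String orderId) {
        return orders.get(orderId);
    }
}
```

Using an atomic compare-and-replace for both transitions means a payment and an expiry racing on the same order cannot both win.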
Reference: https://blog.csdn.net/zhu_tianwei/article/details/53563311
Code
Producer
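import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;

public class Producer {
    private static String queue_name = "message_ttl_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.2");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        // This virtual host must already exist on the broker (the default is "/").
        factory.setVirtualHost("/guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the delay queue: messages that expire here are dead-lettered
        // to amq.direct with the routing key message_ttl_routingKey.
        HashMap<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "amq.direct");
        arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
        channel.queueDeclare("delay_queue", true, false, false, arguments);

        // Declare the queue that receives the expired messages.
        channel.queueDeclare(queue_name, true, false, false, null);
        // Bind it to amq.direct under the dead-letter routing key.
        channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");

        String message = "hello world!" + System.currentTimeMillis();
        // Set the per-message TTL (milliseconds, passed as a string).
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        // Delivery mode: non-persistent (1) or persistent (2).
        AMQP.BasicProperties properties = builder.expiration("5000").deliveryMode(2).build();
        // Publish through the default exchange with routing key delay_queue,
        // so the message lands in delay_queue and waits there until it expires.
        channel.basicPublish("", "delay_queue", properties, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());

        // Close the channel and the connection.
        channel.close();
        connection.close();
    }
}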
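The producer above sets the TTL per message through the `expiration` property. RabbitMQ also accepts a queue-wide TTL through the `x-message-ttl` queue argument (a non-negative number of milliseconds). The sketch below builds such an argument map; `DelayQueueArguments` and `build` are hypothetical names, not part of the RabbitMQ client, and the queue-wide TTL is an alternative to the approach used in this article rather than what the original code does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the RabbitMQ client library.
final class DelayQueueArguments {
    private DelayQueueArguments() {}

    // Builds queue arguments for a delay queue whose messages all expire
    // after ttlMillis and are then dead-lettered to the given exchange.
    static Map<String, Object> build(String deadLetterExchange,
                                     String deadLetterRoutingKey,
                                     int ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttlMillis must be >= 0");
        }
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        arguments.put("x-message-ttl", ttlMillis);
        return arguments;
    }
}
```

The resulting map would be passed as the last argument of `channel.queueDeclare("delay_queue", true, false, false, ...)`. One reason to consider it: with per-message TTLs, an expired message is only dead-lettered once it reaches the head of the queue, so a long-lived message can hold up shorter-lived ones queued behind it. Note that an existing queue cannot be redeclared with different arguments, so `delay_queue` would have to be deleted first.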
Consumer
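import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private static String queue_name = "message_ttl_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.31.2");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        // This virtual host must already exist on the broker (the default is "/").
        factory.setVirtualHost("/guest");
        Connection connection = factory.newConnection();
        // final so the anonymous consumer below can use it on any Java version.
        final Channel channel = connection.createChannel();

        // Declare the queue that receives the expired messages.
        channel.queueDeclare(queue_name, true, false, false, null);
        // Bind it to amq.direct under the dead-letter routing key.
        channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
        // channel.exchangeDeclare("exchange_name", "direct", true);
        // channel.queueBind("queue_name", "exchange_name", "");

        // The second argument (autoAck) is false: acknowledge manually.
        channel.basicConsume(queue_name, false,
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        String str = new String(body, StandardCharsets.UTF_8);
                        // Print the body and the time of receipt.
                        System.out.println(str + " " + System.currentTimeMillis());
                        long deliveryTag = envelope.getDeliveryTag();
                        // (process the message components here ...)
                        channel.basicAck(deliveryTag, false);
                    }
                });
        System.out.println(channel);
    }
}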
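The receiver prints the message body followed by the time of receipt, so the delay has to be worked out by eye. As a small sketch, the helper below computes it directly. It assumes the body has exactly the form `"hello world!"` followed by the sender's millisecond timestamp, as the producer in this article sends it, and that sender and receiver clocks are comparable. `DelayCheck` and `elapsedMillis` are hypothetical names.

```java
// Hypothetical helper for checking the observed delay of a message whose
// body is "hello world!" followed by the sender's timestamp in milliseconds.
final class DelayCheck {
    private static final String PREFIX = "hello world!";

    private DelayCheck() {}

    // Returns how many milliseconds passed between sending and receiving.
    static long elapsedMillis(String body, long receivedAtMillis) {
        if (!body.startsWith(PREFIX)) {
            throw new IllegalArgumentException("unexpected message body: " + body);
        }
        long sentAtMillis = Long.parseLong(body.substring(PREFIX.length()));
        return receivedAtMillis - sentAtMillis;
    }
}
```

Inside `handleDelivery` this could be called as `DelayCheck.elapsedMillis(str, System.currentTimeMillis())`; with the 5000 ms expiration set by the producer, the result should be at least roughly 5000.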
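Automatic and manual acknowledgement
In channel.basicConsume(queue_name, false, ...), the second argument false turns off automatic acknowledgement, so the consumer must acknowledge each message itself. Otherwise the message remains in the queue as unacknowledged and is redelivered once the channel or connection closes. The acknowledgement code is: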
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
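The second argument of `basicAck` is the `multiple` flag: with `false` only the given delivery tag is acknowledged, with `true` every outstanding delivery up to and including that tag is acknowledged. The toy model below imitates that behaviour with an in-memory set. `UnackedModel` is a hypothetical class written for illustration; it is not RabbitMQ client code and does not talk to a broker.

```java
import java.util.TreeSet;

// Toy in-memory model of a channel's unacknowledged deliveries. It only
// imitates the meaning of basicAck's "multiple" flag.
final class UnackedModel {
    private final TreeSet<Long> unacked = new TreeSet<>();

    // Record a delivery that has not been acknowledged yet.
    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet(tag, true) is a live view of all tags <= deliveryTag,
            // so clearing it removes them from the backing set.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    int pending() {
        return unacked.size();
    }
}
```

Acknowledging one message at a time, as the consumer in this article does, is the simpler choice; `multiple = true` trades that simplicity for fewer acknowledgement frames when messages are processed in batches.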