RabbitMQ高階特性-消費端的ack和重回佇列
阿新 • • 發佈:2020-06-24
[TOC]
消費端的手工ack和nack
什麼是ack和nack
ack
表示告知RabbitMQ
已經成功消費訊息
nack
表示告知RabbitMQ
消費端處理訊息失敗
手工ack和nack使用場景
- 消費端進行消費的時候,由於業務異常,我們可以進行日誌記錄,後續做補償操作。
- 消費端由於伺服器宕機等嚴重問題,比如訊息消費一半時宕機,
RabbitMQ
既收不到ack
也收不到nack
,此時消費端採用手工ack
,等消費端服務重啟好後,RabbitMQ
回重發此未能消費成功的訊息,保障訊息消費成功
消費端的重回佇列
消費端重回佇列是為了對沒有處理成功的訊息,把訊息重新遞給Broker
一般我們在實際應用中,都會關閉重回佇列
程式碼實現
Producer
package com.wyg.rabbitmq.javaclient.consumer_ack;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消費者手工ack和nack
*
* @author [email protected]
* @date 2019-11-22 13:25
* @since JDK1.8
* @version V1.0
*/
public class Producer {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guset" ;
private static final String PASSWORD = "guset";
public static void main(String[] args) throws IOException,TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setVirtualHost("/");
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "ack.abc";
// 申明exchange
channel.exchangeDeclare(exchangeName,"topic");
for (int i = 0; i < 6; i++) {
Map<String,Object> map = new HashMap<>();
map.put("num",i);
AMQP.BasicProperties props =
new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").headers(map).build();
String msg = "這是第" + i + "條ack訊息";
channel.basicPublish(exchangeName,routingKey,false,props,msg.getBytes("UTF-8"));
}
channel.close();
connection.close();
}
}
複製程式碼
Consumer
package com.wyg.rabbitmq.javaclient.consumer_ack;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
* 消費者手工ack和nack
*
* @author [email protected]
* @date 2019-11-22 14:07
* @since JDK1.8
* @version V1.0
*/
public class Consumer {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guset";
private static final String PASSWORD = "guset";
public static void main(String[] args) throws IOException,TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setVirtualHost("/");
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "test_ack_queue";
String exchangeName = "test_ack_exchange";
String routingKey = "ack.#";
// 申明exchange
channel.exchangeDeclare(exchangeName,"topic");
// 申明佇列
channel.queueDeclare(queueName,true,null);
// 佇列繫結到exchange
channel.queueBind(queueName,exchangeName,null);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag,Delivery message) throws IOException {
// consumer手動 ack 給broker
int num = (int)message.getProperties().getHeaders().get("num");
// 根據headers裡的num做判斷,num<3,發ack給broker,並將訊息重新入隊
if (num < 3) {
System.out.println("---消費端nack---DeliveryTag:" + message.getEnvelope().getDeliveryTag() + ","
+ new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} else {
// 根據headers裡的num做判斷,num>=3,發nack給broker,並將訊息重新入隊
System.out.println("---消費端nack---DeliveryTag:" + message.getEnvelope().getDeliveryTag() + ","UTF-8"));
channel.basicNack(message.getEnvelope().getDeliveryTag(),true);
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("---消費者--:cancelCallback");
}
};
// 消費訊息,autoAck一定要設為false,手工ack
channel.basicConsume(queueName,deliverCallback,cancelCallback);
}
}
複製程式碼
執行結果
發現前3條訊息成功消費,手工發ack
給Broker
最後3條訊息,發nack
給Broker
,並不斷重回佇列尾端,broker再將其推給消費端,一直迴圈消費失敗