RabbitMQ高階特性-消費端ACK與重回佇列
阿新 • • 發佈:2018-12-31
消費端ACK與重回佇列
消費端ACK
- 消費端的手工ACK和NACK, ACK是確認成功消費, NACK表示訊息處理失敗, 會重發訊息
- 消費端進行消費的時候, 如果由於業務異常我們可以進行日誌的記錄, 然後進行補償
- 如果由於伺服器宕機等嚴重問題, 就需要手工進行ACK保障消費端消費成功
重回佇列
- 消費端重回佇列是為了對沒有處理成功的訊息, 把訊息重新回遞給Broker
- 一般在實際應用中, 都會關閉重回佇列, 也就是設定為False
程式碼演示
自定義消費者
package com.qiyexue.api.ack;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* 自定義消費者
* 設定手動ack和重回佇列
* @author 七夜雪
* @date 2018-12-16 8:20
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("-------------自定義消費者------------");
System.out.println("consumerTag : " + consumerTag);
System.out.println("envelope : " + envelope);
System.out.println("properties : " + properties);
System.out.println("body : " + new String(body));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
if ((Integer)properties.getHeaders().get("num") == 0) {
// 三個引數 : DeliveryTag, 是否批量拒絕, 是否可以重回佇列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
Producer
package com.qiyexue.api.ack;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
/**
* 生產者
*
* @author 七夜雪
* @date 2018-12-15 19:56
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 建立ConnectionFactory, 並設定屬性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 建立連線
Connection connection = factory.newConnection();
// 3. 建立channel
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "ack.qiye";
for (int i = 0; i < 5; i++) {
// 傳送訊息
String msg = "Hello, 七夜雪 " + i;
Map<String, Object> hearder = new HashMap<String, Object>();
hearder.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.contentEncoding("UTF-8")
.headers(hearder).build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
}
// 關閉連線
channel.close();
connection.close();
}
}
Consumer
package com.qiyexue.api.ack;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消費者,關閉自動ack
*
* @author 七夜雪
* @date 2018-12-15 20:07
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 建立連線工廠並設定屬性
ConnectionFactory factory = new ConnectionFactory();;
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 建立連線
Connection connection = factory.newConnection();
// 3. 建立channel
Channel channel = connection.createChannel();
// 4. 宣告Exchange
String exchangeName = "test_ack_exchange";
String exchangeType = "topic";
String routingKey = "ack.*";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
// 5. 宣告訊息佇列
String queueName = "test_ack_queue";
channel.queueDeclare(queueName, true, false, false, null);
// 6. 繫結佇列和Exchange
channel.queueBind(queueName, exchangeName, routingKey);
// 7. 設定消費者為自定義的消費者, 將autoAck設定為false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}