RabbitMQ高階特性-Confirm確認訊息
阿新 • • 發佈:2018-12-31
Confirm確認訊息
-
訊息的確認, 是指生產者投遞訊息後, 如果Broker收到訊息, 則會給我們產生一個應答
-
生產者進行接收應答, 用來確定這條訊息是否正常傳送到Broker, 這種方式也是訊息的可靠性投遞的核心保障
如何實現Confirm確認訊息
- 在channel上開啟確認模式 : channel.confirmSelect()
- 在channel上新增監聽 : addConfirmListener, 監聽成功和失敗的返回結果, 根據具體的結果對訊息進行重新發送, 或記錄日誌等後續處理
Producer程式碼
package com.qiyexue.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Confirm確認訊息模式生產者
*
* @author 七夜雪
* @create 2018-12-15 16:12
*/
public class ProducerByConfirm {
public static void main(String[] args) throws Exception {
// 1. 建立ConnectionFactory, 並設定屬性
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 建立連線
Connection connection = factory.newConnection();
// 3. 建立channel
Channel channel = connection.createChannel();
// 4. 開啟Confirm模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.qiye";
// 5. 傳送訊息
String msg = "Send Msg By Confirm ...";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 6. 設定監聽
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("-------ACK Success--------");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ACK Failed--------");
}
});
// 因為設定了監聽, 這裡就不關閉channel和connection了
}
}
Consumer程式碼
package com.qiyexue.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* Confirm訊息確認的生產者
*
* @author 七夜雪
* @create 2018-12-15 16:19
*/
public class ConsumerByConfirm {
public static void main(String[] args) throws Exception {
// 1. 建立連線工廠並設定屬性
ConnectionFactory factory = new ConnectionFactory();;
factory.setHost("192.168.72.138");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 建立連線
Connection connection = factory.newConnection();
// 3. 建立channel
Channel channel = connection.createChannel();
// 4. 宣告Exchange
String exchangeName = "test_confirm_exchange";
String exchangeType = "topic";
String routingKey = "confirm.*";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
// 5. 宣告訊息佇列
String queueName = "test_confirm_queue";
channel.queueDeclare(queueName, true, false, false, null);
// 6. 繫結佇列和Exchange
channel.queueBind(queueName, exchangeName, routingKey);
// 7. 建立一個消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 8. 設定消費者從哪個佇列開始消費, 設定自動ACK
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}