RabbitMQ 實戰(四)消費者 ack 以及 生產者 confirms
這篇文章主要講 RabbitMQ 中 消費者 ack 以及 生產者 confirms。
如上圖,生產者把訊息傳送到 RabbitMQ,然後 RabbitMQ 再把訊息投遞到消費者。
生產者和 RabbitMQ,以及 RabbitMQ 和消費者都是通過 TCP 連線,但是他們之間是通過通道(Channel)傳遞資料的。多個執行緒共享一個連線,但是每個執行緒擁有獨自的通道。
消費者 ack
問題:怎麼保證 RabbitMQ 投遞的訊息被成功投遞到了消費者?
RabbitMQ 投遞的訊息,剛投遞一半,產生了網路抖動,就有可能到不了消費者。
解決辦法:
RabbitMQ 對消費者說:“如果你成功接收到了訊息,給我說確認收到了,不然我就當你沒有收到,我還會重新投遞”
在 RabbitMQ 中,有兩種 acknowledgement 模式。
自動 acknowledgement 模式
這也稱作發後即忘模式。
在這種模式下,RabbitMQ 投遞了訊息,在投遞成功之前,如果消費者的 TCP 連線 或者 channel 關閉了,這條訊息就會丟失。
會有丟失訊息問題。
手動 acknowledgement 模式
在這種模式下,RabbitMQ 投遞了訊息,在投遞成功之前,如果消費者的 TCP 連線 或者 channel 關閉了,導致這條訊息沒有被 acked,RabbitMQ 會自動把當前訊息重新入隊,再次投遞。
會有重複投遞訊息的問題,所以消費者得準備好處理重複訊息的問題,就是所謂的:冪等性。
為了啟用 手動 ack 模式,消費者需要實現 ChannelAwareMessageListener
介面。
@Component
public class Consumer implements ChannelAwareMessageListener {
@Autowired
private MessageConverter messageConverter;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
MessageProperties messageProperties = message.getMessageProperties();
// 代表投遞的識別符號,唯一標識了當前通道上的投遞,通過 deliveryTag ,消費者就可以告訴 RabbitMQ 確認收到了當前訊息,見下面的方法
long deliveryTag = messageProperties.getDeliveryTag();
// 如果是重複投遞的訊息,redelivered 為 true
Boolean redelivered = messageProperties.getRedelivered();
// 獲取生產者傳送的原始訊息
Object originalMessage = messageConverter.fromMessage(message);
Console.log("consume message = {} , deliveryTag = {} , redelivered = {}"
, originalMessage, deliveryTag, redelivered);
// 代表消費者確認收到當前訊息,第二個引數表示一次是否 ack 多條訊息
channel.basicAck(deliveryTag, false);
// 代表消費者拒絕一條或者多條訊息,第二個引數表示一次是否拒絕多條訊息,第三個引數表示是否把當前訊息重新入隊
// channel.basicNack(deliveryTag, false, false);
// 代表消費者拒絕當前訊息,第二個引數表示是否把當前訊息重新入隊
// channel.basicReject(deliveryTag,false);
}
}
channel.basicAck
代表消費者確認收到當前訊息,語義上表示消費者成功處理了當前訊息。
channel.basicNack
代表消費者拒絕一條或者多條訊息。basicNack 算是 basicReject 的一個擴充套件,因為 basicReject 不能一次拒絕多條訊息。
channel.basicReject
代表消費者拒絕這條訊息,語義上表示消費者沒有處理當前訊息。
對於 basicNack 和 basicReject ,如果引數
boolean requeue
傳入false
,訊息還是會從佇列裡面刪除。這三個方法只是語義上的不同。deliveryTag
deliveryTag 是 64 bit long 值,從 1 開始,不停的遞增 1。不同的 channel 有獨立的 deliveryTag。比如有兩個消費者,你會發現,都是從 1 開始遞增,互不影響。
由於上面建立的消費者,沒有指明監聽那個佇列,所以還需要建立一個 MessageListenerContainer
。
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, ChannelAwareMessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 指定消費者
container.setMessageListener(listener);
// 指定監聽的佇列
container.setQueueNames(QUEUE_NAME);
// 設定消費者的 ack 模式為手動確認模式
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setPrefetchCount(300);
return container;
}
這樣就開啟了消費者手動 ack 模式。
注意
如果開啟了消費者手動 ack 模式,但是又沒有呼叫手動確認方法(比如:channel.basicAck),那問題就大了,RabbitMQ 會在當前 channel 上一直阻塞,等待消費者 ack。
生產者 confirms
問題:怎麼保證生產者傳送的訊息被 RabbitMQ 成功接收?
生產者傳送的訊息,剛傳送一半,產生了網路抖動,就有可能到不了 RabbitMQ。
解決辦法:
生產者對 RabbitMQ 說:“如果你成功接收到了訊息,給我說確認收到了,不然我就當你沒有收到”
自定義訊息元資料
/**
* 自定義訊息元資料
*/
@NoArgsConstructor
@Data
public class RabbitMetaMessage implements Serializable{
/**
* 是否是 returnCallback
*/
private boolean returnCallback;
/**
* 承載原始訊息資料資料
*/
private Object payload;
public RabbitMetaMessage(Object payload) {
this.payload = payload;
}
}
- returnCallback 標記當前訊息是否觸發了 returnCallback(後面會解釋)
- payload 儲存原始訊息資料
生產者
先把訊息儲存到 redis,再發送到 rabbitmq
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DefaultKeyGenerator keyGenerator;
@GetMapping("/sendMessage")
public Object sendMessage() {
new Thread(() -> {
HashOperations hashOperations = redisTemplate.opsForHash();
for (int i = 0; i < 1; i++) {
String id = keyGenerator.generateKey() + "";
String value = "message " + i;
RabbitMetaMessage rabbitMetaMessage = new RabbitMetaMessage(value);
// 先把訊息儲存到 redis
hashOperations.put(RedisConfig.RETRY_KEY, id, rabbitMetaMessage);
Console.log("send message = {}", value);
// 再發送到 rabbitmq
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value, (message) -> {
message.getMessageProperties().setMessageId(id);
return message;
}, new CorrelationData(id));
}
}).start();
return "ok";
}
}
配置 ConnectionFactory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.238.132", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 設定 生產者 confirms
connectionFactory.setPublisherConfirms(true);
// 設定 生產者 Returns
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
配置 RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 必須設定為 true,不然當 傳送到交換器成功,但是沒有匹配的佇列,不會觸發 ReturnCallback 回撥
// 而且 ReturnCallback 比 ConfirmCallback 先回調,意思就是 ReturnCallback 執行完了才會執行 ConfirmCallback
rabbitTemplate.setMandatory(true);
// 設定 ConfirmCallback 回撥
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
Console.log("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause);
// 如果傳送到交換器都沒有成功(比如說刪除了交換器),ack 返回值為 false
// 如果傳送到交換器成功,但是沒有匹配的佇列(比如說取消了繫結),ack 返回值為還是 true (這是一個坑,需要注意)
if (ack) {
String messageId = correlationData.getId();
RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);
Console.log("rabbitMetaMessage = {}", rabbitMetaMessage);
if (!rabbitMetaMessage.isReturnCallback()) {
// 到這一步才能完全保證訊息成功傳送到了 rabbitmq
// 刪除 redis 裡面的訊息
redisTemplate.opsForHash().delete(RedisConfig.RETRY_KEY, messageId);
}
}
});
// 設定 ReturnCallback 回撥
// 如果傳送到交換器成功,但是沒有匹配的佇列,就會觸發這個回撥
rabbitTemplate.setReturnCallback((message, replyCode, replyText,
exchange, routingKey) -> {
Console.log("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey);
// 從 redis 取出訊息,設定 returnCallback 設定為 true
String messageId = message.getMessageProperties().getMessageId();
RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId);
rabbitMetaMessage.setReturnCallback(true);
redisTemplate.opsForHash().put(RedisConfig.RETRY_KEY, messageId, rabbitMetaMessage);
});
return rabbitTemplate;
}
ReturnCallback 回撥
必須 rabbitTemplate.setMandatory(true)
,不然當 傳送到交換器成功,但是沒有匹配的佇列,不會觸發 ReturnCallback 回撥。而且 ReturnCallback 比 ConfirmCallback 先回調。
如何模擬 傳送到交換器成功,但是沒有匹配的佇列,先把專案啟動,然後再把佇列解綁,再發送訊息,就會觸發 ReturnCallback 回撥,而且發現訊息也丟失了,沒有到任何佇列。
這樣就解綁了。
執行專案,然後開啟瀏覽器,輸入 http://localhost:9999/sendMessage
控制檯打出如下日誌
這樣就觸發了 ReturnCallback 回撥 ,從 redis 取出訊息,設定 returnCallback 設定為 true。你會發現 ConfirmCallback 的 ack 返回值還是 true。
ConfirmCallback 回撥
這裡有個需要注意的地方,如果傳送到交換器成功,但是沒有匹配的佇列(比如說取消了繫結),ack 返回值為還是 true (這是一個坑,需要注意,就像上面那種情況!!!)。所以不能單靠這個來判斷訊息真的傳送成功了。這個時候會先觸發 ReturnCallback 回撥,我們把 returnCallback 設定為 true,所以還得判斷 returnCallback 是否為 true,如果為 ture,表示訊息傳送不成功,false 才能完全保證訊息成功傳送到了 rabbitmq。
如何模擬 ack 返回值為 false,先把專案啟動,然後再把交換器刪除,就會發現 ConfirmCallback 的 ack 為 false。
執行專案,然後開啟瀏覽器,輸入 http://localhost:9999/sendMessage
控制檯打出如下日誌
你會發現 ConfirmCallback 的 ack 返回值才是 false。
注意
不能單單依靠 ConfirmCallback 的 ack 返回值為 true,就斷定當前訊息傳送成功了。
原始碼地址
參考資料
結語
由於本人知識和能力有限,文中如有沒說清楚的地方,希望大家能在評論區指出,以幫助我將博文寫得更好。