Spring-amqp 1.6.1消費者手工對訊息進行確認
前言
在使用Spring amqp建立消費者並接收訊息時,通常會用到下面兩個介面。
public interface MessageListener {
void onMessage(Message message);
}
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
我們會實現介面,並通過onMessage方法來接收訊息。在接收訊息後,處理業務時如果出現異常,那麼消費者會不斷接收到重發的訊息。有時候在出現某些異常,無法處理,因此並不希望繼續接收到重發,因此需要用到手工確認模式,來按需進行重發。
1.預設情況下為什麼會自動重發?
在配置消費端時,通常使用下面的配置。而對於rabbit:listener-container標籤並未指定“確認屬性” acknowledge。預設情況下該屬性為auto。
<rabbit:listener-container
connection-factory="connectionFactory" >
<rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
</rabbit:listener-container >
當onMessage方法產生異常後,框架會呼叫下面的方法處理異常。
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.rollbackOnExceptionIfNecessary(Throwable)
public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this .acknowledgeMode.isManual();
try {
if (this.transactional) {
if (logger.isDebugEnabled()) {
logger.debug("Initiating transaction rollback on application exception: " + ex);
}
RabbitUtils.rollbackIfNecessary(this.channel);
}
if (ackRequired) {
// We should always requeue if the container was stopping
boolean shouldRequeue = this.defaultRequeuRejected ||
ex instanceof MessageRejectedWhileStoppingException;
Throwable t = ex;
while (shouldRequeue && t != null) {
if (t instanceof AmqpRejectAndDontRequeueException) {
shouldRequeue = false;
}
t = t.getCause();
}
if (logger.isDebugEnabled()) {
logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
}
for (Long deliveryTag : this.deliveryTags) {
// With newer RabbitMQ brokers could use basicNack here...
this.channel.basicReject(deliveryTag, shouldRequeue);
}
if (this.transactional) {
// Need to commit the reject (=nack)
RabbitUtils.commitIfNecessary(this.channel);
}
}
}
catch (Exception e) {
logger.error("Application exception overridden by rollback exception", ex);
throw e;
}
finally {
this.deliveryTags.clear();
}
}
isAutoAck方法
public boolean isAutoAck() {
return this == NONE;
}
可以看到,如果acknowledge即非manul,也非none時(呼叫處方法名字是isAutoAck,但內部確實判斷是否為NONE),那麼會呼叫this.channel.basicReject。因此傳送否定確認,最終不斷收到重複傳送的訊息。
關於否認可以參考下面的連結。
2.對訊息進行手工確認
為了在Spring-amqp框架中進行手工確認,在接收訊息時需要實現如下的介面。
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
此外消費者的確認模式需要配置為manual,其中確認模式包括NONE,MANUL,與AUTO三種[1]。
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual">
那麼當收到訊息後,如果要否認,或確認則通過呼叫channel物件的下面的兩個方法即可。其中basicAck進行確認,而basicNack進行否認。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
throw new IllegalArgumentException("Illegal");
channel.basicAck(deliveryTag, false);
channel.basicNack(deliveryTag, false, true);
上面程式碼中deliveryTag即訊息消交付的一個標識,其作用域為channel。而basicNAck與basicReject都可以進行否則,二者區別參考下面的官網解釋。
當在onMessage方法中呼叫basicAck確認訊息後,佇列中持久化的訊息會被刪除。而呼叫basicNack後,會收到rabbitmq重發的訊息。若未呼叫basicAck確認則訊息會產生堆積[2]。那麼當消費者下再一次連線rabbitmq時訊息會重發給消費者。