Spring-amqp 1.6.1 生產者與消費者訊息確認配置與使用
通過Publisher Confirms and Returns機制,生產者可以判斷訊息是否傳送到了exchange及queue,而通過消費者確認機制,Rabbitmq可以決定是否重發訊息給消費者,以保證訊息被處理。
1.什麼是Publisher Confirms and Returns?
Delivery processing acknowledgements from consumers to RabbitMQ are known as acknowledgements in AMQP 0-9-1 parlance; broker acknowledgements to publishers are a protocol extension called publisher confirms.
地址:http://www.rabbitmq.com/confirms.html
根據RabbitMq官網定義,rabbitmq代理(broker)對釋出者(publishers)的確認被稱作釋出者確認(publisher confirms),這種機制是Rabbitmq對標準Amqp協議的擴充套件。因此通過這種機制可以確認訊息是否傳送給了目標。
2.如何通過Spring amqp來使用Publisher Confirms and Returns機制?
Confirmed and returned messages are supported by setting the CachingConnectionFactory’s publisherConfirms and publisherReturns properties to ‘true’ respectively.When these options are set, Channel s created by the factory are wrapped in an PublisherCallbackChannel, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections.
http://docs.spring.io/spring-amqp/docs/1.6.3.RELEASE/reference/html/_reference.html#cf-pub-conf-ret
通過Spring amqp文件可以看到,要使用這種機制需要將Template模版的設publisherConfirms 或publisherReturns 屬性設定為true,此外ConnectionFactory要配置為CachingConnectionFactory。
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" >
<property name="host" value="192.168.2.133" />
<property name="port" value="5672" />
<property name="username" value="sun" />
<property name="password" value="123456" />
<property name="publisherConfirms" value="true" />
<property name="publisherReturns" value="true" />
</bean>
2.1 ConfirmCallback的使用及觸發的一種場景
RabbitTemplate template = (RabbitTemplate) ctx.getBean("amqpTemplate");
int i = 0;
template.setMandatory(true);
if(!template.isConfirmListener()){
template.setConfirmCallback(new ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ack: " + ack + ". correlationData: " + correlationData + "cause : " + cause);
}
});
}
isConfirmListener由RabbitTemplate 提供,用於判斷是否建立了這個物件。
@Override
public boolean isConfirmListener() {
return this.confirmCallback != null;
}
而ConfirmListener是當訊息無法傳送到Exchange被觸發,此時Ack為False,這時cause包含傳送失敗的原因,例如exchange不存在時,cause的內容如下。
ack: false. correlationData: nullcause : channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchange' in vhost '/', class-id=60, method-id=40)
而訊息可以傳送到Exchange時Ack為true。在傳送時可以通過使用RabbitTemplate 下面的方法來進行驗證。
convertAndSend(exchange, routingKey, object, correlationData);
correlationData是回撥時傳入回撥方法的引數,因此通過這個屬性來區分訊息,並進行重發。
2.2 ReturnCallback的使用及觸發的一種場景
ReturnCallBack使用時需要通過RabbitTemplate 的setMandatory方法設定變數mandatoryExpression的值,該值可以是一個表示式或一個Boolean值。當為TRUE時,如果訊息無法傳送到指定的訊息佇列那麼ReturnCallBack回撥方法會被呼叫。
與isConfirmListener類似,也有一個isReturnListener方法,但這個方法在1.6.1版本中返回true。
@Override
public boolean isReturnListener() {
return true;
}
設定ReturnCallBack回撥。
template.setReturnCallback(new ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("text: " + replyText + " code: " + replyCode + " exchange: " + exchange + " routingKey :" + routingKey);
}
});
傳送訊息驗證ReturnCallBack。
convertAndSend(String routingKey, final Object object, CorrelationData correlationData)
template.convertAndSend("not exist routing key abc", (Object)"bbb", new CorrelationData("123"));
在測試程式碼中指定了一個routingKey,但通過這個routingKey,Exchange無法將訊息路由到任何佇列,因此導致ReturnCallBack被觸發,最後返回的資訊如下。
text: NO_ROUTE code: 312 exchange: myExchange routingKey :not exist routing key abc
此外在vhost中若找不到Exchange時,confirmCallBack會被觸發,而returnCallBack不會被觸發,原因參見下面的回答。
3.消費者對訊息的確認
Rabbitmq 官網教程第2課講述瞭如何通過Rabbitmq 提供的Api來實現消費者確認訊息已經處理。
而在Spring-rabbitmq中,預設啟用的是自動確認機制,當消費端在回撥方法中接收到訊息後,併成功處理,且未丟擲任何異常,那麼訊息會被確認。如果執行過程中產生異常,那麼Rabbitmq會嘗試重複傳送訊息給消費者。這點可以檢視原始碼,並通過在訊息處理方法中丟擲異常來驗證。當產生異常後,通過ip:15672登入Rabbitmq管理的Web頁面,選擇佇列選項可以看到未處理的訊息的數量(Unacked)。這個數量是與
<rabbit:listener-container/>
配置中的prefetchCount(預取數相關)。
因此消費者對訊息的確認最基礎的使用不需要額外進行配置。
若要在消費方配置相關引數,可以參考官方文件3.1.5小節。
4.配置多個消費者並關聯到同一個佇列
如下,若配置多個消費者關聯到一個佇列,那麼當傳送多條訊息時(訊息數量大於消費者數量時),那麼佇列中的訊息會輪流分發給各個消費者。
consumer,consumer2,consumer,consumer2,….的順序。
<rabbit:listener-container
connection-factory="connectionFactory" prefetch="1" receive-timeout="4000">
<rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
<rabbit:listener ref="consumer2" method="listen" queue-names="myQueue" />
</rabbit:listener-container>
如果其中一個消費者接收訊息後,併產生異常,那麼該訊息會發送給下一個消費者。例如consumer,在listen丟擲異常,那麼這條訊息會發送給consumer2。
5.示例中的配置與程式碼
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="192.168.2.133" />
<property name="port" value="5672" />
<property name="username" value="sun" />
<property name="password" value="123456" />
<property name="publisherConfirms" value="true" />
<property name="publisherReturns" value="true" />
</bean>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="myQueue" id="myQueue" durable="true"
auto-delete="false" exclusive="false" />
<rabbit:queue name="myQueue" id="myQueue2" durable="true"
auto-delete="false" exclusive="false" />
<rabbit:direct-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" key="sun1" />
<rabbit:binding queue="myQueue" key="sun2" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="myExchange" routing-key="sunv5" />
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener ref="consumer" method="listen" queue-names="myQueue" />
</rabbit:listener-container>
<bean id="consumer" class="springamqp.Foo" />
ClassPathXmlApplicationContext ctx =
new ClassPathXmlApplicationContext("classpath:rabbit-context.xml");
RabbitTemplate template = (RabbitTemplate) ctx.getBean("amqpTemplate");
template.setMandatory(true);
if(!template.isConfirmListener()){
template.setConfirmCallback(new ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ack: " + ack + ". correlationData: " + correlationData + "cause : " + cause);
}
});
}
template.setReturnCallback(new ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("text: " + replyText + " code: " + replyCode + " exchange: " + exchange + " routingKey :" + routingKey);
}
});
int i = 0;
while(i < 50) {
template.convertAndSend("not exist routing key abc", (Object)"bbb", new CorrelationData("123"));
Thread.sleep(7000);
}
public class Foo {
public void listen(String foo) throws InterruptedException {
System.out.println("get msg");
System.out.println(foo);
Thread.sleep(2000);
// 丟擲異常,觀察訊息重發
throw new NullPointerException();
}
}