rabbitmq的發布確認和事務
阿新 • • 發佈:2017-09-21
min stp tex onf prop actions before interval 傳輸 摘要: 介紹confirm的工作機制。使用spring-amqp介紹事務以及發布確認的使用方式。因為事務以及發布確認是針對channel來講,所以在一個連接中兩個channel,一個channel可以使用事務,另一個channel可以使用發布確認,並介紹了什麽時候該使用事務,什麽時候該使用發布確認
confirm的工作機制
? Confirms是增加的一個確認機制的類,繼承自標準的AMQP。這個類只包含了兩個方法:confirm.select和confirm.select-ok。另外,basic.ack方法被發送到客戶端。
? confirm.select是在一個channel中啟動發布確認。註意:一個具有事務的channel不能放入到確認模式,同樣確認模式下的channel不能用事務。
當confirm.select被發送/接收。發布者/broker開始計數(首先是發布然後confirm.select被記為1)。一旦channel為確認模式,發布者應該期望接收到basic.ack方法,delivery-tag屬性顯示確認消息的數量。
當broker確認了一個消息,會通知發布者消息被成功處理;?
?
? basic的規則是這樣的:?
一個未被路由的具有manadatory或者immediate的消息被正確確認後觸發basic.return;
另外,一個瞬時態的消息被確認目前已經入隊;
持久化的消息在持久化到磁盤或者每個隊列的消息被消費之後被確認。
關於confirm會有一些問題:
首先,broker不能保證消息會被confirm,只知道將會進行confirm。
第二,當未被確認的消息堆積時消息處理緩慢,對於確認模式下的發布,broker會做幾個操作,日誌記錄未被確認的消息
第三,如果發布者與broker之間的連接刪除了未能得到確認,它不一定知道消息丟失,所以可能會發布重復的消息。
最後,如果在broker中發生壞事會導致消息丟失,將會basic.nack那些消息
總之,Confirms給客戶端一種輕量級的方式,能夠跟蹤哪些消息被broker處理,哪些可能因為broker宕掉或者網絡失敗的情況而重新發布。
確認並且保證消息被送達,提供了兩種方式:發布確認和事務。(兩者不可同時使用)在channel為事務時,不可引入確認模式;同樣channel為確認模式下,不可使用事務。
事務
Spring AMQP做的不僅僅是回滾事務,而且可以手動拒絕消息,如當監聽容器發生異常時是否重新入隊。
持久化的消息是應該在broker重啟前都有效。如果在消息有機會寫入到磁盤之前broker宕掉,消息仍然會丟失。在某些情況下,這是不夠的,發布者需要知道消息是否處理正確。簡單的解決方案是使用事務,即提交每條消息。
案例:
RabbitTemplate的使用案例(同步),由調用者提供外部事務,在模板中配置了channe-transacted=true。通常是首選,因為它是非侵入性的(低耦合)
<rabbit:template id="rabbitTemplate" connection-factory="cachingConnectionFactory"
exchange="sslexchange" channel-transacted="true"/>
@Transactional
public void doSomething() {
ApplicationContext context =
new GenericXmlApplicationContext("spring-amqp-test.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
String incoming = (String) rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
//數據庫操作中如果失敗了,outgoing這條消息不會被發送,incoming消息也會返回到broker服務器中,因為這是一條事務鏈。
//可做XA事務,在消息傳送與數據庫訪問中共享事務。
rabbitTemplate.convertAndSend(outgoing);
}
private String processInDatabaseAndExtractReply(String incoming){
return incoming;
}
異步使用案例(外部事務)
<bean id="rabbitTxManage" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="cachingConnectionFactory"></property>
</bean>
<rabbit:listener-container connection-factory="cachingConnectionFactory" transaction-manager="rabbitTxManage" channel-transacted="true">
<rabbit:listener ref="foo" method="onMessage" queue-names="rabbit-ssl-test"/>
</rabbit:listener-container>
在容器中配置事務時,如果提供了transactionManager,channelTransaction必須為true;如果為false,外部的事務仍然可以提供給監聽容器,造成的影響是在回滾的業務操作中也會提交消息傳輸的操作。
使用事務有兩個問題:
? 一是會阻塞,發布者必須等待broker處理每個消息。如果發布者知道在broker死掉之前哪些消息沒有被處理就足夠了。
? 第二個問題是事務是重量級的,每次提交都需要fsync(),需要耗費大量的時間。
confirm模式下,broker將會確認消息並處理。這種模式下是異步的,生產者可以流水式的發布而不用等待broker,broker可以批量的往磁盤寫入。
發布確認
發布確認必須配置在CachingConnectionFactory上
<bean id="cachingConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="192.168.111.128"></property>
<property name="port" value="5672"></property>
<property name="username" value="admin"/>
<property name="password" value="admin"/>
<property name="publisherConfirms" value="true"/>
<property name="publisherReturns" value="true"/>
</bean>
若使用confirm-callback或return-callback,必須要配置publisherConfirms或publisherReturns為true
每個rabbitTemplate只能有一個confirm-callback和return-callback
//確認消息是否到達broker服務器,也就是只確認是否正確到達exchange中即可,只要正確的到達exchange中,broker即可確認該消息返回給客戶端ack。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息確認成功");
} else {
//處理丟失的消息(nack)
System.out.println("消息確認失敗");
}
}
});
使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true,可針對每次請求的消息去確定’mandatory’的boolean值,只能在提供’return -callback’時使用,與mandatory互斥。
rabbitTemplate.setMandatory(true);
//確認消息是否到達broker服務器,也就是只確認是否正確到達exchange中即可,只要正確的到達exchange中,broker即可確認該消息返回給客戶端ack。
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
//重新發布
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey");
Throwable cause = new Exception(new Exception("route_fail_and_republish"));
recoverer.recover(message,cause);
System.out.println("Returned Message:"+replyText);
}
});
errorTemplate配置:
<rabbit:queue id="errorQueue" name="errorQueue" auto-delete="false" durable="true">
<rabbit:queue-arguments>
<entry key="x-ha-policy" value="all"/>
<entry key="ha-params" value="1"/>
<entry key="ha-sync-mode" value="automatic"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange id="errorExchange" name="errorExchange" auto-delete="false" durable="true">
<rabbit:bindings>
<rabbit:binding queue="errorQueue" key="errorRoutingKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="200" />
<property name="maxInterval" value="30000" />
</bean>
</property>
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="5"/>
</bean>
</property>
</bean>
<rabbit:template id="errorTemplate" connection-factory="cachingConnectionFactory" exchange="errorExchange" queue="errorQueue" routing-key="errorRoutingKey" retry-template="retryTemplate" />
同一個連接不同channel使用事務和發布確認
private RabbitTemplate rabbitTemplate;
private TransactionTemplate transactionTemplate;
@Before
public void init() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("192.168.111.128");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
template = new RabbitTemplate(connectionFactory);
template.setChannelTransacted(true);
RabbitTransactionManager transactionManager = new RabbitTransactionManager(connectionFactory);
transactionTemplate = new TransactionTemplate(transactionManager);
connectionFactory.setPublisherConfirms(true);
rabbitTemplate = new RabbitTemplate(connectionFactory);
}
//發布確認測試
@Test
public void testPublishConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息確認成功");
}else{
System.out.println("消息確認失敗");
}
}
});
//發送到一個不存在的exchange,則會觸發發布確認
rabbitTemplate.convertAndSend("asd","aaa","message");
String message = (String) rabbitTemplate.receiveAndConvert(ROUTE);
assertEquals("message",message);
}
//事務測試
@Test
public void testSendAndReceiveInTransaction() throws Exception {
//由於有spring的事務參與,而發送操作在提交事務時,是不允許除template的事務有其他事務的參與,所以這裏不會提交
//隊列中就沒有消息,所以在channel.basicGet時命令返回的是basic.get-empty(隊列中沒有消息時),而有消息時,返回basic.get-ok
String result = transactionTemplate.execute(new TransactionCallback<String>() {
@Override
public String doInTransaction(TransactionStatus status) {
template.convertAndSend(ROUTE, "message");
return (String) template.receiveAndConvert(ROUTE);
}
});
//spring事務完成,對其中的操作需要提交,發送與接收操作被認為是一個事務鏈而提交
assertEquals(null, result);
//這裏的執行不受spring事務的影響
result = (String) template.receiveAndConvert(ROUTE);
assertEquals("message", result);
}
rabbitmq的發布確認和事務