RabbitMQ 高階特性——訊息的可靠性投遞
阿新 • • 發佈:2021-01-27
在使用rabbitmq的時候,作為訊息傳送方希望杜絕任何訊息丟失過著投遞失敗的場景,rabbitMq為我們提供了兩種方式用來控制訊息的投遞可靠性模式:
- confirm 確認模式
- return 回退模式
rabbitmq整個訊息投遞的路徑為:生產者(producer)->rabbitmq broker->exchange->queue->consumer
- 訊息從producer到exchange則會返回一個confirmCallBack
- 訊息從exchange到queue投遞失敗則會返回一個returnCallBack
我們利用spring整合rabbitmq的方式來簡單的實踐下(spring如何整合rabbitmq,請點選這裡 RabbitMQ 入門篇之——Spring整合rabbitmq):
我們先來定義佇列和互動機:
<!--訊息可靠性投遞(生產端)--> <rabbit:queue id="spring_confirm_queue" name="spring_confirm_queue" auto-delete="true"/> <rabbit:direct-exchange name="spring_confirm_exchange" id="spring_confirm_exchange" > <rabbit:bindings> <rabbit:binding queue="spring_confirm_queue" key="confirm"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
生產者程式碼:
1.confirm
package com.cjian.rabbitmq.spring_rabbit; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * @description: confirm 確認模式 * @author: CJ * @time: 2021/1/22 16:36 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ConfirmReturnProducer { @Autowired private RabbitTemplate rabbitTemplate; /* * 訊息傳送給交換機後執行ConfirmCallBack * 確認步驟: * 1.開啟確認模式:connectionFactory 開啟 publisher-confirms="true" * 2.在rabbitTemplate 中定義ConfirmCallBack回撥函式 * * */ @Test public void testConfirm(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 相關配置資訊 * @param ack 交換機是否成功的收到了訊息 true成功 false失敗 * @param cause 失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("執行confirm,ack:"+ack); if(!ack){ System.out.println("成功的接到了訊息"+cause); }else{ System.out.println("接收訊息失敗"+cause); } } }); //傳送一條訊息 rabbitTemplate.convertAndSend("spring_confirm_exchange", "confirm", "hello rabbimq confirm"); }
正常傳送訊息:
模擬下發送異常:
2.return
package com.cjian.rabbitmq.spring_rabbit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @description: return模式
* @author: cWX969834
* @time: 2021/1/22 16:36
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ConfirmReturnProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/*
回退模式:當訊息傳送給交換機後,交換機路由到佇列失敗時,才會執行ReturnCallBack
步驟:1.開啟回退模式 publisher-returns="true"
2.設定ReturnCallBack
3.設定交換機處理訊息的模式:
①如果訊息沒有路由到佇列,則丟棄訊息,也是預設的
②如果訊息沒有路由到佇列,返回訊息給傳送方
*/
@Test
public void testReturn(){
//設定交換機處理失敗訊息的模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 訊息物件
* @param replyCode 失敗的錯誤碼
* @param replyText 錯誤資訊
* @param exchange 交換機
* @param routingKey 路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 執行了");
System.out.println("message:"+message);
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
}
});
rabbitTemplate.convertAndSend("spring_confirm_exchange", "confirm11", "hello rabbimq return");
}
}
我們直接模擬傳送給佇列失敗的情形:
訊息的可靠性投遞總結:
- 設定connection-factory的publisher-confirms="true",開啟確認模式;
- 使用rabbitTemplate.setConfirmCallback設定回撥函式,當訊息傳送到交換機後回撥confirm方法。在方法中判斷ack,true:傳送成功,false:傳送失敗;
- 設定connection-factory的publisher-return="true",開啟回退模式;
- 使用rabbitTemplate.setReturnCallback設定回退函式,當訊息從交換機路由到佇列失敗後,如果設定了
rabbitTemplate.setMandatory(true)引數,則會將訊息退回給生產者,並執行回撥函式returnedMessage.
- 在rabbitmq中也提供了事務機制,但是效能較差,此處就不說了。
使用channel.txSelect()用於將當前channel設定成事務模式;
使用channel.txCommit()用於提交事務;
使用channel.txRollBack()用於回滾事務。