rabbitmq訊息零丟失方案,生產者confirm機制,rabbitmq持久化設定,消費者手動ack
阿新 • • 發佈:2020-09-23
如何保證rabbitmq訊息零丟失?
我們從三個角色開始分析
1.生產者傳送訊息不丟失
生產者發訊息到rabbitmq的網路傳輸過程中丟失了
以及訊息傳送到了rabbitmq但是mq內部出錯,沒有儲存
上面的問題有兩種方案
第一種:rabbitmq支援事務訊息,通過開啟事務->傳送訊息->異常捕獲並回滾->傳送成功提交事務的方式保證訊息傳送mq成功, 但是有個弊端,這種方式是同步的,會導致訊息的吞吐量下降,一般不使用這種方式
第二種:rabbitmq的channel開啟confirm,其實就是回撥機制,傳送完訊息後不用管,讓rabbitmq通知你訊息是傳送成功還是失敗,這種方式是非同步的,對訊息的吞吐量沒什麼影響,主要使用這種方法.
2.rabbitmq訊息儲存失敗
rabbitmq接收到訊息之後暫存在記憶體之中,如果在消費者還沒有消費的時候,訊息還在記憶體中,rabbitmq宕機了,那麼記憶體中的訊息就會丟失
一種方案:
rabbitmq對queue設定持久化,就是寫一條訊息就直接儲存到磁碟上
3.消費者消費訊息失敗
消費者預設是autoAck,就是收到訊息就自動提交ack,這就導致訊息還沒處理完,消費者宕機了,那麼正在處理的訊息就丟失了,恢復了之後,消費者會拉取新的訊息
一種方案:
消費者關閉自動提交,改為手動ack,等訊息全部處理完畢再提交ack,通知rabbitmq訊息處理完畢,再發新的訊息過來;
下面是整合spring的rabbitmq生產者程式碼實現:由於rabbitmq的佇列持久化設定在管理平臺就可以操作,消費者設定手動提交也比較簡單,主要貼上生產者的程式碼實現
配置傳送模板
package cn.picclife.cust.rrd.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * rabbitmq 配置 * @ClassName: RabbitMqConfig * @Description: 初始化Rabbitmq * @author bin.zhao */ @Configuration public class RabbitMqConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.virtual-host}") private String vHost; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.queue}") private String queue; @Value("${spring.rabbitmq.exchange}") private String exchange; @Value("${spring.rabbitmq.routing.key}") private String rountingKey; /** * 針對消費者配置 * 1. 設定交換機型別 * 2. 將佇列繫結到交換機 */ //建立佇列,如果已建立好,就不用寫 // @Bean // public Queue topicQueue(){ // return new Queue(this.queue,true);//建立佇列並持久化 // } // //建立交換機 // @Bean // public TopicExchange topicExchange(){ // return new TopicExchange(this.exchange,true,false); // } // // //建立繫結 // @Bean // public Binding topicBinding(){ // return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(this.rountingKey); // } // @Bean(name = "MQConnectionFactory") public CachingConnectionFactory connectionFactory() throws Exception { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setUsername(this.username); connectionFactory.setPassword(this.password); connectionFactory.setAddresses(this.host); connectionFactory.setVirtualHost(this.vHost); //訊息是否投遞到exchange成功 connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Scope//預設單例模式 @Bean(name = "groupRabbitTemplate") public RabbitTemplate rabbitTemplate( //直接使用註解,把連線工廠注入到模板中,防止找不到,導致發訊息到mq失敗 @Qualifier("MQConnectionFactory") CachingConnectionFactory connectionFactory) throws Exception { RabbitTemplate template = new RabbitTemplate(connectionFactory); return template; } }
傳送訊息
/**
* @description 傳送訊息 * @date 2020 */ @Component public class AppRabbitMQ implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback {private static final String EXCHANGE = ResourceUtils.getResource("config").getValue("spring.rabbitmq.exchange"); private static final String ROUTING_KEY = ResourceUtils.getResource("config").getValue("spring.rabbitmq.routing.key"); @Autowired private RabbitTemplate groupRabbitTemplate;/** *傳送訊息*/ public void sendMessage(String message) throws Exception { //設定由於網路問題導致的連線Rabbitmq失敗的重試策略 RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); //傳送之前可以先把訊息儲存到資料庫 groupRabbitTemplate.setEncoding("UTF-8");
groupRabblitTemplate.setMandatory(true); //true當訊息無法被正常送達的時候被返回給生產者,false丟棄 groupRabbitTemplate.setConfirmCallback(this);//ack回撥 groupRabbitTemplate.setReturnCallback(this);//回退回調 try {//訊息傳送帶上correlationData這個物件中儲存有訊息的唯一id,以便在資料庫中查詢訊息或者從快取中獲取訊息為了傳送失敗從新發送. groupRabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY, JSONObject.toJSONString(message), correlationData); logger.info("訊息傳送,id:{}",correlationData.getId()); Thread.sleep(100);//不讓執行緒直接結束,等待回撥函式confirm,如果不等,會直接異常,因為rabbitmq找不到回撥方法 }catch (Exception e){ logger.error("傳送訊息失敗:{}",ExceptionUtils.getStackTrace(e));
//可以重試傳送訊息,我這裡直接儲存到資料庫,後續定時任務掃描表格進行補發
//記錄失敗訊息到失敗資料表,並且更新訊息表狀態為傳送失敗 }finally { message=null;//強引用設定為null,便於gc回收 } }
/** @Description: 用於定時任務的訊息傳送
* @param:
* @date: 2020
* @return: void
*/
public void sendMessageOfTimeTask(String message){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
CorrelationData correlationData = new CorrelationData(message.getId());
groupRabbitTemplate.setEncoding("UTF-8");
//true當訊息無法被正常送達的時候被返回給生產者,false丟棄
groupRabbitTemplate.setMandatory(true);//設定手工ack確認,
groupRabbitTemplate.setConfirmCallback(this);//ack回撥
groupRabbitTemplate.setReturnCallback(this);//回退回調
groupRabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,JSONObject.toJSONString(message),correlationData);
try {
Thread.sleep(100);//執行緒休眠,為了不讓方法直接結束,回撥函式無法正常回調confirm方法
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
message=null;//強引用設定為null,便於gc回收
}
}
/** * 如果訊息沒有到exchange,則confirm回撥,ack=false * 如果訊息到達exchange,則confirm回撥,ack=true */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("訊息回撥confirm函式:{},ack:{},cause:{}", JSONObject.toJSONString(correlationData),ack,cause); if (ack) { //消費成功更新資料庫記錄為已傳送狀態 } else { logger.info("推送訊息失敗,id:{},原因:{}",correlationData.getId(),cause); //記錄失敗訊息到失敗資料表,並且更新訊息表狀態為傳送失敗 } } /** * exchange到queue成功,則不回撥return * exchange到queue失敗,則回撥return(需設定mandatory=true,否則不回回調,訊息就丟了) */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("return--message:" + new String(message.getBody()) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey); //記錄失敗的訊息id,更新資料庫失敗表 String messgage = new String(message.getBody()); } }