RabbitMQ訊息可靠送達
阿新 • • 發佈:2019-01-24
目前專案中採用RabbitMQ,在執行的過程當中,發現有一部分訊息存在丟失的情況,結合自己對RabbitMQ的理解,一般分為兩種情況,一種是客戶端丟失,即訊息沒有成功送達到RabbitMQ Server,一種是消費端並有成功消費訊息,針對這兩種情況,我們分別做了相應的方案。
一、訊息沒有送達到RabbitMQ Server,導致丟失
由於看不了RabbitMQ Server的訊息日誌,我們自己做了一個訊息日誌表,操作步驟如下:
1、在業務操作成功之後,把訊息持久化到DB中,這兩步操作放在同一個事務當中,要麼都成功,要麼都失敗。
2、事務提交成功之後,單獨啟動一個執行緒進行訊息的傳送
3、啟用RabbitMQ的訊息確認(message acknowledgement)機制,我們使用的是Spring,首先就要修改Spring-rabbitmq.xml中的配置。配置如下:
<rabbit:connection-factory id="rabbitmqConnectionFactory" virtual-host="${rabbitmq.vhost}" channel-cache-size="25" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" publisher-confirms="true"/>
<rabbit:admin connection-factory="rabbitmqConnectionFactory"/>
<rabbit:template id="amqpTemplate" exchange="${exchangeName}" connection-factory="rabbitmqConnectionFactory" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true" />
關鍵引數已用紅色字型標識。注意:mandatory必須設定為true,否則回撥不會生效。
confirmCallBackListener類的程式碼如下:
returnCallBackListener類的程式碼如下:@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{ private static Logger logger = LoggerFactory.getLogger(ConfirmCallBackListener.class); @Autowired private IMessageLogService messageLogService; @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); try { if (!ack) { logger.info("send message failed: " + cause + correlationData.toString()); } else {//傳送成功,更新訊息日誌表資料 logger.info("send message successful !"); MessageLog vo = new MessageLog(); vo.setToken(correlationData.getId()); vo.setSendStatus(ConstantUtils.SENT); messageLogService.saveOrUpdateMessageLog(vo); logger.info("update message send status ==="+ JSON.toJSONString(vo)); } }catch (Exception e){ logger.error("message confirm exception :"+e); } } }
@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback{
private static Logger logger = LoggerFactory.getLogger(ReturnCallBackListener.class);
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
//做相應的業務操作
}catch (Exception e){
logger.error("message return callback exception:"+e);
}
}
}
4、如果出現RabbitMQ Server連線不上,我們做了一個定時任務,掃描message_log表,如果存在訊息沒有確認,則會向RabbitMQ Server一直重發,直到傳送成功。
二、消費端消費訊息失敗或有其它異常
1、消費者在消費訊息成功時給到生產者回執,這時傳送端收到消費者回執之後,更改message-log表的傳送狀態。則整個過程結束。
2、採用RabbitMQ本身的機制進行訊息回執。配置如下:
<rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="manual" >
<rabbit:listener queues="your_queue_name" ref="receiveConfirmListener" />
</rabbit:listener-container>
receiveCallBackListener類的程式碼如下:
@Service("receiveConfirmListener")
public class ReceiveConfirmListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
//TODO 業務處理
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
這樣基本就保證了訊息的可靠送達
如果有別的同學有更好的方案,歡迎吐槽。
注:MessageLog物件是一張訊息日誌表,表結構大致如下: