1. 程式人生 > >RabbitMQ訊息可靠送達

RabbitMQ訊息可靠送達

目前專案中採用RabbitMQ,在執行的過程當中,發現有一部分訊息存在丟失的情況,結合自己對RabbitMQ的理解,一般分為兩種情況,一種是客戶端丟失,即訊息沒有成功送達到RabbitMQ Server,一種是消費端並有成功消費訊息,針對這兩種情況,我們分別做了相應的方案。

一、訊息沒有送達到RabbitMQ Server,導致丟失

由於看不了RabbitMQ Server的訊息日誌,我們自己做了一個訊息日誌表,操作步驟如下:

1、在業務操作成功之後,把訊息持久化到DB中,這兩步操作放在同一個事務當中,要麼都成功,要麼都失敗。

2、事務提交成功之後,單獨啟動一個執行緒進行訊息的傳送

3、啟用RabbitMQ的訊息確認(message acknowledgement)機制,我們使用的是Spring,首先就要修改Spring-rabbitmq.xml中的配置。配置如下:

<rabbit:connection-factory  id="rabbitmqConnectionFactory"  virtual-host="${rabbitmq.vhost}" channel-cache-size="25"  host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"  password="${rabbitmq.password}" publisher-confirms="true"/>
<rabbit:admin connection-factory="rabbitmqConnectionFactory"/>
<rabbit:template id="amqpTemplate"  exchange="${exchangeName}" connection-factory="rabbitmqConnectionFactory" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"
/>

關鍵引數已用紅色字型標識。注意:mandatory必須設定為true,否則回撥不會生效。

confirmCallBackListener類的程式碼如下:

@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback{
    private static Logger logger = LoggerFactory.getLogger(ConfirmCallBackListener.class);
    @Autowired
    private IMessageLogService messageLogService;
       
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
        try {
            if (!ack) {
                logger.info("send message failed: " + cause + correlationData.toString());                
            } else {//傳送成功,更新訊息日誌表資料
                logger.info("send message successful !");
                MessageLog vo =  new MessageLog();
                vo.setToken(correlationData.getId());
                vo.setSendStatus(ConstantUtils.SENT);
                messageLogService.saveOrUpdateMessageLog(vo);
                logger.info("update message send status ==="+ JSON.toJSONString(vo));
            }

        }catch (Exception e){
            logger.error("message confirm exception :"+e);
        }
    }
} 
returnCallBackListener類的程式碼如下:
@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback{
    private static Logger logger = LoggerFactory.getLogger(ReturnCallBackListener.class);
 
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
       try {
           System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
           //做相應的業務操作
       }catch (Exception e){
           logger.error("message return callback exception:"+e);

       }
    }
} 
4、如果出現RabbitMQ Server連線不上,我們做了一個定時任務,掃描message_log表,如果存在訊息沒有確認,則會向RabbitMQ Server一直重發,直到傳送成功。

二、消費端消費訊息失敗或有其它異常

1、消費者在消費訊息成功時給到生產者回執,這時傳送端收到消費者回執之後,更改message-log表的傳送狀態。則整個過程結束。

2、採用RabbitMQ本身的機制進行訊息回執。配置如下:

<rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="manual" >
        <rabbit:listener queues="your_queue_name" ref="receiveConfirmListener" />
    </rabbit:listener-container>

receiveCallBackListener類的程式碼如下:

@Service("receiveConfirmListener")
public class ReceiveConfirmListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody()));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
            //TODO 業務處理
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
這樣基本就保證了訊息的可靠送達

如果有別的同學有更好的方案,歡迎吐槽。


注:MessageLog物件是一張訊息日誌表,表結構大致如下: