RabbitMq + Spring 實現ACK機制
概念性解讀(Ack的靈活)
首先啊,有的人不是太理解這個Ack是什麼,講的接地氣一點,其實就是一個通知,怎麼說呢,當我監聽消費者,正常情況下,不會出異常,但是如果是出現了異常,甚至是沒有獲取的異常,那是不是這條資料就會作廢,但是我們肯定不希望這樣的情況出現,我們想要的是,如果在出現異常的時候,我們識別到,如果確實是一個不良異常,肯定希望資料重新返回佇列中,再次執行我們的業務邏輯程式碼,此時我就需要一個Ack的通知,告訴佇列服務,我是否已經成功處理了這條資料,而如果不配置Ack的話呢,我測試過他會自動的忽略,也就是說此時的服務是no_ack=true的模式,就是說只要我發現你是消費了這個資料,至於異常不異常的,我不管了。通知Ack機制就是這麼來的,更加靈活的,我們需要Ack不自動,而是手動,這樣做的好處,就是使得我們開發人員更加人性化或者靈活的來處理我們的業務羅傑程式碼,更加方便的處理異常的問題以及資料的返回處理等。下面是通話機制的四條原則:
- Basic.Ack 發回給 RabbitMQ 以告知,可以將相應 message 從 RabbitMQ 的訊息快取中移除。
- Basic.Ack 未被 consumer 發回給 RabbitMQ 前出現了異常,RabbitMQ 發現與該 consumer 對應的連線被斷開,之後將該 message 以輪詢方式傳送給其他 consumer (假設存在多個 consumer 訂閱同一個 queue)。
- 在 no_ack=true 的情況下,RabbitMQ 認為 message 一旦被 deliver 出去了,就已被確認了,所以會立即將快取中的 message 刪除。所以在 consumer 異常時會導致訊息丟失。
- 來自 consumer 側的 Basic.Ack 與 傳送給 Producer 側的 Basic.Ack 沒有直接關係。
正題部分(配置手動Ack,實現異常訊息回滾)
A. 在消費者端的mq配置檔案上新增,配置 關鍵程式碼為 acknowledeg = "manual",意為表示該消費者的ack方式為手動(此時的queue已經和生產者的exchange通過某個routeKey綁定了)
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="queue_xxx" ref="MqConsumer"/> <rabbit:listener queues="queue_xxx" ref="MqConsumer2"/> </rabbit:listener-container>
B. 新建一個類 MqConsumer ,並實現介面 ChannelAwareMessageListener ,實現onMessage方法,不需要指定方法。
springAMQP中已經實現了一個功能,如果該監聽器已經實現了下面2個介面,則直接呼叫onMessage方法
C. 關鍵點在實現了ChannelAwareMessageListener的onMessage方法後,會有2個引數。
一個是message(訊息實體),一個是channel就是當前的通道,很多地方都沒有說清楚怎麼去手動ack,其實手動ack就是在當前channel裡面呼叫basicAsk的方法,並傳入當前訊息的tagId就可以了。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
其中deliveryTag是tag的id,由生產者生成。第二個引數我其實也沒理解用途,暫時還沒有模擬出場景,所以先不討論。
同樣的,如果要Nack或者拒絕訊息(reject)的時候,也是呼叫channel裡面的basicXXX方法就可以了(當然要制定tagId)。注意如果拋異常或Nack(並且requeue為true),訊息會一直重新入佇列,一不小心就會重複一大堆訊息不斷出現~。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 訊息的標識,false只確認當前一個訊息收到,true確認所有consumer獲得的訊息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,並重新回到佇列,api裡面解釋得很清楚
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕訊息
D. 針對上面所描述的情況,我們在搭建一個訊息佇列時候,我們的思路應該是這樣的,首先,我們要啟動ack的手動方式,緊接著,我們處理程式碼邏輯,如果發生了異常資訊,我們首先通知到ack,我已經表示接受到這條資料了,你可以進行刪除了,不需要讓他自動的重新進入佇列中,然後,我們啟用一個錯誤處理,手動將其重新插入佇列中,在此之前,有幾個類和Api一起來看一下。
1. SimpleMessageListenerContainer
這個是我們的基礎監聽,他的作用就是佇列的總監聽,可以為其配置ack模式,異常處理類等。。
2. org.springframework.amqp.support.converter.SimpleMessageConverter
這個類和下面的Converter類很容易搞混淆,這個類的作用是可以解析佇列中的 message 轉 obj
3. org.springframework.amqp.rabbit.retry.MessageRecoverer
這個介面,需要我們開發者自定義實現,其中的一個方法recover(Message message, Throwable cause),就可以看出來他是幹嘛的,就是說在監聽出錯,也就是沒有抓取的異常而是丟擲的異常會觸發該方法,我們就會在這個介面的實現中,將訊息重新入佇列
4. org.springframework.util.ErrorHandler
這個介面也是在出現異常時候,會觸發他的方法
E. 完整例項
1. spring配置佇列xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<!-- 連線服務配置 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" />
<!-- 設定Ack模式為手動 -->
<bean id="ackManual"
class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
<property name="staticField"
value="org.springframework.amqp.core.AcknowledgeMode.MANUAL" />
</bean>
<!-- 異常處理,記錄異常資訊 -->
<bean id="mqErrorHandler" class="com.zefun.wechat.utils.MQErrorHandler"/>
<!-- 將類自動注入,可解析msg資訊 -->
<bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
<!-- 建立rabbitAdmin 代理類 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory" />
<!-- 建立SimpleMessageListenerContainer的理想通道,主要實現異常事件處理邏輯 -->
<bean id="retryOperationsInterceptorFactoryBean"
class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
<property name="messageRecoverer">
<bean class="com.zefun.wechat.utils.MQRepublishMessageRecoverer"/>
</property>
<property name="retryOperations">
<bean class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean
class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
</property>
</bean>
<!-- 定義佇列,在下面的交換機中引用次佇列,實現繫結 -->
<rabbit:queue id="queue_system_error_logger_jmail" name="${rabbitmq.system.out.log.error.mail}" durable="true"
auto-delete="false" exclusive="false" />
<!--路由設定 將佇列繫結,屬於direct型別 -->
<rabbit:direct-exchange id="directExchange"
name="directExchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue_system_error_logger_jmail" key="${rabbitmq.system.out.log.error.mail}" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- logger 日誌傳送功能 -->
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="acknowledgeMode" ref="ackManual" />
<property name="queueNames" value="${rabbitmq.system.out.log.error.mail}" />
<property name="messageListener">
<bean class="com.zefun.wechat.listener.SystemOutLogErrorMessageNoitce" />
</property>
<property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers}" />
<property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />
<property name="errorHandler" ref="mqErrorHandler" />
</bean>
</beans>
2. MessageRecoverer 配置,將小心重新入佇列
package com.zefun.wechat.utils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
public class MQRepublishMessageRecoverer implements MessageRecoverer {
private static final Logger logger = Logger.getLogger(MQRepublishMessageRecoverer.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageConverter msgConverter;
@Override
public void recover(Message message, Throwable cause) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
headers.put("x-exception-stacktrace", getStackTraceAsString(cause));
headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange());
headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey());
this.rabbitTemplate.send(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message);
logger.error("handler msg (" + msgConverter.fromMessage(message) + ") err, republish to mq.", cause);
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
}
3. MQErrorHandler 寫法,在出現異常時,記錄異常
package com.zefun.wechat.utils;
import java.lang.reflect.Field;
import java.util.Date;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ErrorHandler;
import com.zefun.wechat.service.RedisService;
public class MQErrorHandler implements ErrorHandler {
private static final Logger logger = Logger.getLogger(MQErrorHandler.class);
@Autowired
private RedisService redisService;
@Autowired
private MessageConverter msgConverter;
@Override
public void handleError(Throwable cause) {
Field mqMsgField = FieldUtils.getField(MQListenerExecutionFailedException.class, "mqMsg", true);
if (mqMsgField != null) {
try {
Message mqMsg = (Message) mqMsgField.get(cause);
Object msgObj = msgConverter.fromMessage(mqMsg);
logger.error("handle MQ msg: " + msgObj + " failed, record it to redis.", cause);
redisService.zadd(App.MsgErr.MQ_MSG_ERR_RECORD_KEY, new Double(new Date().getTime()), msgObj.toString());
} catch (Exception e) {
e.printStackTrace();
}
} else {
logger.error("An error occurred.", cause);
}
}
}
4. SystemOutLogErrorMessageNoitce 實現 ChannelAwareMessageListener介面,處理郵件服務
package com.zefun.wechat.listener;
import javax.mail.internet.MimeMessage;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;
import com.rabbitmq.client.Channel;
import com.zefun.wechat.utils.App;
import net.sf.json.JSONObject;
public class SystemOutLogErrorMessageNoitce implements ChannelAwareMessageListener {
private static final Logger logger = Logger.getLogger(MemberWechatMessageTextNoitce.class);
@Autowired
private MessageConverter msgConverter;
/** logger b */
@Autowired
private JavaMailSenderImpl senderImpl;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Object obj = null;
try {
obj = msgConverter.fromMessage(message);
} catch (MessageConversionException e) {
logger.error("convert MQ message error.", e);
} finally {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (deliveryTag != App.DELIVERIED_TAG) {
channel.basicAck(deliveryTag, false);
message.getMessageProperties().setDeliveryTag(App.DELIVERIED_TAG);
logger.info("revice and ack msg: " + (obj == null ? message : new String((byte[]) obj)));
}
}
if (obj == null) {
return;
}
JSONObject map = JSONObject.fromObject(obj);
sendMailSystemLoggerError(map.getString("date"), map.getString("subject"), map.getString("domain"), map.getString("requestURL"), map.getString("message"));
}
/**
* jmail logger
* @author 小高
* @date 2016年10月25日 下午3:24:46
* @param date 日期
* @param subject 主題賬戶
* @param domain 域名環境
* @param message logger日誌
* @param requestURL 請求路徑
* @throws Exception 異常資訊
*/
public void sendMailSystemLoggerError(String date, String subject, String domain, String requestURL, String message) throws Exception{
MimeMessage mailMessage = this.senderImpl.createMimeMessage();
MimeMessageHelper messageHelper = new MimeMessageHelper(mailMessage, true);
messageHelper.setTo("[email protected]");
messageHelper.setFrom("[email protected]");
messageHelper.setSubject(date + " 系統異常");
String msg = "<p>異常時間:" + date + "</p><p>門店企業:" + subject + "</p>"
+ "<p>部署環境:" + domain + "</p><p>異常連線:" + requestURL + "</p>"
+ "<p>異常內容:</p>" + message;
messageHelper.setText("<html><head></head><body>" + msg + "</body></html>", true);
senderImpl.send(mailMessage);
logger.info("jmail push message success");
}
}