springboot內嵌ActiveMQ
阿新 • • 發佈:2022-04-12
目錄
springboot內嵌ActiveMQ
ps: springboot 2.2.7
1、增加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--使用springboot2.1+時候,MQ連線池配置依賴--> <dependency> <groupId>org.messaginghub</groupId> <artifactId>pooled-jms</artifactId> </dependency>
2、增加MQ優化的配置
package com.platform.gis.common.config; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.listener.SimpleMessageListenerContainer; import javax.jms.ConnectionFactory; @Configuration public class ActiveMQConfig { @Bean public JmsListenerContainerFactory<?> queueListenerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); container.setConcurrentConsumers(3); container.setConnectionFactory(connectionFactory()); factory.setPubSubDomain(false); factory.setConnectionFactory(connectionFactory()); //連線數 factory.setConcurrency("3-15"); //重連間隔時間 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(4); return factory; } @Bean public ConnectionFactory connectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setTrustAllPackages(true); connectionFactory.setRedeliveryPolicy(redeliveryPolicy()); return connectionFactory; } @Bean public RedeliveryPolicy redeliveryPolicy(){ RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy(); //是否在每次嘗試重新發送失敗後,增長這個等待時間 redeliveryPolicy.setUseExponentialBackOff(true); //重發次數,預設為6次 redeliveryPolicy.setMaximumRedeliveries(3); //重發時間間隔,預設為1秒 redeliveryPolicy.setInitialRedeliveryDelay(1); //第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裡的2就是value redeliveryPolicy.setBackOffMultiplier(2); //是否避免訊息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); //設定重發最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)為true時生效 redeliveryPolicy.setMaximumRedeliveryDelay(-1); return redeliveryPolicy; } @Bean public JmsTemplate jmsTemplate(){ JmsTemplate jmsTemplate=new JmsTemplate(connectionFactory()); //進行持久化配置 1表示非持久化,2表示持久化 jmsTemplate.setDeliveryMode(1); //jmsTemplate.setConnectionFactory(activeMQConnectionFactory); //客戶端簽收模式 jmsTemplate.setSessionAcknowledgeMode(4); return jmsTemplate; } @Bean public JmsMessagingTemplate jmsMessageTemplate(){ return new JmsMessagingTemplate(connectionFactory()); } }
3、啟動activeMq的內嵌連線
@EnableJms
工具類啟動mq服務
package com.platform.gis.common.util; import org.apache.activemq.broker.BrokerService; public class ActiveMQUtils { private static final String BROKERURL = "tcp://localhost:61616"; public static void startBrokerService(){ try { BrokerService broker = new BrokerService(); broker.addConnector(BROKERURL); broker.setPersistent(false); broker.start(); }catch (Exception e){ } } }
public static void main(String[] args) {
//啟動內嵌Mq
ActiveMQUtils.startBrokerService();
SpringContextUtil.setApplicationContext(SpringApplication.run(AdminServiceApplication.class, args));
}
4、增加生產者
package com.platform.gis.common.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Destination;
@Service
public class JmsProduce {
@Autowired
private JmsMessagingTemplate jmsTemplate;
// 傳送訊息,destination是傳送到的佇列,message是待發送的訊息
public void sendMessage(Destination destination, final Object message) {
jmsTemplate.convertAndSend(destination, message);
}
}
5、增加消費者
package com.platform.gis.admin.consumer;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.platform.gis.admin.constant.SysConstanst;
import com.platform.gis.admin.dao.SysMessageDao;
import com.platform.gis.admin.log.po.SysInterfaceLog;
import com.platform.gis.admin.message.po.SysMessage;
import com.platform.gis.admin.message.vo.SysMessageVo;
import com.platform.gis.admin.service.SysLogService;
import com.platform.gis.admin.service.SysSendMessageService;
import com.platform.gis.common.constant.CommonConstant;
import com.platform.gis.common.util.Pages;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;
@Component
public class JmsConsumer {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@Autowired
SysSendMessageService sysSendMessageService;
@Autowired
SysLogService sysLogService;
@Autowired
private SysMessageDao sysMessageDao;
@Autowired
protected RedisTemplate<String, Object> jsonRedisTemplate;
/**
* 使用JmsListener配置消費者監聽的佇列,其中text是接收到的訊息
* <p>
* 傳送郵件佇列
*/
@JmsListener(destination = SysConstanst.MESSAGE_QUEUE_EMAIL)
public void emailReceiveQueue(Message message, Session session) throws JMSException {
SysMessageVo sysMessageVo = new SysMessageVo();
String text = "";
try {
TextMessage msg = (TextMessage) message;
text = msg.getText();
JSONObject jsonObject = JSON.parseObject(text);
sysMessageVo = JSON.toJavaObject(jsonObject, SysMessageVo.class);
if (sysMessageVo != null) {
sysSendMessageService.sendMail(sysMessageVo);
sysMessageVo.setStatus("1");
}
//手動確認
message.acknowledge();
} catch (Exception e) {
LOG.error(e.getLocalizedMessage(), e);
if (sysMessageVo != null) {
sysMessageVo.setStatus("2");
sysMessageVo.setErrorReason(ExceptionUtils.getStackTrace(e));
}
//訊息重發
session.recover();
} finally {
if (sysMessageVo != null) {
SysMessage sysMessage = Pages.convert(sysMessageVo, SysMessage.class);
sysMessageDao.updateById(sysMessage);
}
}
}
/**
* 使用JmsListener配置消費者監聽的佇列,其中text是接收到的訊息
* <p>
* 傳送簡訊佇列
*/
@JmsListener(destination = SysConstanst.MESSAGE_QUEUE_SMS)
public void smsReceiveQueue(Message message, Session session) throws JMSException {
SysMessageVo sysMessageVo = new SysMessageVo();
String text = "";
Map<String, String> msgResult = new HashMap<String, String>();
try {
TextMessage msg = (TextMessage) message;
text = msg.getText();
JSONObject jsonObject = JSON.parseObject(text);
sysMessageVo = JSON.toJavaObject(jsonObject, SysMessageVo.class);
if (sysMessageVo != null) {
msgResult = sysSendMessageService.sendSMS(sysMessageVo);
JSONObject smsResult = JSONObject.parseObject(msgResult.get("result"));
if ("SUC0000".equals(smsResult.get("rtcode").toString())) {//簡訊傳送成功
sysMessageVo.setStatus("1");
} else {
String errorMessage = smsResult.get("rtmsg").toString();
LOG.error("SMS failed to send,the reason is:" + errorMessage);
sysMessageVo.setErrorReason(errorMessage);
}
}
//手動確認
message.acknowledge();
} catch (Exception e) {
LOG.error(e.getLocalizedMessage(), e);
if (sysMessageVo != null) {
sysMessageVo.setStatus("2");
sysMessageVo.setErrorReason(ExceptionUtils.getStackTrace(e));
}
//訊息重發
session.recover();
} finally {
if (sysMessageVo != null) {
sysMessageVo.setContent(msgResult.get("sendJson"));
SysMessage sysMessage = Pages.convert(sysMessageVo, SysMessage.class);
sysMessageDao.updateById(sysMessage);
}
}
}
/**
* 定時執行審計日誌的儲存
*/
@Scheduled(fixedDelay = SysConstanst.SCHEDULED_AUDIT_LOG_TIME)
public void auditLogQueue() {
sysLogService.saveAuditLogToDb();
}
/**
* 介面請求日誌的儲存
*/
@JmsListener(destination = CommonConstant.MESSAGE_QUEUE_INTERFACE_LOG)
public void interfaceLogQueue(Message message, Session session) throws JMSException {
SysInterfaceLog sysInterfaceLog = null;
String text = "";
try {
TextMessage msg = (TextMessage) message;
text = msg.getText();
JSONObject jsonObject = JSON.parseObject(text);
sysInterfaceLog = JSON.toJavaObject(jsonObject, SysInterfaceLog.class);
if (sysInterfaceLog != null) {
sysLogService.saveInterfaceLog(sysInterfaceLog);
}
//手動確認
message.acknowledge();
} catch (Exception e) {
LOG.error(e.getLocalizedMessage(), e);
//訊息重發
session.recover();
}
}
}
6、增加訊息呼叫MQ
注入
@Autowired
private JmsProduce producer;
增加管道(生產不同的queue:不同的queue相當於不同的消費邏輯)
Destination destination = destination = new ActiveMQQueue(SysConstanst.MESSAGE_QUEUE_EMAIL);
producer.sendMessage(destination, JSON.toJSONString(sysMessageVo));