1. 程式人生 > 其它 >springboot內嵌ActiveMQ

springboot內嵌ActiveMQ

目錄

springboot內嵌ActiveMQ

PS:本文基於 Spring Boot 2.2.7。

1、增加依賴

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- JMS connection-pool dependency, needed for MQ pooling configuration when using Spring Boot 2.1+ -->
<dependency>
	<groupId>org.messaginghub</groupId>
	<artifactId>pooled-jms</artifactId>
</dependency>

2、增加MQ優化的配置

package com.platform.gis.common.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.SimpleMessageListenerContainer;

import javax.jms.ConnectionFactory;

/**
 * JMS infrastructure beans for ActiveMQ: a connection factory with a custom
 * redelivery policy, a queue listener container factory, and two sending templates.
 */
@Configuration
public class ActiveMQConfig {

    /**
     * ActiveMQ's vendor-specific "individual acknowledge" session mode
     * (same value as {@code org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE}).
     * Only the message on which {@code acknowledge()} is called gets acknowledged.
     */
    private static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    /** Same value as {@code javax.jms.DeliveryMode.NON_PERSISTENT} (2 would be PERSISTENT). */
    private static final int NON_PERSISTENT = 1;

    /**
     * Listener container factory for queue (point-to-point) listeners.
     * <p>
     * The bean is named {@code queueListenerFactory}; a {@code @JmsListener} only uses it
     * when it sets {@code containerFactory = "queueListenerFactory"}. Without that attribute
     * Spring looks up a bean named {@code jmsListenerContainerFactory} instead.
     *
     * @return the configured factory
     */
    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // Queues, not topics.
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory());
        // Concurrent consumers per listener: at least 3, scaling up to 15.
        factory.setConcurrency("3-15");
        // Interval between connection recovery attempts, in milliseconds.
        factory.setRecoveryInterval(1000L);
        // Listener code must acknowledge each message itself.
        factory.setSessionAcknowledgeMode(INDIVIDUAL_ACKNOWLEDGE);
        return factory;
    }

    /**
     * ActiveMQ connection factory created with the no-arg constructor, so the client
     * library's default broker URL and credentials apply.
     *
     * @return the connection factory carrying {@link #redeliveryPolicy()}
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // NOTE(security): trusting all packages lets ObjectMessage payloads deserialize
        // classes from any package. Acceptable only when every producer is trusted;
        // otherwise restrict with setTrustedPackages(...).
        connectionFactory.setTrustAllPackages(true);
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        return connectionFactory;
    }

    /**
     * Redelivery policy applied when a consumer rolls back or recovers a message.
     *
     * @return the policy: at most 3 redeliveries with exponential back-off
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();
        // Grow the wait time after each failed redelivery attempt.
        redeliveryPolicy.setUseExponentialBackOff(true);
        // Maximum number of redeliveries (ActiveMQ's default is 6).
        redeliveryPolicy.setMaximumRedeliveries(3);
        // Initial redelivery delay in milliseconds, i.e. 1 ms here (ActiveMQ's default is 1000).
        // NOTE(review): the original comments mentioned 1 s / 500 ms; confirm 1 ms is intended.
        redeliveryPolicy.setInitialRedeliveryDelay(1);
        // Factor by which the delay is multiplied when exponential back-off is on.
        redeliveryPolicy.setBackOffMultiplier(2);
        // Do not randomize delays to avoid redelivery collisions.
        redeliveryPolicy.setUseCollisionAvoidance(false);
        // -1 means no upper bound on the delay; only relevant with exponential back-off.
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);
        return redeliveryPolicy;
    }

    /**
     * Template for sending messages with non-persistent delivery.
     *
     * @return the configured {@link JmsTemplate}
     */
    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate jmsTemplate=new JmsTemplate(connectionFactory());
        // Delivery mode: 1 = non-persistent, 2 = persistent.
        jmsTemplate.setDeliveryMode(NON_PERSISTENT);
        // Same acknowledge mode as the listener factory.
        jmsTemplate.setSessionAcknowledgeMode(INDIVIDUAL_ACKNOWLEDGE);
        return jmsTemplate;
    }

    /**
     * Messaging-style template built directly on {@link #connectionFactory()}.
     *
     * @return a {@link JmsMessagingTemplate} with default settings
     */
    @Bean
    public JmsMessagingTemplate jmsMessageTemplate(){
        return new JmsMessagingTemplate(connectionFactory());
    }

}

3、啟動activeMq的內嵌連線

在啟動類(或任一 @Configuration 配置類)上標註以下註解,啟用 @JmsListener 註解支援:

@EnableJms

透過工具類啟動內嵌的 MQ 服務(broker),並在應用程式的 main 方法中、Spring 啟動之前呼叫它:

package com.platform.gis.common.util;

import org.apache.activemq.broker.BrokerService;

/**
 * Helper that starts an in-process (embedded) ActiveMQ broker.
 */
public class ActiveMQUtils {

    /** Transport connector the embedded broker listens on. */
    private static final String BROKERURL = "tcp://localhost:61616";

    /**
     * Creates a non-persistent broker, binds it to {@link #BROKERURL} and starts it.
     * <p>
     * Start-up is best-effort: a failure (for example the port already being in use)
     * does not propagate to the caller, but it is logged so that the problem is visible
     * instead of being silently discarded.
     */
    public static void startBrokerService(){
        try {
            BrokerService broker = new BrokerService();
            broker.addConnector(BROKERURL);
            // Keep messages in memory only; nothing is written to disk.
            broker.setPersistent(false);
            broker.start();
        } catch (Exception e) {
            // java.util.logging is used (fully qualified) so no new import or
            // dependency is required by this class.
            java.util.logging.Logger.getLogger(ActiveMQUtils.class.getName()).log(
                    java.util.logging.Level.SEVERE,
                    "Failed to start embedded ActiveMQ broker on " + BROKERURL, e);
        }
    }
}
	/**
	 * Application entry point: starts the embedded broker, then boots Spring and
	 * hands the resulting application context to SpringContextUtil.
	 */
	public static void main(String[] args) {
		// Start the embedded MQ broker before the Spring context is created
		ActiveMQUtils.startBrokerService();
		SpringContextUtil.setApplicationContext(SpringApplication.run(AdminServiceApplication.class, args));
		
	}

4、增加生產者

package com.platform.gis.common.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
 * Thin sending facade over Spring's {@link JmsMessagingTemplate}.
 */
@Service
public class JmsProduce {

    /** Template that performs payload conversion and the actual send. */
    @Autowired
    private JmsMessagingTemplate messagingTemplate;

    /**
     * Converts the payload and sends it to the given destination.
     *
     * @param destination the queue (or other JMS destination) to send to
     * @param message     the payload to convert and send
     */
    public void sendMessage(Destination destination, Object message) {
        messagingTemplate.convertAndSend(destination, message);
    }
}

5、增加消費者

package com.platform.gis.admin.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.platform.gis.admin.constant.SysConstanst;
import com.platform.gis.admin.dao.SysMessageDao;
import com.platform.gis.admin.log.po.SysInterfaceLog;
import com.platform.gis.admin.message.po.SysMessage;
import com.platform.gis.admin.message.vo.SysMessageVo;
import com.platform.gis.admin.service.SysLogService;
import com.platform.gis.admin.service.SysSendMessageService;
import com.platform.gis.common.constant.CommonConstant;
import com.platform.gis.common.util.Pages;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;

/**
 * JMS listeners for the e-mail, SMS and interface-log queues, plus the scheduled
 * audit-log flush. Listeners acknowledge a message manually after processing and
 * call {@code session.recover()} to request redelivery when processing fails.
 */
@Component
public class JmsConsumer {

    /**
     * Bean name of the listener container factory configured for manual acknowledgement.
     * Without an explicit {@code containerFactory}, {@code @JmsListener} falls back to a
     * bean named {@code jmsListenerContainerFactory}, which is not the one set up for
     * the acknowledge/recover calls made below.
     */
    private static final String LISTENER_FACTORY = "queueListenerFactory";

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @Autowired
    SysSendMessageService sysSendMessageService;

    @Autowired
    SysLogService sysLogService;

    @Autowired
    private SysMessageDao sysMessageDao;

    @Autowired
    protected RedisTemplate<String, Object> jsonRedisTemplate;

    /**
     * Listener for the e-mail queue. The message body is expected to be the JSON form
     * of a {@link SysMessageVo}.
     * <p>
     * Status "1" is stored after the mail was sent, status "2" (with the stack trace as
     * error reason) when an exception occurred.
     *
     * @param message the received JMS message; must be a {@link TextMessage}
     * @param session the session the message was received on
     * @throws JMSException if {@code session.recover()} fails
     */
    @JmsListener(destination = SysConstanst.MESSAGE_QUEUE_EMAIL, containerFactory = LISTENER_FACTORY)
    public void emailReceiveQueue(Message message, Session session) throws JMSException {
        // Stays null until the payload has been parsed, so that a parse failure does not
        // push an empty placeholder object to the database.
        SysMessageVo sysMessageVo = null;
        try {
            sysMessageVo = readPayload(message, SysMessageVo.class);
            if (sysMessageVo != null) {
                sysSendMessageService.sendMail(sysMessageVo);
                sysMessageVo.setStatus("1");
            }
            // Manual acknowledgement
            message.acknowledge();
        } catch (Exception e) {
            LOG.error(e.getLocalizedMessage(), e);
            markFailed(sysMessageVo, e);
            // Ask the broker to redeliver the message
            session.recover();
        } finally {
            saveOutcome(sysMessageVo);
        }
    }

    /**
     * Listener for the SMS queue. The message body is expected to be the JSON form of a
     * {@link SysMessageVo}.
     * <p>
     * The send result is read from the map returned by {@code sendSMS}: key "result"
     * holds the provider's JSON answer (fields "rtcode" / "rtmsg") and key "sendJson"
     * is stored as the message content.
     *
     * @param message the received JMS message; must be a {@link TextMessage}
     * @param session the session the message was received on
     * @throws JMSException if {@code session.recover()} fails
     */
    @JmsListener(destination = SysConstanst.MESSAGE_QUEUE_SMS, containerFactory = LISTENER_FACTORY)
    public void smsReceiveQueue(Message message, Session session) throws JMSException {
        SysMessageVo sysMessageVo = null;
        Map<String, String> msgResult = new HashMap<String, String>();
        try {
            sysMessageVo = readPayload(message, SysMessageVo.class);
            if (sysMessageVo != null) {
                Map<String, String> sendResult = sysSendMessageService.sendSMS(sysMessageVo);
                if (sendResult != null) {
                    // Keep msgResult non-null so the finally block cannot fail with a
                    // second NullPointerException and skip the status update.
                    msgResult = sendResult;
                }
                JSONObject smsResult = JSONObject.parseObject(msgResult.get("result"));
                if ("SUC0000".equals(smsResult.get("rtcode").toString())) {
                    // SMS sent successfully
                    sysMessageVo.setStatus("1");
                } else {
                    String errorMessage = smsResult.get("rtmsg").toString();
                    LOG.error("SMS failed to send,the reason is:{}", errorMessage);
                    // NOTE(review): only the error reason is recorded here; the status is
                    // left unchanged. Confirm whether a rejected SMS should also get "2".
                    sysMessageVo.setErrorReason(errorMessage);
                }
            }
            // Manual acknowledgement
            message.acknowledge();
        } catch (Exception e) {
            LOG.error(e.getLocalizedMessage(), e);
            markFailed(sysMessageVo, e);
            // Ask the broker to redeliver the message
            session.recover();
        } finally {
            if (sysMessageVo != null) {
                sysMessageVo.setContent(msgResult.get("sendJson"));
            }
            saveOutcome(sysMessageVo);
        }
    }

    /**
     * Periodically triggers saving of the audit log.
     */
    @Scheduled(fixedDelay = SysConstanst.SCHEDULED_AUDIT_LOG_TIME)
    public void auditLogQueue() {
        sysLogService.saveAuditLogToDb();
    }


    /**
     * Listener for the interface-request-log queue. The message body is expected to be
     * the JSON form of a {@link SysInterfaceLog}, which is handed to the log service.
     *
     * @param message the received JMS message; must be a {@link TextMessage}
     * @param session the session the message was received on
     * @throws JMSException if {@code session.recover()} fails
     */
    @JmsListener(destination = CommonConstant.MESSAGE_QUEUE_INTERFACE_LOG, containerFactory = LISTENER_FACTORY)
    public void interfaceLogQueue(Message message, Session session) throws JMSException {
        try {
            SysInterfaceLog sysInterfaceLog = readPayload(message, SysInterfaceLog.class);
            if (sysInterfaceLog != null) {
                sysLogService.saveInterfaceLog(sysInterfaceLog);
            }
            // Manual acknowledgement
            message.acknowledge();
        } catch (Exception e) {
            LOG.error(e.getLocalizedMessage(), e);
            // Ask the broker to redeliver the message
            session.recover();
        }
    }

    /** Reads the text body of {@code message} and maps its JSON content to {@code type}. */
    private static <T> T readPayload(Message message, Class<T> type) throws JMSException {
        String text = ((TextMessage) message).getText();
        JSONObject jsonObject = JSON.parseObject(text);
        return JSON.toJavaObject(jsonObject, type);
    }

    /** Marks {@code vo} (if present) with status "2" and the stack trace of {@code cause}. */
    private static void markFailed(SysMessageVo vo, Exception cause) {
        if (vo != null) {
            vo.setStatus("2");
            vo.setErrorReason(ExceptionUtils.getStackTrace(cause));
        }
    }

    /** Converts {@code vo} (if present) to its persistent form and updates it by id. */
    private void saveOutcome(SysMessageVo vo) {
        if (vo != null) {
            SysMessage sysMessage = Pages.convert(vo, SysMessage.class);
            sysMessageDao.updateById(sysMessage);
        }
    }

}

6、增加訊息呼叫MQ

注入

// Producer service used to put messages on a queue; injected by Spring.
@Autowired
private JmsProduce producer;

建立目的地(Destination)並傳送訊息(不同的 queue 對應不同的消費邏輯):

// Target queue for e-mail messages.
Destination destination = new ActiveMQQueue(SysConstanst.MESSAGE_QUEUE_EMAIL);
// The payload is sent as its JSON string form.
producer.sendMessage(destination, JSON.toJSONString(sysMessageVo));

7、檢視是否呼叫成功