
Implementing Delayed Messages with ActiveMQ in Spring Boot

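I. Install ActiveMQ

Installation is covered by many online tutorials, so it is not described here.

II. Modify the ActiveMQ configuration file

Add the attribute schedulerSupport="true" to the broker element in conf/activemq.xml. Without it, the broker ignores the scheduling properties and delivers the message immediately.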

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information,see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

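III. Create the Spring Boot project

  1. Configure the ActiveMQ connection factory. The trusted packages must be set; otherwise receiving the ObjectMessage fails, because the client refuses to deserialize classes from untrusted packages.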
package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        // Set the list of packages whose classes may be deserialized from an ObjectMessage
        List<String> models = new ArrayList<>();
        models.add("com.example.demoactivemq.domain");
        factory.setTrustedPackages(models);

        return factory;
    }

}
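To make the purpose of the trusted-package list more concrete, the following is a simplified illustration of the idea: a class is accepted only if its package equals a trusted package or is a subpackage of one. The class and method names (TrustedPackageCheck, isTrusted) are hypothetical, and the real check lives inside the ActiveMQ client and may differ in detail.

```java
import java.util.Arrays;
import java.util.List;

public class TrustedPackageCheck {

    // Hypothetical helper: returns true if className belongs to a trusted
    // package or to one of its subpackages.
    public static boolean isTrusted(String className, List<String> trustedPackages) {
        int lastDot = className.lastIndexOf('.');
        // A class in the default package has no package name; treat it as untrusted.
        if (lastDot < 0) {
            return false;
        }
        String packageName = className.substring(0, lastDot);
        for (String trusted : trustedPackages) {
            if (packageName.equals(trusted) || packageName.startsWith(trusted + ".")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> trusted = Arrays.asList("com.example.demoactivemq.domain");
        // The message entity used in this article is inside the trusted package.
        System.out.println(isTrusted("com.example.demoactivemq.domain.MessageModel", trusted));
        // A class from any other package would be rejected.
        System.out.println(isTrusted("com.example.other.Payload", trusted));
    }
}
```

Listing only the packages that actually contain message classes keeps the set of deserializable types as small as possible.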

  1. Message entity class
package com.example.demoactivemq.domain;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * @author shanks on 2019-11-12
 */

@Builder
@Data
public class MessageModel implements Serializable {
    private String titile;
    private String message;
}

  1. Producer
package com.example.demoactivemq.producer;


import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;


/**
 * Message producer
 *
 * @author shanks
 */
@Service
@Slf4j
public class Producer {

    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * Send a message immediately
     *
     * @param destination the queue to send to
     * @param message     the message to send
     */
    public <T extends Serializable> void send(Destination destination,T message) {
        template.convertAndSend(destination,message);
    }

    /**
     * Send a message with a delivery delay
     *
     * @param destination the queue to send to
     * @param data        the message to send
     * @param time        the delay in milliseconds
     */
    public <T extends Serializable> void delaySend(Destination destination,T data,Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // Get the connection factory
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            // Open a connection
            connection = connectionFactory.createConnection();
            connection.start();
            // Create a transacted session (first argument true); the acknowledge mode is ignored for transacted sessions
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // Create a producer for the destination
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            // Set the delivery delay in milliseconds
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // Send the message
            producer.send(message);
            // Log text means "message sent"
            log.info("傳送訊息:{}", data);
            // Commit the transaction so that the send takes effect
            session.commit();
        } catch (Exception e) {
            log.error("Failed to send delayed message", e);
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                log.error("Failed to close JMS resources", e);
            }
        }
    }
}
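The value stored in ScheduledMessage.AMQ_SCHEDULED_DELAY is a delay in milliseconds. When the requirement is "deliver at a given point in time" rather than "deliver after N milliseconds", the delay has to be computed first. A minimal sketch using only java.time follows; the class and method names (DelayCalculator, delayMillisUntil) are hypothetical and not part of the original project.

```java
import java.time.Duration;
import java.time.Instant;

public class DelayCalculator {

    // Hypothetical helper: milliseconds from 'now' until 'target'.
    // A target in the past yields 0, meaning "deliver without delay".
    public static long delayMillisUntil(Instant now, Instant target) {
        long millis = Duration.between(now, target).toMillis();
        return Math.max(0L, millis);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2019-11-12T22:18:52Z");
        Instant target = now.plusSeconds(10);
        // The result could be passed as the 'time' argument of delaySend.
        System.out.println(delayMillisUntil(now, target)); // prints 10000
    }
}
```

Passing the current time in as a parameter, instead of calling Instant.now() inside the helper, keeps the calculation easy to test.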

  1. Consumer
package com.example.demoactivemq.producer;


import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer
 */
@Component
@Slf4j
public class Consumer {


    @JmsListener(destination = "delay.queue")
    public void receiveQueue(MessageModel message) {
        log.info("收到訊息:{}",message);
    }
}

  1. application.yml
spring:
  activemq:
    broker-url: tcp://localhost:61616

  1. Test class
package com.example.demoactivemq;

import com.example.demoactivemq.domain.MessageModel;
import com.example.demoactivemq.producer.Producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

// With the JUnit 5 (Jupiter) @Test, @SpringBootTest already registers the
// SpringExtension (Spring Boot 2.1+), so the JUnit 4 @RunWith(SpringRunner.class) is not needed.
@SpringBootTest(classes = DemoActivemqApplication.class)
class DemoActivemqApplicationTests {

    /**
     * Message producer
     */
    @Autowired
    private Producer producer;

    /**
     * Test for an immediate (non-delayed) message
     */
    @Test
    public void test() {
        MessageModel messageModel = MessageModel.builder()
                .message("測試訊息")
                .titile("訊息000")
                .build();
        // Send the message
        producer.send(Producer.DEFAULT_QUEUE,messageModel);
    }

    /**
     * Test for delayed messages
     */
    @Test
    public void test2() {
        for (int i = 0; i < 5; i++) {
            MessageModel messageModel = MessageModel.builder()
                    .titile("延遲10秒執行")
                    .message("測試訊息" + i)
                    .build();
            // Send a message delayed by 10 seconds
            producer.delaySend(Producer.DEFAULT_QUEUE,messageModel,10000L);
        }
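        try {
            // Sleep for 100 seconds so the delayed messages can be delivered before the test ends
            Thread.sleep(100000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }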
    }

}

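Run results (in the log, 傳送訊息 means "message sent" and 收到訊息 means "message received"). Each message is received about 10 seconds after it was sent: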

2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息0)
2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息1)
2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息2)
2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息3)
2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息4)
2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息0)
2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息1)
2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息2)
2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息3)
2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息4)
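
The delay can be checked directly from the log timestamps. The sketch below parses the send and receive timestamps of the first message and computes the elapsed time; the class and method names (LogDelayCheck, elapsedMillis) are hypothetical, and the timestamp pattern is assumed from the log lines above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogDelayCheck {

    // Pattern assumed from the timestamps in the log output above.
    private static final DateTimeFormatter LOG_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Hypothetical helper: milliseconds elapsed between two log timestamps.
    public static long elapsedMillis(String sent, String received) {
        LocalDateTime sentAt = LocalDateTime.parse(sent, LOG_FORMAT);
        LocalDateTime receivedAt = LocalDateTime.parse(received, LOG_FORMAT);
        return Duration.between(sentAt, receivedAt).toMillis();
    }

    public static void main(String[] args) {
        // Send and receive times of the first message (測試訊息0) in the log.
        long elapsed = elapsedMillis("2019-11-12 22:18:52.939", "2019-11-12 22:19:03.012");
        System.out.println(elapsed + " ms"); // prints "10073 ms"
    }
}
```

The result is slightly more than the requested 10000 ms, which is expected: the delay is a lower bound, and scheduling plus delivery to the listener add a small overhead.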
