SpringBoot之ActiveMQ實現延遲訊息
阿新 • • 發佈:2019-12-31
一、安裝activeMQ
安裝步驟參照網上教程,本文不做介紹
二、修改activeMQ配置檔案
broker新增配置資訊 schedulerSupport="true"
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
    <!-- schedulerSupport="true" is the attribute added for this tutorial; the rest is existing configuration. -->
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">">
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:
                         http://activemq.apache.org/slow-consumer-handling.html
                    -->
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
    <!-- Other child elements of <broker> in your activemq.xml are omitted from this excerpt and stay unchanged. -->
</broker>
三、建立SpringBoot工程
- 配置 ActiveMQ 連線工廠。訊息以 ObjectMessage(Java 序列化)傳送,因此必須設定信任包(trusted packages),否則消費端反序列化時會報錯
package com.example.demoactivemq.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
 * Spring configuration that exposes the ActiveMQ connection factory bean.
 *
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

    /** Package registered as trusted for object serialization on the connection factory. */
    private static final String TRUSTED_PACKAGE = "com.example.demoactivemq.domain";

    /**
     * Builds the connection factory for the configured broker and registers the
     * trusted serialization packages on it.
     *
     * @param url broker URL read from the {@code spring.activemq.broker-url} property
     * @return the configured connection factory
     */
    @Bean
    public ActiveMQConnectionFactory factory(
            @Value("${spring.activemq.broker-url}") String url) {
        // Collect the trusted packages first; only the domain package is allowed.
        List<String> trustedPackages = new ArrayList<>();
        trustedPackages.add(TRUSTED_PACKAGE);

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connectionFactory.setTrustedPackages(trustedPackages);
        return connectionFactory;
    }
}
複製程式碼
-
訊息實體類
package com.example.demoactivemq.domain;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
 * Payload carried by the JMS messages in this demo.
 *
 * <p>The class is {@link Serializable} so instances can be sent as object messages.
 * It declares an explicit {@code serialVersionUID} so that producer and consumer
 * builds do not depend on the compiler-computed default, which can differ between
 * compilations and cause {@code InvalidClassException} on deserialization.
 *
 * @author shanks on 2019-11-12
 */
@Builder
@Data
public class MessageModel implements Serializable {

    /** Explicit serialization version; bump only on incompatible field changes. */
    private static final long serialVersionUID = 1L;

    /**
     * Message title.
     *
     * <p>NOTE(review): the name is a misspelling of "title". It is kept as-is because
     * the Lombok-generated builder and accessor names derive from it and existing
     * callers use {@code titile(...)}.
     */
    private String titile;

    /** Message body text. */
    private String message;
}
複製程式碼
-
生產者
package com.example.demoactivemq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.*;
import java.io.Serializable;
/**
* 訊息生產者
*
* @author shanks
*/
@Service
@Slf4j
public class Producer {
public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");
@Autowired
private JmsMessagingTemplate template;
/**
* 傳送訊息
*
* @param destination destination是傳送到的佇列
* @param message message是待傳送的訊息
*/
public <T extends Serializable> void send(Destination destination,T message) {
template.convertAndSend(destination,message);
}
/**
* 延時傳送
*
* @param destination 傳送的佇列
* @param data 傳送的訊息
* @param time 延遲時間
*/
public <T extends Serializable> void delaySend(Destination destination,T data,Long time) {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
// 獲取連線工廠
ConnectionFactory connectionFactory = template.getConnectionFactory();
try {
// 獲取連線
connection = connectionFactory.createConnection();
connection.start();
// 獲取session,true開啟事務,false關閉事務
session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
// 建立一個訊息佇列
producer = session.createProducer(destination);
producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
ObjectMessage message = session.createObjectMessage(data);
//設定延遲時間
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,time);
// 傳送訊息
producer.send(message);
log.info("傳送訊息:{}",data);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
複製程式碼
-
消費者
package com.example.demoactivemq.producer;
import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
 * JMS listener that logs every payload received from the delay queue.
 */
@Slf4j
@Component
public class Consumer {

    /** Name of the queue this listener is attached to. */
    private static final String DELAY_QUEUE = "delay.queue";

    /**
     * Handles one message received from {@code delay.queue} by logging it.
     *
     * @param message payload of the received message
     */
    @JmsListener(destination = DELAY_QUEUE)
    public void receiveQueue(MessageModel message) {
        log.info("收到訊息:{}", message);
    }
}
複製程式碼
- application.yml
# Broker URL injected into the ActiveMQ connection factory.
spring:
  activemq:
    broker-url: tcp://localhost:61616
- 測試類
package com.example.demoactivemq;
import com.example.demoactivemq.domain.MessageModel;
import com.example.demoactivemq.producer.Producer;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Tests that send immediate and delayed messages through the {@link Producer}.
 *
 * <p>The test methods use JUnit 5's {@code @Test}, so the JUnit 4
 * {@code @RunWith(SpringRunner.class)} annotation was removed: the Jupiter engine
 * ignores it, and {@code @SpringBootTest} (Spring Boot 2.1+) already registers the
 * Spring extension.
 */
@SpringBootTest(classes = DemoActivemqApplication.class)
class DemoActivemqApplicationTests {

    /** Number of delayed messages sent by {@link #test2()}. */
    private static final int DELAYED_MESSAGE_COUNT = 5;

    /** Delivery delay requested for each delayed message, in milliseconds (10 seconds). */
    private static final long DELAY_MILLIS = 10000L;

    /** Time the test thread waits so the delayed messages can be consumed (100 seconds). */
    private static final long WAIT_MILLIS = 100000L;

    /** Message producer under test. */
    @Autowired
    private Producer producer;

    /**
     * Sends a single message for immediate delivery.
     */
    @Test
    public void test() {
        MessageModel messageModel = MessageModel.builder()
                .message("測試訊息")
                .titile("訊息000")
                .build();
        // Send the message.
        producer.send(Producer.DEFAULT_QUEUE, messageModel);
    }

    /**
     * Sends several delayed messages, then keeps the test alive long enough for the
     * listener to receive them.
     */
    @Test
    public void test2() {
        for (int i = 0; i < DELAYED_MESSAGE_COUNT; i++) {
            MessageModel messageModel = MessageModel.builder()
                    .titile("延遲10秒執行")
                    .message("測試訊息" + i)
                    .build();
            // Send the delayed message.
            producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, DELAY_MILLIS);
        }
        try {
            // Wait for the delayed messages to be delivered and consumed.
            Thread.sleep(WAIT_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
複製程式碼
執行結果
2019-11-12 22:18:52.939 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息0)
2019-11-12 22:18:52.953 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息1)
2019-11-12 22:18:52.958 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息2)
2019-11-12 22:18:52.964 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息3)
2019-11-12 22:18:52.970 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 傳送訊息:MessageModel(titile=延遲10秒執行,message=測試訊息4)
2019-11-12 22:19:03.012 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息0)
2019-11-12 22:19:03.017 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息1)
2019-11-12 22:19:03.019 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息2)
2019-11-12 22:19:03.020 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息3)
2019-11-12 22:19:03.021 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到訊息:MessageModel(titile=延遲10秒執行,message=測試訊息4)
比你優秀的人比你還努力,你有什麼資格不去奮鬥!!!