SpringBoot整合ActiveMq,並將管理介面整合到後臺系統
阿新 • • 發佈:2018-12-11
1:下載安裝
下載地址: http://activemq.apache.org/download.html
開發環境使用Windows版本,執行在自己的主機,防止相互干擾。
解壓縮,點選下圖所示批處理檔案,即可執行。
預設的管理後臺地址為:ip:8161,可以訪問此網址驗證是否安裝成功。
管理平臺預設使用者名稱:admin,預設密碼:admin
2:基本介紹
MQ是訊息中介軟體,是一種在分散式系統中應用程式藉以傳遞訊息的媒介,常用的有ActiveMQ,RabbitMQ,Kafka。ActiveMQ是Apache下的開源專案,完全支援JMS1.1和J2EE1.4規範的JMS Provider實現。
特點:
1、支援多種語言編寫客戶端
2、對spring的支援,很容易和spring整合
3、支援多種傳輸協議:TCP,SSL,NIO,UDP等
4、支援AJAX
訊息形式:
1、點對點(queue)
2、一對多(topic)
3:專案整合
配置pom.xml:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.14.5</version> </dependency>
配置全域性配置檔案application-*.yml
#訊息佇列配置 activemq: user: admin password: admin broker-url: tcp://127.0.0.1:61616 pool: enabled: true max-connections: 10 #可以根據業務,配置多個佇列名 #訊息佇列的 queue 名稱 queueName: publish.queue #訊息佇列的 topic 名稱 topicName: publish.topic
新增啟動配置ActiveMqConfig.java:
package cc.ahxb.config;
/**
* Describe
*
* @author Gao
* @date 2018/12/1
*/
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.Queue;
import javax.jms.Topic;
/**
* @author: Gao
*/
@Configuration
public class ActiveMqConfig {
@Value("${queueName}")
private String queueName;
@Value("${topicName}")
private String topicName;
@Value("${spring.activemq.user}")
private String usrName;
@Value("${spring.activemq.password}")
private String password;
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
/**
* 實際專案中,可以通過不同的queueName,分別例項化多個Queue,用在不同型別的訊息的區分
* @return
*/
@Bean
public Queue queue(){
return new ActiveMQQueue(queueName);
}
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//設定為釋出訂閱方式, 預設情況下使用的生產消費者方式
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
}
4:定義訊息佇列消費者:
抽象介面
QueueConsumer.java
package cc.ahxb.mq.consumer;
import cc.ahxb.mqinterface.IMqConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import javax.jms.*;
/**
* Describe
*
* @author Jxx
* @date 2018/12/1
*/
//加上Component註解,啟動即監聽了訊息佇列。
@Component
public class QueueConsumer implements IMqConsumer {
@Autowired
private ActiveMQConnectionFactory connectionFactory;
/**
* 接收到的Queue型別的訊息,
* destination(訊息的目的地)為"publish.queue",在配置檔案中配置
* containerFactory(容器工廠),jmsListenerContainerQueue,在ActiveMqConfig 中初始化
* @param message 接收到的訊息
* @throws Exception
*/
@Override
@JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
public void onReceive(String message) throws Exception {
System.out.println("接收到publish.queue中訊息:"+message+"");
}
/**
* 設定消費者接收器(接收自定義佇列的訊息,會阻塞執行緒)
* @param messageListener 訊息監聽器
* @param destination 自定義佇列的名稱,必須與傳送端保持一致
*
*/
@Override
public void setMessageListener(MessageListener messageListener,String destination) throws Exception {
//1、建立工廠連線物件,需要制定ip和埠號
//2、使用連線工廠建立一個連線物件
Connection connection = connectionFactory.createConnection();
//3、開啟連線
connection.start();
//4、使用連線物件建立會話(session)物件
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
Queue queue = session.createQueue(destination);
//6、使用會話物件建立生產者物件
MessageConsumer consumer = session.createConsumer(queue);
//7、向consumer物件中設定一個messageListener物件,用來接收訊息
consumer.setMessageListener(messageListener);
//8、程式等待接收使用者訊息
System.in.read();
//9、關閉資源
consumer.close();
session.close();
connection.close();
}
}
TopicConsumer.java
package cc.ahxb.mq.consumer;
import cc.ahxb.mqinterface.IMqConsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.*;
/**
* Describe
*
* @author Jxx
* @date 2018/12/1
*/
//加上Component註解,啟動即監聽了訊息佇列。
@Component
public class TopicConsumer implements IMqConsumer {
@Autowired
private ActiveMQConnectionFactory connectionFactory;
/**
* 接收到的Topic型別的訊息,
* destination(訊息的目的地)為"publish.topic",在配置檔案中配置
* containerFactory(容器工廠),jmsListenerContainerTopic,在ActiveMqConfig 中初始化
* @param message 接收到的訊息
* @throws Exception
*/
@Override
@JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
public void onReceive(String message) throws Exception {
System.out.println("接收到publish.topic中訊息:"+message+"");
}
/**
* 設定消費者接收器(接收自定義佇列的訊息,會阻塞執行緒)
* @param messageListener 訊息監聽器
* @param destination 自定義佇列的名稱,必須與傳送端保持一致
* @throws Exception
*/
@Override
public void setMessageListener(MessageListener messageListener, String destination) throws Exception {
//2、使用連線工廠建立一個連線物件
Connection connection = connectionFactory.createConnection();
//3、開啟連線
connection.start();
//4、使用連線物件建立會話(session)物件
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
Topic topic = session.createTopic(destination);
//6、使用會話物件建立生產者物件
MessageConsumer consumer = session.createConsumer(topic);
//7、向consumer物件中設定一個messageListener物件,用來接收訊息
consumer.setMessageListener(messageListener);
//8、程式等待接收使用者訊息
System.in.read();
//9、關閉資源
consumer.close();
session.close();
connection.close();
}
}
5:定義訊息佇列的生產者:
ActiveMqProducer.java
package cc.ahxb.mq.producer;
import cc.ahxb.mqinterface.IMqProducer;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import javax.jms.Topic;
/**
* Describe
*
* @author Jxx
* @date 2018/12/1
*/
@Component
public class ActiveMqProducer implements IMqProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
/**
* 傳送Queue型別的訊息
* @param message
* @throws Exception
*/
@Override
public void sendQueueMessage(String message) throws Exception {
jmsMessagingTemplate.convertAndSend(this.queue,message);
}
/**
* 傳送Topic型別的訊息
* @param message
* @throws Exception
*/
@Override
public void sendTopicMessage(String message) throws Exception {
jmsMessagingTemplate.convertAndSend(this.topic,message);
}
/**
* 向自定義佇列傳送訊息
* @param queueName 自定義的 destination佇列名
* @param message 訊息內容
* @throws Exception
*/
@Override
public void sendQueueMessage(String queueName,String message) throws Exception {
jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName),message);
}
}
6:呼叫示例:
package cc.ahxb.controller;
import cc.ahxb.model.Log;
import cc.ahxb.mq.consumer.QueueConsumer;
import cc.ahxb.mq.producer.ActiveMqProducer;
import cc.ahxb.util.JsonUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.jms.*;
/**
* Describe
*
* @author Jxx
* @date 2018/12/1
*/
@Controller
@RequestMapping("/mq")
public class MqController {
@Autowired
private ActiveMqProducer activeMqProducer;
@GetMapping(value = "/sendMessage")
@ResponseBody
public JsonUtil sendMessage(){
JsonUtil result = new JsonUtil();
try {
activeMqProducer.sendQueueMessage("測試訊息佇列");
result.setMessage("傳送成功");
}catch (Exception e){
e.printStackTrace();
result.setMessage("傳送失敗,請重新發送");
}
return result;
}
}
補充:如果需要做到在Iframe中顯示管理介面,需要更改activeMQ 自帶伺服器的X-FRAME-OPTIONS策略