SpringBoot整合activeMQ的簡單案例
阿新 • • 發佈:2022-03-10
ActiveMQ是什麼
ActiveMQ是訊息佇列技術,為解決高併發問題而生
ActiveMQ生產者消費者模型(生產者和消費者可以跨平臺、跨系統)
ActiveMQ支援如下兩種訊息傳輸方式
點對點模式,生產者生產了一個訊息,只能由一個消費者進行消費
釋出/訂閱模式,生產者生產了一個訊息,可以由多個消費者進行消費
SpringBoot的簡單整合
1. ActiveMQ下載啟動
http://activemq.apache.org/download-archives.html ,本文用的是windows版的5.15.3版本,下載下來是壓縮包,自行解壓一個到目錄下,CMD進入到解壓目錄下的bin目錄下,執行 activemq.bat start 啟動。如果能成功訪問http://localhost:8161/admin(使用者名稱和密碼預設為admin),則啟動成功。
2.新增依賴
建立兩個springboot專案,分別作為訊息提供者(provider)和消費者(consumer),新增依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--訊息佇列連線池--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency>
3. boot_mq_product
啟動類加入 @EnableJms
application.yml
server: port: 7777 spring: application: name: boot_mq_product activemq: broker-url: tcp://127.0.0.1:61616 #MQ伺服器地址 user: admin password: admin jms: pub-sub-domain: true #false = queue true = topic # 自己定義佇列名稱(P2P) myqueue: boot-activemq-queue # 釋出/訂閱模式的名稱 mytopic: boot-activemq-topic
Queue_Produce
@RestController
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${myqueue}")
private String myQueue;
@PostMapping("send")
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(myQueue,"我是一條訊息");
System.out.println("訊息推送到MQ成功");
}
@PostMapping("send1")
public void produceMsg1(){
User user = new User(1,"張三","123456");
String s = JSON.toJSONString(user);
jmsMessagingTemplate.convertAndSend(myQueue,s);
System.out.println("User推送到MQ成功");
}
}
Topic_Produce
@RestController
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${mytopic}")
private String mytopic;
@PostMapping("sendTopic")
public void sendTopic() {
String msg="傳送Topic訊息內容 :"+"我是一條訂閱訊息";
jmsMessagingTemplate.convertAndSend(mytopic, msg);
System.out.println("訂閱訊息傳送成功");
}
}
boot_mq_consumer
application.yml
server:
port: 7778 # 埠號
spring:
application:
name: boot_mq_consumer
activemq:
broker-url: tcp://127.0.0.1:61616 # 自己的MQ伺服器地址,用自己的
jms:
pub-sub-domain: true # false = Queue true = Topic
# 自己定義的佇列名稱(P2P)
myqueue: boot-activemq-queue
# 釋出/訂閱模式的名稱
mytopic: boot-activemq-topic
Queue_Consumer
@Component
public class Queue_Consumer {
/**
* 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
* @param
* @return
*/
@JmsListener(destination = "${myqueue}")
public void handleMessage(String message) {
System.out.println("成功接受name: " + message);
}
}
Topic_Consumer
@Component
public class Topic_Consumer {
/**
* 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
* @param
* @return
*/
@JmsListener(destination = "${mytopic}")
public void handleMessage1(String message) {
System.out.println("消費者1成功接受: " + message);
}
/**
* 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
* @param
* @return
*/
@JmsListener(destination = "${mytopic}")
public void handleMessage2(String message) {
System.out.println("消費者2成功接受: " + message);
}
}
注意:
使用訂閱/釋出方式時,需修改配置檔案中的 pub-sub-domain 引數 (activeMQ預設使用P2P方式)
同時使用點對點和釋出/訂閱模式
1.修改配置檔案
中的 pub-sub-domain引數為true
2.consumer的啟動類
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(@Qualifier("jmsConnectionFactory") ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
3.修改Topic_Consumer
@Component
public class Topic_Consumer {
/**
* 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
* @param
* @return
*/
@JmsListener(destination = "${mytopic}", containerFactory = "jmsListenerContainerTopic")
public void handleMessage1(String message) {
System.out.println("消費者1成功接受: " + message);
}
/**
* 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
* @param
* @return
*/
@JmsListener(destination = "${mytopic}", containerFactory = "jmsListenerContainerTopic")
public void handleMessage2(String message) {
System.out.println("消費者2成功接受: " + message);
}
}