Spring boo整合ActiveMQ
阿新 • • 發佈:2018-12-11
ActiveMQ介紹 訊息中介軟體ActiveMQ是JMS(Java Message Service)規範的一種實現。 兩個重要概念
- 目的地(destination):訊息傳送到哪裡
- 訊息代理(message broker):提供一種通訊機制
當訊息傳送者傳送訊息以後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。
兩種訊息模型
- P2P模式:點對點訊息通訊(point-to-point)
- Pub/Sub模式:釋出(publish)/訂閱(subscribe)訊息通訊
訊息中介軟體的使用場景
- 非同步處理
- 流量消鋒
- 系統解耦
Spring boot中使用ActiveMQ 新增activemq啟動器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置ActiveMQ
#配置ActiveMQ
#指定ActiveMQ broker的URL
spring.activemq.broker-url=tcp://192.168.25.128:61616
#是否是記憶體模式
spring.activemq.in-memory=true
#是否建立PooledConnectionFactory,而非ConnectionFactory
spring.activemq.pool.enabled=false
點對點模式 1、每個訊息只有一個消費者,訊息一旦被消費,就從訊息佇列中消失 2、訊息的傳送者和接收者在時候上沒有依賴性,當傳送者傳送了訊息之後,不管接收者有沒有處於執行狀態,訊息都會發送到訊息佇列中 3、接收者在成功接收訊息之後需要向佇列應答成功如果希望傳送的每個訊息都被成功處理,就可以採用點對點的訊息模型
- 訊息生產者(Producer)
@Service
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Queue queue;
public void sendQueueMessage(String message) {
/*
* 傳送訊息自動轉換成原始訊息
* 訊息不需要實現MessageCreate介面
*/
jmsTemplate.convertAndSend(queue,message);
}
}
- 訊息接收者(Consumer)
@Component
public class ConsumerQueue {
private static final Logger logger = LoggerFactory.getLogger(ConsumerQueue.class);
//監聽者的方式來監聽指定地點的訊息,採用註解@JmsListener來設定監聽方法。
@JmsListener(destination = "wang.zx.queue")
public void receiveQueue(String message) {
logger.info("queue訊息 ---> {}",message);
}
}
- 傳送訊息
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableJms//對 JMS 註解的支援
public class SpringbootactivemqApplicationTests {
@Autowired
private Producer producer;
@Test
public void contextLoads() {
producer.sendQueueMessage("Spring boot --- queue");
}
}
- 效果展示
釋出訂閱模式
1、每個訊息可以有多個消費者 2、釋出者和訂閱者之間有時間上的依賴性,針對某個主題的訂閱者,必須創3、建一個訂閱者之後,才能釋出者釋出的訊息 為了訊息可以消費,訂閱者必須儲存執行狀態
Spring boot預設傳送的訊息模型為點對點模型,如果需要釋出/訂閱模型,就需要在配置檔案中配置,配置之後就不支援點對點模型了(後面會將同時釋出點對點訊息和釋出/訂閱訊息)
spring.jms.pub-sub-domain=true
- 訊息釋出者
@Service
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Topic topic;
public void sendQueueMessage(String message) {
/*
* 傳送訊息自動轉換成原始訊息
* 訊息不需要實現MessageCreate介面
*/
//topic:目的地 message:要傳送的訊息
jmsTemplate.convertAndSend(topic,message);
}
}
- 訊息訂閱者(多個訂閱者)
@Component
public class ConsumerTopic {
private static final Logger logger = LoggerFactory.getLogger(ConsumerTopic.class);
@JmsListener(destination = "wang.zx.topic")
public void receiveTopic1(String message) {
logger.info("訂閱者1收到訊息: ---> {}",message);
}
@JmsListener(destination = "wang.zx.topic")
public void receiveTopic2(String message) {
logger.info("訂閱者2收到訊息: ---> {}",message);
}
@JmsListener(destination = "wang.zx.topic")
public void receiveTopic3(String message) {
logger.info("訂閱者3收到訊息: ---> {}",message);
}
}
- 釋出訊息
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableJms//對 JMS 註解的支援
public class SpringbootactivemqApplicationTests {
@Autowired
private Producer producer;
@Test
public void contextLoads() {
producer.sendTopicMessage("spring boot --- topic");
}
}
- 效果展示
同時支援點對點和釋出/訂閱 訊息的接收,也就是@JmsListener如果不指定獨立的containerFactory的話是隻能消費queue訊息的,如果要能夠同時接收topic訊息,需要給topic對應的@JmsListener增加containerFactory配置
- JmsConfig 配置JmsListenerContainerFactory
/**
* @author WangZX
* @create 2018-09-22 9:38
*/
@Configuration
public class JmsConfig {
@Bean
public Queue queue() {
return new ActiveMQQueue("wang.zx.queue");
}
@Bean
public Topic topic() {
return new ActiveMQTopic("wang.zx.topic");
}
//配置釋出/訂閱模式的JmsListenerContainerFactory
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//支援釋出訂閱功能
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
//配置點對點模式的JmsListenerContainerFactory
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactoryQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
bean.setPubSubDomain(false);
return bean;
}
}
- 訊息生產者
@Service
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
public void sendQueueMessage(String message) {
//queue:目的地 message:要傳送的訊息
jmsTemplate.convertAndSend(queue,message);
}
public void sendTopicMessage(String message) {
//topic:目的地 message:要傳送的訊息
jmsTemplate.convertAndSend(topic,message);
}
}
- 點對點消費者
@Component
public class ConsumerQueue {
private static final Logger logger = LoggerFactory.getLogger(ConsumerQueue.class);
//同時支援點對點和訊息佇列需要指定 containerFactory
@JmsListener(destination = "wang.zx.queue", containerFactory = "jmsListenerContainerFactoryQueue")
public void receiveQueue(String message) {
logger.info("queue訊息 ---> {}",message);
}
}
- 訂閱者
@Component
public class ConsumerTopic {
private static final Logger logger = LoggerFactory.getLogger(ConsumerTopic.class);
//指定支援釋出訂閱的containerFactory
@JmsListener(destination = "wang.zx.topic", containerFactory = "jmsListenerContainerFactoryTopic")
public void receiveTopic1(String message) {
logger.info("訂閱者1收到訊息: ---> {}",message);
}
@JmsListener(destination = "wang.zx.topic", containerFactory = "jmsListenerContainerFactoryTopic")
public void receiveTopic2(String message) {
logger.info("訂閱者2收到訊息: ---> {}",message);
}
@JmsListener(destination = "wang.zx.topic", containerFactory = "jmsListenerContainerFactoryTopic")
public void receiveTopic3(String message) {
logger.info("訂閱者3收到訊息: ---> {}",message);
}
}
- 訊息傳送
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableJms
public class SpringbootactivemqApplicationTests {
@Autowired
private Producer producer;
@Test
public void contextLoads() {
//點對點訊息
producer.sendQueueMessage("Spring boot --- queue");
//釋出/訂閱訊息
producer.sendTopicMessage("spring boot --- topic");
}
}
效果展示