1. 程式人生 > 程式設計 >Springboot整合Active訊息佇列

Springboot整合Active訊息佇列

簡單理解:

Active是Apache公司旗下的一個訊息匯流排,ActiveMQ是一個開源相容Java Message Service(JMS) 面向訊息的中件間. 是一個提供鬆耦合的應用程式架構.

主要用來在服務與服務之間進行非同步通訊的。

一、搭建步驟
1、相應jar包

<!-- 整合訊息佇列ActiveMQ -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>
  
 <!-- 如果配置執行緒池則加入 -->
  <dependency> 
   <groupId>org.apache.activemq</groupId> 
   <artifactId>activemq-pool</artifactId> 
  </dependency>

2、application.properties檔案

#整合jms測試,安裝在別的機器,防火牆和埠號記得開放
spring.activemq.broker-url=tcp://47.96.44.110:61616

spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依賴
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

#叢集配置(後續需要在配上)
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
#訊息佇列預設是點對點的,如果需要釋出/訂閱模式那麼需要加上下面註解(如果同時需要點對點發布訂閱這裡也需註釋掉)
# spring.jms.pub-sub-domain=true

3、Springboot主類

<!-- 主類需要多加一個@EnableJms註解,不過貌似我沒有加的時候,也能執行,為安全起見姑且加上 -->
@SpringBootApplication
@EnableJms

4.5.......根據不同訊息模式來寫了。

二、點對點案例
我在這裡案例中建立了兩個點對點佇列,所以他會有兩個queue物件,同樣對應每個queue物件,都會有單一對應的消費者。

1、Springboot主類

@SpringBootApplication
@EnableJms
public class Main {

 public static void main(String[] args) {
  SpringApplication.run(Main.class,args);
 }
 
 //新建一個的Queue物件,交給sringboot管理,這個queue的名稱叫"first.queue".
 @Bean
 public Queue queue(){
  return new ActiveMQQueue("first.queue");
 }
}

2.1、first.queue對應消費者

@Component
public class FirstConsumer {

 //名為"first.queue"訊息佇列的消費者,通過JmsListener進行監聽有沒有訊息,有訊息會立刻讀取過來
 @JmsListener(destination="first.queue")
 public void receiveQueue(String text){
  System.out.println("FirstConsumer收到的報文為:"+text);
 }
}

2.2、two.queue對應消費者(後面會建立)

@Component
public class TwoConsumer {

 //名為"two.queue"訊息佇列的消費者
 @JmsListener(destination="two.queue")
 public void receiveQueue(String text){
  System.out.println("TwoConsumer收到的報文為:"+text);
 }
}

3、Service類

/**
 * 功能描述:訊息生產
 */
public interface ProducerService {

 // 功能描述:指定訊息佇列,還有訊息 
 public void sendMessage(Destination destination,final String message);
 

 // 功能描述:使用預設訊息佇列, 傳送訊息
 public void sendMessage( final String message);

}

4、ServiceImpl實現類

/**
 * 功能描述:訊息生產者實現類
 */
@Service
public class ProducerServiceImpl implements ProducerService{

 //這個佇列就是Springboot主類中bean的物件
 @Autowired
 private Queue queue;
 
 //用來發送訊息到broker的物件,可以理解連線資料庫的JDBC
 @Autowired
 private JmsMessagingTemplate jmsTemplate; 
 
 //傳送訊息,destination是傳送到的佇列,message是待發送的訊息
 @Override
 public void sendMessage(Destination destination,String message) {  
  jmsTemplate.convertAndSend(destination,message); 
 }
 
 //傳送訊息,queue是傳送到的佇列,message是待發送的訊息
 @Override
 public void sendMessage(final String message) { 
  jmsTemplate.convertAndSend(this.queue,message); 
 }  
}

5.QueueController類

/**
 * 功能描述:點對點訊息佇列控制層
 */
@RestController
@RequestMapping("/api/v1")
public class QueueController {
 
 @Autowired
 private ProducerService producerService;  

 // 這裡後面呼叫的是Springboot主類的quene佇列
 @GetMapping("first")
 public Object common(String msg){
  producerService.sendMessage(msg); 
  return "Success";
 }  
 
 // 這個佇列是新建的一個名為two.queue的點對點訊息佇列
 @GetMapping("two")
 public Object order(String msg){
  
  Destination destination = new ActiveMQQueue("two.queue");
  producerService.sendMessage(destination,msg);
  
  return "Success";
 }  
}

6、案例演示:

Springboot整合Active訊息佇列

從演示效果可以得出以下結論:

1:當springboot啟動時候,就生成了這兩個佇列,而且他們都會有一個消費者

2:當我通過頁面訪問的時候,就相當於生產者把訊息放到佇列中,一旦放進去就會被消費者監聽到,就可以獲取生產者放進去的值並在後臺打印出

順便對頁面中四個單詞進行解釋:

Number Of Pending Messages :待處理訊息的數量。我們每次都會被監聽處理掉,所以不存在待處理,如果存在就說這裡面哪裡出故障了,需要排查

Number Of Consumers :消費者數量

Messages Enqueued: 訊息排列,這個只增不見,代表已經處理多少訊息

Messages Dequeued: 訊息出隊。

三、釋出/訂閱者模式

在上面點對點程式碼的基礎上,添加發布/訂閱相關程式碼

1.appliaction.properties檔案

#訊息佇列預設是點對點的,如果需要釋出/訂閱模式那麼需要加上下面註解(如果同時需要點對點發布訂閱這裡也需註釋掉)
spring.jms.pub-sub-domain=true

2.Springboot主類新增

//新建一個topic佇列
 @Bean
 public Topic topic(){
  return new ActiveMQTopic("video.topic");
 }

3.新增多個消費者類

//這裡定義了三個消費者
@Component
public class TopicSub {
 
 @JmsListener(destination="video.topic")
 public void receive1(String text){
  System.out.println("video.topic 消費者:receive1="+text);
 }
  
 @JmsListener(destination="video.topic")
 public void receive2(String text){
  System.out.println("video.topic 消費者:receive2="+text);
 }
  
 @JmsListener(destination="video.topic")
 public void receive3(String text){
  System.out.println("video.topic 消費者:receive3="+text);
 } 
}

4.Service類

 //功能描述:訊息釋出者
 public void publish(String msg);

5.ServiceImpl實現類

//=======釋出訂閱相關程式碼=========
 
  @Autowired
  private Topic topic;
    
   @Override
  public void publish(String msg) {
   this.jmsTemplate.convertAndSend(this.topic,msg);
   
  }

6.Controller類

// 這個佇列是新建的一個名為two.queue的點對點訊息佇列
  @GetMapping("topic")
  public Object topic(String msg){

   producerService.publish(msg);
   
   return "Success";
  }

7.演示效果:

Springboot整合Active訊息佇列

從演示效果總結如下:

1:Springboot啟動的時候,在Topics目錄下,一共出現了5個消費者。first.queue一個消費者、two.queue一個消費者、video.topic三個消費者

2:當我在控制檯輸入資訊後,video.topic的三個消費者都會監聽video.topic釋出的訊息,並在控制檯列印。

四、如何讓點對點和釋出訂閱同時有效

為什麼這麼說呢,因為當我向上面一樣同時開啟,會發現點對點模式已經失效了。

效果演示

Springboot整合Active訊息佇列

從演示效果,可以得出如下結論:

1:我們發現我們在頁面輸入..../two?msg=555訊息後,後臺並沒有成功列印訊息。再看Active介面發現,這個queue物件,確實有一條待處理的訊息,但是我們發現,它對應的消費者數量是為0.

2:然而我們在開啟topic頁面發現,這裡卻存在一個消費者。

所以我個人理解是,當同時啟動的時候,所產生的消費者預設都是Topic消費者,沒有Queue消費者,所以它監聽不到queue所待處理的訊息。

當配置檔案不加:spring.jms.pub-sub-domain=true 那麼系統會預設支援quene(點對點模式),但一旦加上這段配置,系統又變成只支援釋出訂閱模式。

那如何同時都可以成功呢?

思路如下:

第一步:還是需要去掉配置檔案中的:

#訊息佇列預設是點對點的,如果需要釋出/訂閱模式那麼需要加上下面註解(如果同時需要點對點發布訂閱這裡也需註釋掉)
#spring.jms.pub-sub-domain=true

第二步:在釋出訂閱者的中消費者中指定獨立的containerFactory

因為你去掉上面的配置,那麼系統就預設是queue,所以@JmsListener如果不指定獨立的containerFactory的話是隻能消費queue訊息

@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
 public void receive1(String text){
  System.out.println("video.topic 消費者:receive1="+text);
 }
 
 
 @JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic")
 public void receive2(String text){
  System.out.println("video.topic 消費者:receive2="+text);
 }
 
 //第三步我不新增containerFactory="jmsListenerContainerTopic"看等下是否會打印出
 @JmsListener(destination="video.topic")
 public void receive3(String text){
  System.out.println("video.topic 消費者:receive3="+text);
 }

第三步:定義獨立的topic定義獨立的JmsListenerContainer

在springboot主類中新增:

@Bean
  public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
   DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
   bean.setPubSubDomain(true);
   bean.setConnectionFactory(activeMQConnectionFactory);
   return bean;
  }

效果:

Springboot整合Active訊息佇列

得出結論:

1:點對點,和釋出訂閱都有用

2:receive3沒有指定獨立的containerFactory一樣沒有打印出來。

原始碼
github地址:https://github.com/yudiandemingzi/springbootAcitveMQ