Springboot整合Active訊息佇列
簡單理解:
Active是Apache公司旗下的一個訊息匯流排,ActiveMQ是一個開源相容Java Message Service(JMS) 面向訊息的中件間. 是一個提供鬆耦合的應用程式架構.
主要用來在服務與服務之間進行非同步通訊的。
一、搭建步驟
1、相應jar包
<!-- 整合訊息佇列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 如果配置執行緒池則加入 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2、application.properties檔案
#整合jms測試,安裝在別的機器,防火牆和埠號記得開放 spring.activemq.broker-url=tcp://47.96.44.110:61616 spring.activemq.user=admin spring.activemq.password=admin #下列配置要增加依賴 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 #叢集配置(後續需要在配上) #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) #訊息佇列預設是點對點的,如果需要釋出/訂閱模式那麼需要加上下面註解(如果同時需要點對點發布訂閱這裡也需註釋掉) # spring.jms.pub-sub-domain=true
3、Springboot主類
<!-- 主類需要多加一個@EnableJms註解,不過貌似我沒有加的時候,也能執行,為安全起見姑且加上 --> @SpringBootApplication @EnableJms
4.5.......根據不同訊息模式來寫了。
二、點對點案例
我在這裡案例中建立了兩個點對點佇列,所以他會有兩個queue物件,同樣對應每個queue物件,都會有單一對應的消費者。
1、Springboot主類
@SpringBootApplication @EnableJms public class Main { public static void main(String[] args) { SpringApplication.run(Main.class,args); } //新建一個的Queue物件,交給sringboot管理,這個queue的名稱叫"first.queue". @Bean public Queue queue(){ return new ActiveMQQueue("first.queue"); } }
2.1、first.queue對應消費者
@Component public class FirstConsumer { //名為"first.queue"訊息佇列的消費者,通過JmsListener進行監聽有沒有訊息,有訊息會立刻讀取過來 @JmsListener(destination="first.queue") public void receiveQueue(String text){ System.out.println("FirstConsumer收到的報文為:"+text); } }
2.2、two.queue對應消費者(後面會建立)
@Component public class TwoConsumer { //名為"two.queue"訊息佇列的消費者 @JmsListener(destination="two.queue") public void receiveQueue(String text){ System.out.println("TwoConsumer收到的報文為:"+text); } }
3、Service類
/** * 功能描述:訊息生產 */ public interface ProducerService { // 功能描述:指定訊息佇列,還有訊息 public void sendMessage(Destination destination,final String message); // 功能描述:使用預設訊息佇列, 傳送訊息 public void sendMessage( final String message); }
4、ServiceImpl實現類
/** * 功能描述:訊息生產者實現類 */ @Service public class ProducerServiceImpl implements ProducerService{ //這個佇列就是Springboot主類中bean的物件 @Autowired private Queue queue; //用來發送訊息到broker的物件,可以理解連線資料庫的JDBC @Autowired private JmsMessagingTemplate jmsTemplate; //傳送訊息,destination是傳送到的佇列,message是待發送的訊息 @Override public void sendMessage(Destination destination,String message) { jmsTemplate.convertAndSend(destination,message); } //傳送訊息,queue是傳送到的佇列,message是待發送的訊息 @Override public void sendMessage(final String message) { jmsTemplate.convertAndSend(this.queue,message); } }
5.QueueController類
/** * 功能描述:點對點訊息佇列控制層 */ @RestController @RequestMapping("/api/v1") public class QueueController { @Autowired private ProducerService producerService; // 這裡後面呼叫的是Springboot主類的quene佇列 @GetMapping("first") public Object common(String msg){ producerService.sendMessage(msg); return "Success"; } // 這個佇列是新建的一個名為two.queue的點對點訊息佇列 @GetMapping("two") public Object order(String msg){ Destination destination = new ActiveMQQueue("two.queue"); producerService.sendMessage(destination,msg); return "Success"; } }
6、案例演示:
從演示效果可以得出以下結論:
1:當springboot啟動時候,就生成了這兩個佇列,而且他們都會有一個消費者
2:當我通過頁面訪問的時候,就相當於生產者把訊息放到佇列中,一旦放進去就會被消費者監聽到,就可以獲取生產者放進去的值並在後臺打印出
順便對頁面中四個單詞進行解釋:
Number Of Pending Messages :待處理訊息的數量。我們每次都會被監聽處理掉,所以不存在待處理,如果存在就說這裡面哪裡出故障了,需要排查
Number Of Consumers :消費者數量
Messages Enqueued: 訊息排列,這個只增不見,代表已經處理多少訊息
Messages Dequeued: 訊息出隊。
三、釋出/訂閱者模式
在上面點對點程式碼的基礎上,添加發布/訂閱相關程式碼
1.appliaction.properties檔案
#訊息佇列預設是點對點的,如果需要釋出/訂閱模式那麼需要加上下面註解(如果同時需要點對點發布訂閱這裡也需註釋掉) spring.jms.pub-sub-domain=true
2.Springboot主類新增
//新建一個topic佇列 @Bean public Topic topic(){ return new ActiveMQTopic("video.topic"); }
3.新增多個消費者類
//這裡定義了三個消費者 @Component public class TopicSub { @JmsListener(destination="video.topic") public void receive1(String text){ System.out.println("video.topic 消費者:receive1="+text); } @JmsListener(destination="video.topic") public void receive2(String text){ System.out.println("video.topic 消費者:receive2="+text); } @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消費者:receive3="+text); } }
4.Service類
//功能描述:訊息釋出者 public void publish(String msg);
5.ServiceImpl實現類
//=======釋出訂閱相關程式碼========= @Autowired private Topic topic; @Override public void publish(String msg) { this.jmsTemplate.convertAndSend(this.topic,msg); }
6.Controller類
// 這個佇列是新建的一個名為two.queue的點對點訊息佇列 @GetMapping("topic") public Object topic(String msg){ producerService.publish(msg); return "Success"; }
7.演示效果:
從演示效果總結如下:
1:Springboot啟動的時候,在Topics目錄下,一共出現了5個消費者。first.queue一個消費者、two.queue一個消費者、video.topic三個消費者
2:當我在控制檯輸入資訊後,video.topic的三個消費者都會監聽video.topic釋出的訊息,並在控制檯列印。
四、如何讓點對點和釋出訂閱同時有效
為什麼這麼說呢,因為當我向上面一樣同時開啟,會發現點對點模式已經失效了。
效果演示
從演示效果,可以得出如下結論:
1:我們發現我們在頁面輸入..../two?msg=555訊息後,後臺並沒有成功列印訊息。再看Active介面發現,這個queue物件,確實有一條待處理的訊息,但是我們發現,它對應的消費者數量是為0.
2:然而我們在開啟topic頁面發現,這裡卻存在一個消費者。
所以我個人理解是,當同時啟動的時候,所產生的消費者預設都是Topic消費者,沒有Queue消費者,所以它監聽不到queue所待處理的訊息。
當配置檔案不加:spring.jms.pub-sub-domain=true 那麼系統會預設支援quene(點對點模式),但一旦加上這段配置,系統又變成只支援釋出訂閱模式。
那如何同時都可以成功呢?
思路如下:
第一步:還是需要去掉配置檔案中的:
#訊息佇列預設是點對點的,如果需要釋出/訂閱模式那麼需要加上下面註解(如果同時需要點對點發布訂閱這裡也需註釋掉) #spring.jms.pub-sub-domain=true
第二步:在釋出訂閱者的中消費者中指定獨立的containerFactory
因為你去掉上面的配置,那麼系統就預設是queue,所以@JmsListener如果不指定獨立的containerFactory的話是隻能消費queue訊息
@JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic") public void receive1(String text){ System.out.println("video.topic 消費者:receive1="+text); } @JmsListener(destination="video.topic",containerFactory="jmsListenerContainerTopic") public void receive2(String text){ System.out.println("video.topic 消費者:receive2="+text); } //第三步我不新增containerFactory="jmsListenerContainerTopic"看等下是否會打印出 @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消費者:receive3="+text); }
第三步:定義獨立的topic定義獨立的JmsListenerContainer
在springboot主類中新增:
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; }
效果:
得出結論:
1:點對點,和釋出訂閱都有用
2:receive3沒有指定獨立的containerFactory一樣沒有打印出來。
原始碼
github地址:https://github.com/yudiandemingzi/springbootAcitveMQ