1. 程式人生 > >SpringBoot整合ActiveMQ訊息佇列和雙向佇列、點對點與釋出訂閱

SpringBoot整合ActiveMQ訊息佇列和雙向佇列、點對點與釋出訂閱

ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

 1.新增SpringBoot整合ActiveMQ所需依賴 

<!-- activeMQ-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2.配置application.properties檔案

## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
# failover:(tcp://localhost:61616,tcp://localhost:61617)
# tcp://localhost:61616
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
#如果此處設定為true,需要加如下的依賴包,否則會自動配置失敗,報JmsMessagingTemplate注入失敗
spring.activemq.pool.enabled=false
#預設情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
#spring.jms.pub-sub-domain=true

3.在啟動類中使用同步、非同步訊息佇列

加入@EnableJms註解就是非同步,沒有加 @EnableJms註解則預設是同步。

@EnableJms
@SpringBootApplication
public class QuartzsApplication {

	public static void main(String[] args) {
		SpringApplication.run(QuartzsApplication.class, args);
	}
}

4.點對點模式和釋出訂閱模式

  • 點對點模式:生產者傳送一條訊息到queue,只有一個消費者能收到。
  • 釋出訂閱模式:釋出者傳送到topic的訊息,只有訂閱了topic的訂閱者才會收到訊息。

在JMS中,TOPIC實現了分發和訂閱,當你分發一個訊息,所有訂閱這個訊息的服務都能得到這個服務,所以從0到許多個訂閱者都能得到一個訊息的拷貝,只有在訊息代理收到訊息時有一個有效訂閱時的訂閱者才能得到這個訊息的拷貝。

JMS Queue實現了負載均衡,一個訊息只能被一個消費者接受,當沒有消費者可用時,這個訊息會被儲存直到有 一個可用的消費者,一個queue可以有很多消費者,他們之間實現了負載均衡,所以Queue實現了一個可靠的JMS負載均衡。

5.Producer訊息生產者

package com.primeton.quartzs.activeMQ;

import javax.jms.Destination;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import java.util.ArrayList;

@Service("producer")
public class Producer {
    @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
    private JmsMessagingTemplate jmsTemplate;
    // 傳送訊息,destination是傳送到的佇列,message是待發送的訊息
    public void sendMessage(Destination destination, final String message){
        jmsTemplate.convertAndSend(destination, message);
    }

    @JmsListener(destination="out.queue")//實現雙向佇列
    public void consumerMessage(String text){
        System.out.println("從out.queue佇列收到的回覆報文為:"+text);
    }

}

6.Consumer兩個訊息生產者

package com.primeton.quartzs.activeMQ;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    // 使用JmsListener配置消費者監聽的佇列,其中text是接收到的訊息
    @JmsListener(destination = "mytest.queue")
    public void receiveQueue(String text) {
        System.out.println("Consumer收到的報文為:"+text);
    }
}
package com.primeton.quartzs.activeMQ;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class Consumer2 {

    @JmsListener(destination = "mytest.queue")
    @SendTo("out.queue")
    public String receiveQueue(String text) {
        System.out.println("Consumer2收到的報文為:"+text);
        return "return message"+text;
    }
}

7.實現訊息佇列和雙向佇列

在生產者上加入out.queue

 @JmsListener(destination="out.queue")//實現雙向佇列
    public void consumerMessage(String text){
        System.out.println("從out.queue佇列收到的回覆報文為:"+text);
    }

消費者註解@SendTo("out.queue")

@JmsListener(destination = "mytest.queue")
    @SendTo("out.queue")
    public String receiveQueue(String text) {
        System.out.println("Consumer2收到的報文為:"+text);
        return "return message"+text;
}

8.實現釋出訂閱

在消費者的註解@JmsListener加上containerFactory

// 使用JmsListener配置消費者監聽的佇列,其中text是接收到的訊息
@JmsListener(destination = "mytest.queue", containerFactory = "jmsListenerContainerQueue")
public void receiveQueue(String text) {
    System.out.println("Consumer收到的報文為:"+text);
}

@JmsListener(destination = "mytest.topic", containerFactory = "jmsListenerContainerTopic")
public void testTopicCusumer(String test){
    System.out.println(test);
}

9.編寫測試類

package com.primeton.quartzs;

import com.primeton.quartzs.activeMQ.Producer;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.jms.Destination;

@RunWith(SpringRunner.class)
@SpringBootTest
public class QuartzsApplicationTests {

	@Autowired
	private Producer producer;

	@Test
	public void contextLoads() {
		Destination destination = new ActiveMQQueue("mytest.queue");
		for(int i=0; i<10; i++){
			producer.sendMessage(destination, "myname is chhliu!!!");
		}
	}
}

10.檢視測試結果

Consumer2收到的報文為:myname is chhliu!!!
從out.queue佇列收到的回覆報文為:return messagemyname is chhliu!!!
Consumer收到的報文為:myname is chhliu!!!
Consumer2收到的報文為:myname is chhliu!!!
從out.queue佇列收到的回覆報文為:return messagemyname is chhliu!!!
Consumer收到的報文為:myname is chhliu!!!
Consumer2收到的報文為:myname is chhliu!!!
從out.queue佇列收到的回覆報文為:return messagemyname is chhliu!!!
Consumer收到的報文為:myname is chhliu!!!
Consumer2收到的報文為:myname is chhliu!!!
Consumer收到的報文為:myname is chhliu!!!
從out.queue佇列收到的回覆報文為:return messagemyname is chhliu!!!
Consumer2收到的報文為:myname is chhliu!!!
從out.queue佇列收到的回覆報文為:return messagemyname is chhliu!!!
Consumer收到的報文為:myname is chhliu!!!

11.在application.properties下配置activeMQ時需要注意地方

## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
# failover:(tcp://localhost:61616,tcp://localhost:61617)
# tcp://localhost:61616
#spring.activemq.broker-url=tcp://localhost:61616
#true時用內建activeMQ,否則用自己本機安裝的activeMQ
spring.activemq.in-memory=true
#如果此處設定為true,需要加如下的依賴包,否則會自動配置失敗,報JmsMessagingTemplate注入失敗
spring.activemq.pool.enabled=false
#預設情況下activemq提供的是queue模式,若要使用topic模式需要配置下面配置
#spring.jms.pub-sub-domain=true

如果大家有什麼問題,可以在下方留言,想要原始碼的可以到我的資源庫下載:連結地址

 

---------------------------------------------------------一個人的態度,決定他的高度。---------------------------------------------------------