1. 程式人生 > 其它 >SpringBoot整合activeMQ的簡單案例

SpringBoot整合activeMQ的簡單案例

ActiveMQ是什麼

ActiveMQ是訊息佇列技術,為解決高併發問題而生
ActiveMQ生產者消費者模型(生產者和消費者可以跨平臺、跨系統)
ActiveMQ支援如下兩種訊息傳輸方式
點對點模式,生產者生產了一個訊息,只能由一個消費者進行消費
釋出/訂閱模式,生產者生產了一個訊息,可以由多個消費者進行消費

SpringBoot的簡單整合

1. ActiveMQ下載啟動

http://activemq.apache.org/download-archives.html ,本文用的是windows版的5.15.3版本,下載下來是壓縮包,自行解壓一個到目錄下,CMD進入到解壓目錄下的bin目錄下,執行 activemq.bat start 啟動。如果能成功訪問http://localhost:8161/admin(使用者名稱和密碼預設為admin),則啟動成功。

2.新增依賴

建立兩個springboot專案,分別作為訊息提供者(provider)和消費者(consumer),新增依賴

<dependency>
 	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--訊息佇列連線池-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.0</version>
</dependency>

3. boot_mq_product

啟動類加入 @EnableJms

application.yml
server:
  port: 7777
spring:
  application:
    name: boot_mq_product
  activemq:
    broker-url: tcp://127.0.0.1:61616 #MQ伺服器地址
    user: admin
    password: admin
  jms:
    pub-sub-domain: true #false = queue   true = topic

# 自己定義佇列名稱(P2P)
myqueue: boot-activemq-queue
# 釋出/訂閱模式的名稱
mytopic: boot-activemq-topic
Queue_Produce
@RestController
public class Queue_Produce {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Value("${myqueue}")
    private String myQueue;

    @PostMapping("send")
    public void produceMsg(){
        jmsMessagingTemplate.convertAndSend(myQueue,"我是一條訊息");
        System.out.println("訊息推送到MQ成功");
    }

    @PostMapping("send1")
    public void produceMsg1(){
        User user = new User(1,"張三","123456");
        String s = JSON.toJSONString(user);
        jmsMessagingTemplate.convertAndSend(myQueue,s);
        System.out.println("User推送到MQ成功");
    }
}
Topic_Produce
@RestController
public class Topic_Produce {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Value("${mytopic}")
    private String mytopic;

    @PostMapping("sendTopic")
    public void sendTopic() {
        String msg="傳送Topic訊息內容 :"+"我是一條訂閱訊息";
        jmsMessagingTemplate.convertAndSend(mytopic, msg);
        System.out.println("訂閱訊息傳送成功");
    }
}

boot_mq_consumer

application.yml
server:
  port: 7778 # 埠號
spring:
  application:
    name: boot_mq_consumer
  activemq:
    broker-url: tcp://127.0.0.1:61616 # 自己的MQ伺服器地址,用自己的
  jms:
    pub-sub-domain: true  # false = Queue true = Topic

# 自己定義的佇列名稱(P2P)
myqueue: boot-activemq-queue
# 釋出/訂閱模式的名稱
mytopic: boot-activemq-topic

Queue_Consumer

@Component
public class Queue_Consumer {
    /**
     * 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
     * @param
     * @return
     */
    @JmsListener(destination = "${myqueue}")
    public void handleMessage(String message) {
        System.out.println("成功接受name: " + message);
    }
}

Topic_Consumer

@Component
public class Topic_Consumer {
    /**
     * 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
     * @param
     * @return
     */
    @JmsListener(destination = "${mytopic}")
    public void handleMessage1(String message) {
        System.out.println("消費者1成功接受: " + message);
    }

    /**
     * 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
     * @param
     * @return
     */
    @JmsListener(destination = "${mytopic}")
    public void handleMessage2(String message) {
        System.out.println("消費者2成功接受: " + message);
    }
}

注意:
使用訂閱/釋出方式時,需修改配置檔案中的 pub-sub-domain 引數 (activeMQ預設使用P2P方式)

同時使用點對點和釋出/訂閱模式

1.修改配置檔案

中的 pub-sub-domain引數為true

2.consumer的啟動類
@Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(@Qualifier("jmsConnectionFactory") ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }
3.修改Topic_Consumer
@Component
public class Topic_Consumer {
    /**
     * 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
     * @param
     * @return
     */
    @JmsListener(destination = "${mytopic}", containerFactory = "jmsListenerContainerTopic")
    public void handleMessage1(String message) {
        System.out.println("消費者1成功接受: " + message);
    }

    /**
     * 使用JmsListener配置消費者監聽的佇列,其中name是接收到的訊息
     * @param
     * @return
     */
    @JmsListener(destination = "${mytopic}", containerFactory = "jmsListenerContainerTopic")
    public void handleMessage2(String message) {
        System.out.println("消費者2成功接受: " + message);
    }
}