1. 程式人生 > >springboot與ActiveMQ整合

springboot與ActiveMQ整合

nds 消息 ring str and info 開啟 int ng-

前言

  很多項目, 都不是一個系統就做完了. 而是好多個系統, 相互協作來完成功能. 那, 系統與系統之間, 不可能完全獨立吧?

  如: 在學校所用的管理系統中, 有學生系統, 資產系統, 宿舍系統等等. 當學期結束之後, 是否需要對已經結束的期次進行歸檔操作. 假如歸檔功能在學生系統中, 那點擊歸檔之後, 學生是不是還要關心宿舍那邊是否已結束, 學生所領資產是否全都歸還?

  顯然, 這並不是一個好的方式, 系統之間的耦合性做的太強了, 很不利於系統擴展, 而且, 一步操作, 可能要等很久很久, 才能完成. 用戶可願意等?

  既然同步歸檔不可能了, 那是否有辦法實現異步歸檔? 異步歸檔怎麽實現呢?

  我們其實可以通過消息隊列來實現異步歸檔. 學生這邊點擊歸檔後, 發個消息到隊列中, 其他系統自行去讀取, 然後完成各自系統應該完成的工作.

ActiveMQ下載安裝

  下載地址: http://activemq.apache.org/download.html

  安裝過程比較簡單, 在centos中, 解壓出來, 就算是安裝好了

  運行方法:

技術分享圖片

  運行起來後, 可以通過 ip:8161 來查看是否成功.

  技術分享圖片

點擊紅框中的鏈接, 會出現登錄彈框, 賬號密碼默認都是admin.

技術分享圖片

springboot整合activemq

一. 目錄結構

技術分享圖片

producer : 消息生產者

consumer-a : 消息消費者

consumer-b : 消息消費者

pom文件:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

如果使用pool的話, 就需要在pom中加入以下依賴:

<dependency>
     <groupId>org.apache.activemq</
groupId> <artifactId>activemq-pool</artifactId> <version>5.14.5</version> </dependency>

二. producer

1. 目錄結構

技術分享圖片

2. yml文件:

server:
  port: 8080
  context-path: /pro
spring:
  activemq:
    user: admin
    password: admin
    broker-url: tcp://192.168.153.129:61616
    pool:
      enabled: true
      max-connections: 10

queueName: publish.queue
topicName: publish.topic

這裏我開啟了連接池, 默認是不開的.

這裏要註意端口, 不是之前的8161.

2. 配置文件 ActiveMQConfig

/**
 * @author: elvin
 */
@Configuration
public class ActiveMQConfig {
    @Value("${queueName}")
    private String queueName;

    @Value("${topicName}")
    private String topicName;

    @Value("${spring.activemq.user}")
    private String usrName;

    @Value("${spring.activemq.password}")
    private  String password;

    @Value("${spring.activemq.broker-url}")
    private  String brokerUrl;

    @Bean
    public Queue queue(){
        return new ActiveMQQueue(queueName);
    }

    @Bean
    public Topic topic(){
        return new ActiveMQTopic(topicName);
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//設置為發布訂閱方式, 默認情況下使用的生產消費者方式 bean.setPubSubDomain(
true); bean.setConnectionFactory(connectionFactory); return bean; } }

這裏本來不需要配置這麽多的, 但是在consumer中也會用到, 所以就暫時弄一份一樣的, 拷貝一下完事.

3. PublishController

/**
 * @author: elvin
 */
@RestController
@RequestMapping("/publish")
public class PublishController {

    @Autowired
    private JmsMessagingTemplate jms;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @RequestMapping("/queue")
    public String queue(){

        for (int i = 0; i < 10 ; i++){
            jms.convertAndSend(queue, "queue"+i);
        }

        return "queue 發送成功";
    }

    @JmsListener(destination = "out.queue")
    public void consumerMsg(String msg){
        System.out.println(msg);
    }

    @RequestMapping("/topic")
    public String topic(){

        for (int i = 0; i < 10 ; i++){
            jms.convertAndSend(topic, "topic"+i);
        }

        return "topic 發送成功";
    }
}

三. consumer

1. 目錄結構

技術分享圖片

a,b是一樣的, 只是顯示的信息不同.

2. 配置文件

yml配置文件是一樣的, 只是修改了端口和context-path.

ActiveMQConfig文件內容是一樣的.

3. listener

/**
 * @author: elvin
 */
@Component
public class QueueListener {

    @JmsListener(destination = "publish.queue", containerFactory = "jmsListenerContainerQueue")
    @SendTo("out.queue")
    public String receive(String text){
        System.out.println("QueueListener: consumer-a 收到一條信息: " + text);
        return "consumer-a received : " + text;
    }
}

SendTo 會將此方法返回的數據, 寫入到 queue : out.queue 中去.

/**
 * @author: elvin
 */
@Component
public class TopicListener {

    @JmsListener(destination = "publish.topic", containerFactory = "jmsListenerContainerTopic")
    public void receive(String text){
        System.out.println("TopicListener: consumer-a 收到一條信息: " + text);
    }
}

這裏通過傳入不同的factory, 來實現發送不同類型的信息.

四. 測試

1. queue測試

瀏覽器中訪問: http://localhost:8080/pro/publish/queue

技術分享圖片

然後看一下, 控制臺, 那些用戶接收到了信息.

技術分享圖片

技術分享圖片

從上兩幅圖看的出來, a, b並不能同時接收數據. 這是queue的方式, 點對點.

技術分享圖片

那我想點對面, 怎麽辦?

2. topic測試

瀏覽器訪問頁面: http://localhost:8080/pro/publish/topic

技術分享圖片

技術分享圖片

a用戶完全接收到信息了. 再看看b用戶

技術分享圖片

沒毛病, 也都接收到數據了.

topic默認情況下, 是不會保存數據的, 也就是說, consumer是接收不到之前未接收到的信息.

而queue卻是可以的.

但是, topic並不是不能實現那個功能, 只要配置一下, 還是可以的.

springboot與ActiveMQ整合