1. 程式人生 > >解決Springboot整合ActiveMQ傳送和接收topic訊息的問題

解決Springboot整合ActiveMQ傳送和接收topic訊息的問題

環境搭建

1.建立maven專案(jar)

2.pom.xml新增依賴

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
    </dependencies>

 

3.編寫引導類

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }
}

 

4.在resources下的application.properties配置檔案中新增對應的配置

spring.activemq.broker-url=tcp://192.168.25.131:61616
#此url為activeMQ所在伺服器的連結

 

5.在類中注入JmsMessageTemplate

@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

6.呼叫JmsMessageTemplate的方法傳送訊息

jmsMessagingTemplate.convertAndSend(“queue訊息名稱”,"訊息內容");

設定訊息傳送型別

在引導類中配置訊息型別

Queue

@Bean(name="queue")
public Destination getQueue(){
    return new ActiveMQQueue("queue_test");
}

Topic

@Bean(name="topic")
public Destination getTopic(){
    return new ActiveMQTopic("topic_test");
}

類中注入傳送訊息型別

在傳送訊息的類中注入傳送訊息型別物件Destination

Queue

@Resource(name="queue")
private Destination queue;

Topic

@Autowired
@Qualifier(value="topic")
private Destination topic;

訊息傳送

jmsMessageTemplate.convertAndSend(queue,"訊息內容");
jmsMessageTemplate.convertAndSend(topic,"訊息內容");

編寫消費者

@Component
public class ActiveMQConsumer {
    //接收queue訊息
    @JmsListener(destination = "queue_test")
    public void handler(String message){
        System.out.println(message);
    }
    //接收topic訊息
    @JmsListener(destination = "topic_test")
    public void handlerTopic(String msessage){
        System.out.println(msessage);
    }
}

啟動測試

controller類的方法中新增
@RequestMapping("/send")
public void sendQueue(){
    jmsMessagingTemplate.convertAndSend(queue,"這是Queue的訊息");
    jmsMessagingTemplate.convertAndSend(topic,"這是Topic的訊息");
}

執行後,在控制檯只輸出了Queue的訊息

問題

Springboot整合ActiveMQ模式只能監聽Queue佇列的訊息進行處理,所以如何處理topic訊息?

解決:

在Springboot的application.properties檔案中新增如下內容

spring.jms.pub-sub-domain=true   //預設是false,開啟發布訂閱模式

啟動測試

經過上述修改,又只能監聽topic的訊息,queue的訊息又無法獲取。

解決:

只有通過自定義監聽器類來處理

在監聽器類的@JmsListener新增connectionFactory屬性

@Component
public class ActiveMQConsumer {
    //接收queue訊息
   @JmsListener(destination = "queue_test",containerFactory =     
                   "queueListenerContainerFactory")
    public void handler(String message){
        System.out.println(message);
    }
    //接收topic訊息
    @JmsListener(destination = "topic_test",containerFactory = 
                "topicListenerContainerFactory")
    public void handlerTopic(String msessage){
        System.out.println(msessage);
    }
}

建立一個配置類,在配置類中提供兩個監聽工廠配置

@Configuration
public class ConsumerConfiguration {

    @Value("${spring.activemq.broker-url}")
    private String host;

    @Bean
    public ConnectionFactory getActiveMqConnection(){
        return new ActiveMQConnectionFactory(host);
    }

    @Bean(name="queueListenerContainerFactory")
    public JmsListenerContainerFactory queueListenerContailerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }
    @Bean(name="topicListenerContainerFactory")
    public JmsListenerContainerFactory topicListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

執行測試

&n