1. 程式人生 > 程式設計 >Spring整合RabbitMQ-03-SimpleMessageListenerContainer

Spring整合RabbitMQ-03-SimpleMessageListenerContainer

SimpleMessageListenerContainer

  1. 簡單訊息監聽容器,這個類非常強大,我們可以對他進行很多設定,對於訊息的配置項,這個類都可以滿足;
  2. 監聽佇列(多個對列),自動啟動,自動宣告功能
  3. 設定事務特性,事務管理器,事務屬性。事務容量(併發),是否開啟事務,回滾訊息等
  4. 設定消費者的數量,最大最小數量,批量消費等
  5. 設定訊息確認和自動確認模式,是否重回佇列,異常捕獲handler函式
  6. 設定消費者標籤生成策略,是否獨佔模式,消費者屬性
  7. 設定具體的監聽器,訊息轉換器等等

注意

SimpleMessageListenerContainer可以進行動態設定,比如在執行中可以動態修改其消費者數量的大小,接收訊息的模式等。很多基於RabbitMQ的制定化後端管理控制檯在進行動態設定的時候,也是根據這一特性去實現的。

程式碼

package com.wyg.rabbitmq.springamqp;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import
org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.context.annotation.Bean; import
org.springframework.context.annotation.Configuration; import com.rabbitmq.client.Channel; /** * RabbitAdmin * * @author [email protected] * @date 2019-11-25 15:11 * @since JDK1.8 * @version V1.0 */ @Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses("localhost:5672"); cachingConnectionFactory.setUsername("guest"); cachingConnectionFactory.setPassword("guest"); cachingConnectionFactory.setVirtualHost("/"); return cachingConnectionFactory; } /** * SimpleMessageListenerContainer注入 * * @param connectionFactory * @return * @throws @author * [email protected] * @date 2019/11/25 17:16 */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 監聽多個queue container.addQueueNames("test01","test02","test03"); // 設定當前消費者數量 container.setConcurrentConsumers(1); // 設定最大的消費者數量 container.setMaxConcurrentConsumers(5); // 設定不要重回佇列 container.setDefaultRequeueRejected(false); // 設定自動簽收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 設定消費端tag策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + System.currentTimeMillis(); } }); // 設定監聽 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message,Channel channel) throws Exception { // 訊息處理 String msg = new String(message.getBody(),"UTF-8"); System.out.println("---消費者---佇列名:" + message.getMessageProperties().getConsumerQueue() + ",訊息:" + msg + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);; } }); return container; } } 複製程式碼

單元測試

package com.wyg.rabbitmq.springamqp;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {
    @Autowired
    RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

 
    @Test
    public void testSimpleMessageListenerContainerSendMsg() {
        // 分別向 佇列 "test01","test02","test03" 發訊息,"test01",
        // "test03"與springdemo.direct已經繫結,routingKey都為orderRoutingKey
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend("springdemo.direct","orderRoutingKey",("第" + i + "條訊息").getBytes());

        }
    }

}
複製程式碼