Spring整合RabbitMQ-03-SimpleMessageListenerContainer
阿新 • • 發佈:2020-06-24
SimpleMessageListenerContainer
- 簡單訊息監聽容器,這個類非常強大,我們可以對他進行很多設定,對於訊息的配置項,這個類都可以滿足;
- 監聽佇列(可同時監聽多個佇列),自動啟動,自動宣告功能
- 設定事務特性,事務管理器,事務屬性,事務容量(併發),是否開啟事務,回滾訊息等
- 設定消費者的數量,最大最小數量,批量消費等
- 設定訊息確認和自動確認模式,是否重回佇列,異常捕獲handler函式
- 設定消費者標籤生成策略,是否獨佔模式,消費者屬性
- 設定具體的監聽器,訊息轉換器等等
注意
SimpleMessageListenerContainer
可以進行動態設定,比如在執行中可以動態修改其消費者數量的大小,接收訊息的模式等。很多基於RabbitMQ的定製化後端管理控制檯在進行動態設定的時候,也是根據這一特性去實現的。
程式碼
package com.wyg.rabbitmq.springamqp;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.rabbitmq.client.Channel;
/**
 * Spring configuration that declares the RabbitMQ {@link ConnectionFactory} and a
 * {@link SimpleMessageListenerContainer} consuming from the queues
 * {@code test01}, {@code test02} and {@code test03}.
 *
 * @author [email protected]
 * @date 2019-11-25 15:11
 * @since JDK1.8
 * @version V1.0
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the connection factory used by the listener container.
     *
     * <p>The broker address, credentials and virtual host are hard-coded for a
     * local development broker ({@code localhost:5672}, {@code guest}/{@code guest}, {@code /}).
     *
     * @return a {@link CachingConnectionFactory} pointing at the local broker
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("localhost:5672");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    /**
     * Registers the {@link SimpleMessageListenerContainer} bean.
     *
     * <p>The container listens on three queues, starts with one consumer and may
     * scale up to five, does not requeue rejected messages, and uses
     * {@link AcknowledgeMode#AUTO}.
     *
     * @param connectionFactory the connection factory the container consumes through
     * @return the configured listener container
     * @date 2019/11/25 17:16
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // Listen on several queues at once.
        container.addQueueNames("test01", "test02", "test03");
        // Initial number of concurrent consumers.
        container.setConcurrentConsumers(1);
        // Upper bound on the number of concurrent consumers.
        container.setMaxConcurrentConsumers(5);
        // Rejected messages are not put back on the queue.
        container.setDefaultRequeueRejected(false);
        // Automatic acknowledgement: the container acks once the listener returns normally.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tag strategy: "<queue>_<current time in millis>".
        container.setConsumerTagStrategy(queue -> queue + "_" + System.currentTimeMillis());
        // Register the listener that handles each delivery.
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // Decode the payload as UTF-8 and print it with its queue and delivery tag.
                String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("---消費者---佇列名:" + message.getMessageProperties().getConsumerQueue() + ",訊息:" + msg
                    + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag());
                // No channel.basicAck(...) here: the container is in AcknowledgeMode.AUTO and
                // acknowledges the delivery itself. Acking manually as well would ack the same
                // delivery tag twice, which the broker treats as a channel error.
                // Switch the container to AcknowledgeMode.MANUAL if explicit acks are wanted.
            }
        });
        return container;
    }
}
複製程式碼
單元測試
package com.wyg.rabbitmq.springamqp;
import java.io.*;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;
/**
 * Integration test that publishes messages for the listener container declared
 * in {@code RabbitConfig} to consume. It makes no assertions; the result is
 * observed through the consumer's console output.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {
    @Autowired
    RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

    /**
     * Publishes three messages to the exchange {@code springdemo.direct} with
     * routing key {@code orderRoutingKey}.
     */
    @Test
    public void testSimpleMessageListenerContainerSendMsg() {
        // Per the original author's note, queues "test01" and "test03" are already bound to
        // "springdemo.direct" with routing key "orderRoutingKey"; "test02" is listened on by the
        // container but is not mentioned as bound, so presumably it receives nothing here.
        for (int i = 0; i < 3; i++) {
            // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8, so relying on
            // the platform default charset would corrupt the text on non-UTF-8 JVMs.
            byte[] payload = ("第" + i + "條訊息").getBytes(java.nio.charset.StandardCharsets.UTF_8);
            rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", payload);
        }
    }
}
複製程式碼