【Spring Boot】(30)、SpringBoot整合RabbitMQ
1、安裝
1.1、Erlang:
Erlang下載地址,下載後安裝即可。
1.2、RabbitMQ安裝
RabbitMQ下載地址,下載後安裝即可。
注意:Erlang的版本要與RabbitMQ版本需要匹配才行。
RabbitMQ | Minimum required Erlang/OTP | Maximum supported Erlang/OTP |
---|---|---|
3.7.x | 19.3 | 20.3.x |
3.6.15 | 19.3 | 20.3.x |
3.6.14、 3.6.13 、 3.6.12、 3.6.11 | R16B03 | 20.1.x |
3.6.10 、 3.6.9、 3.6.8、 3.6.7、 3.6.6、 3.6.5、 3.6.4 | R16B03 | 19.3.x |
3.6.3 、 3.6.2、 3.6.1、 3.6.0 | R16B03 | 18.3.x |
3.5.x | R14B04 | 17.5.x |
3.4.x | R13B03 | 16B03 |
2、視覺化管理介面
-
Erlang和RabbitMQ安裝完成後,通過命令列進入到RabbitMQ的安裝目錄下的sbin目錄,輸入以下命令,等待返回。
-
rabbitmq-plugins enable rabbitmq_management
-
-
訪問http://ip:15672/,使用guest/guest或者admin/admin登入。
-
如果使用的是Linux系統,記得把防火牆的埠15672開放或者把防火牆關閉。
-
如果程式碼中需要使用新使用者作為測試,需要在
Admin
標籤頁中新建一個使用者,並同時設定密碼和virtual hosts
。
3、RabbitMQ術語
-
Server(Broker):接收客戶端連線,實現AMQP協議的訊息佇列和路由功能的程序;
-
Virtual Host:
-
Exchange:交換機,接收生產者傳送的訊息,並根據Routing Key將訊息路由到伺服器中的佇列Queue。
-
ExchangeType:交換機型別決定了路由訊息行為,RabbitMQ中主要有三種類型Exchange,分別是fanout、direct、topic;
-
Message Queue:訊息佇列,用於儲存還未被消費者消費的訊息;
-
Message:由Header和body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、優先順序是多少、由哪個Message Queue接收等;body是真正需要傳送的資料內容;
-
BindingKey(RouteKey):繫結關鍵字,將一個特定的Exchange和一個特定的Queue繫結起來。
4、與Spring Boot的整合(簡單版HelloWorld)
4.1、引入RabbitMQ依賴
首先當然是新增RabbitMQ的依賴啦,從mvn repository找到SpringBoot整合RabbitMQ的整合包。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring-boot-amqp.version}</version>
</dependency>
來看看其依賴情況:
4.2、新增RabbitMQ配置資訊
spring.rabbitmq.host=ip地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=使用者名稱
spring.rabbitmq.password=密碼
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirms=true
這裡配置的資訊是最基礎的,複雜的配置可以自行百度嘗試。
4.3、建立訊息生產者
通過注入
AmqpTemplate
介面的例項來實現訊息的傳送,AmqpTemplate
介面定義了一套針對AMQP協議的基礎操作。在Spring Boot中會根據配置來注入其具體實現。
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String content = "hello : " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
amqpTemplate.convertAndSend("hello", content);
}
}
此時的AmqpTemplate物件其實是RabbitTemplate的例項,因為RabbitTemplate是AmqpTemplate的子類:
至於為啥能自動注入這個Bean,後面會講解到,先不急。
4.4、建立訊息消費者
通過
@RabbitListener
註解定義該類對hello
佇列的監聽,並用@RabbitHandler
註解來指定對訊息的處理方法。所以,該消費者實現了對hello
佇列的消費,消費操作為輸出訊息的字串內容。
@RabbitListener(queues = {"hello"})
@Component
public class Receiver {
@RabbitHandler
public void handler(String content){
System.out.println("接收到:" + content);
}
}
4.5、建立RabbitMQ配置類
用來配置佇列、交換器、路由等高階資訊。
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}
4.6、編寫測試類
注入訊息生產者用於向佇列中傳送訊息
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestRabbitmq {
@Autowired
private Sender sender;
@Test
public void test(){
sender.send();
}
}
4.7、測試
-
先啟動主程式類,用於監聽佇列;
-
然後執行測試類,使用生產者向佇列中傳送訊息。
4.8 、疑問
學過RabbitMQ基礎的童靴肯定知道,要通過RabbitMQ傳送訊息的話,需要建立通道,交換機,佇列,並將通道與交換機、交換機與佇列繫結起來,而上述的簡單例子中,為什麼沒看到通道、交換機的建立,也沒看到繫結的操作呢?其實在RabbitMQ中,在不建立交換機的情況下,RabbitMQ會建立一個預設的交換機,通過RabbitMQ視覺化管理介面可以看到:
而且建立的佇列,預設也就繫結到該交換機上,見下圖:
再仔細看這個預設交換機,能看到這個交換機(Exchange)型別是Direct模式的,至於什麼是Direct模式,後面會講。
5、Spring Boot自動配置RabbitMQ
同之前文章一樣,SpringBoot整合RabbitMQ同樣有個自動配置類,只不過RabbitMQ的自動配置類是由SpringBoot官方自行提供,而不像Mybatis是由Mybatis方提供的。這個自動配置類在spring-boot-autoconfigure-xxx.jar包中:
這裡說個題外話,是關於自定義Starter的小知識點:
-
啟動器(Starter)只用來做依賴匯入;
-
需要專門寫一個自動配置模組;
-
啟動器依賴自動配置模組,使用的時候只需要引入啟動器(Starter);
而在命名規範中約定如下:
-
官方名稱空間:
-
字首:spring-boot-starter-xxx
-
模式:spring-boot-starter-模組名
-
舉例:spring-boot-starter-web、spring-boot-starter-jdbc、...
-
-
自定義名稱空間:
-
字首:xxx-spring-boot-starter
-
模式:模組名-spring-boot-starter
-
舉例:mybatis-spring-boot-starter、pagehelper-spring-boot-starter、...
-
進入正題之前,咱們先來看看Spring與RabbitMQ的整合時的配置資訊:
<!-- RabbitMQ start -->
<!-- 連線配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
password="${mq.password}" port="${mq.port}" />
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 訊息佇列客戶端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />
<!-- queue 佇列宣告 -->
<!--
durable 是否持久化
exclusive 僅建立者可以使用的私有佇列,斷開後自動刪除
auto-delete 當所有消費端連線斷開後,是否自動刪除佇列 -->
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
<!-- Topic交換機定義,其他型別交換機類似 -->
<!--
交換機:一個交換機可以繫結多個佇列,一個佇列也可以繫結到多個交換機上。
如果沒有佇列繫結到交換機上,則傳送到該交換機上的資訊則會丟失。
-->
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 設定訊息Queue匹配的pattern (direct模式為key) -->
<rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean name="rabbitmqService" class="xxx.yyy.zzz"></bean>
<!-- 配置監聽 消費者 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<!--
queues 監聽佇列,多個用逗號分隔
ref 監聽器 -->
<rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>
配合上面的xml配置檔案來看看SpringBoot中RabbitMQ的自動配置類RabbitAutoConfiguration:
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
throws Exception {
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
//...
return connectionFactory;
}
}
@Configuration
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitTemplate.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//...
return rabbitTemplate;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
}
該自動配置類中自動註冊了三個重要的Bean,分別是rabbitConnectionFactory、rabbitTemplate、amqpAdmin,剛好與xml配置檔案中的前三個Bean一一對應。當然RabbitMQ的配置資訊由RabbitProperties類進行匯入:
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
private String host = "localhost";
private int port = 5672;
private String username;
private String password;
private final Ssl ssl = new Ssl();
private String virtualHost;
private String addresses;
private Integer requestedHeartbeat;
private boolean publisherConfirms;
private boolean publisherReturns;
private Integer connectionTimeout;
private final Cache cache = new Cache();
private final Listener listener = new Listener();
private final Template template = new Template();
private List<Address> parsedAddresses;
public static class Ssl {
private boolean enabled;
private String keyStore;
private String keyStorePassword;
private String trustStore;
private String trustStorePassword;
private String algorithm;
}
public static class Cache {
private final Channel channel = new Channel();
private final Connection connection = new Connection();
public static class Channel {
private Integer size;
private Long checkoutTimeout;
}
public static class Connection {
private CacheMode mode = CacheMode.CHANNEL;
private Integer size;
}
}
public static class Listener {
@NestedConfigurationProperty
private final AmqpContainer simple = new AmqpContainer();
}
public static class AmqpContainer {
private boolean autoStartup = true;
private AcknowledgeMode acknowledgeMode;
private Integer concurrency;
private Integer maxConcurrency;
private Integer prefetch;
private Integer transactionSize;
private Boolean defaultRequeueRejected;
private Long idleEventInterval;
@NestedConfigurationProperty
private final ListenerRetry retry = new ListenerRetry();
}
public static class Template {
@NestedConfigurationProperty
private final Retry retry = new Retry();
private Boolean mandatory;
private Long receiveTimeout;
private Long replyTimeout;
}
public static class Retry {
private boolean enabled;
private int maxAttempts = 3;
private long initialInterval = 1000L;
private double multiplier = 1.0;
private long maxInterval = 10000L;
}
public static class ListenerRetry extends Retry {
private boolean stateless = true;
}
private static final class Address {
private static final String PREFIX_AMQP = "amqp://";
private static final int DEFAULT_PORT = 5672;
private String host;
private int port;
private String username;
private String password;
private String virtualHost;
}
}
大家自行嘗試,這裡就不多描述了。
在RabbitAutoConfiguration配置類上有個簽名:
@Import(RabbitAnnotationDrivenConfiguration.class)
來看看RabbitAnnotationDrivenConfiguration類:
public class SimpleRabbitListenerContainerFactory
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer>
implements ApplicationContextAware, ApplicationEventPublisherAware {
private Executor taskExecutor;
private PlatformTransactionManager transactionManager;
private Integer txSize;
private Integer concurrentConsumers;
private Integer maxConcurrentConsumers;
private Long startConsumerMinInterval;
private Long stopConsumerMinInterval;
private Integer consecutiveActiveTrigger;
private Integer consecutiveIdleTrigger;
private Integer prefetchCount;
private Long receiveTimeout;
private Boolean defaultRequeueRejected;
private Advice[] adviceChain;
private BackOff recoveryBackOff;
private Boolean missingQueuesFatal;
private Boolean mismatchedQueuesFatal;
private ConsumerTagStrategy consumerTagStrategy;
private Long idleEventInterval;
private ApplicationEventPublisher applicationEventPublisher;
private ApplicationContext applicationContext;
@Override
protected SimpleMessageListenerContainer createContainerInstance() {
return new SimpleMessageListenerContainer();
}
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance) {
//other code...
}
}
它其實是SimpleMessageListenerContainer的工廠類,而SimpleMessageListenerContainer又是<rabbit:listener-container />標籤的具體實現類,剛好又同xml配置檔案的消費監聽容器對應。
至於其他的配置資訊,如佇列和交換機的建立,以及佇列與交換機的繫結就由配置類自行定義了。請往下接著看...
6、RabbitMQ交換機及工作模式
RabbitMQ的交換機Exchange有如下幾種型別:
-
Fanout
-
Direct
-
Topic
-
Header
其中header型別的Exchange由於用的相對較少,所以本章主要講述其他三種類型的Exchange。
RabbitMQ的工作模式:
-
釋出/訂閱模式:對應Fanout型別的交換機。
-
路由模式:對應Direct型別的交換機。
-
萬用字元模式:對應Topic型別的交換機。
6.1、釋出/訂閱模式(Fanout)
任何傳送到Fanout Exchange的訊息都會被轉發到與該Exchange繫結(Binding)的所有Queue上。
-
可以理解為路由表的模式;
-
這種模式不需要RouteKey;
-
這種模式需要提前將Exchange與Queue進行繫結,一個Exchange可以繫結多個Queue,一個Queue可以同多個Exchange進行繫結;
-
如果接受到訊息的Exchange沒有與任何Queue繫結,則訊息會被拋棄。
程式碼示例:
FanoutConfig配置類程式碼,配置了兩個佇列和一個交換機,並繫結:
@Configuration
public class FanoutConfig {
public static final String FANOUT_QUEUE_NAME_1 = "fanout-queue-1";
public static final String FANOUT_QUEUE_NAME_2 = "fanout-queue-2";
public static final String FANOUT_EXCHANGE_NAME = "fanout-exchange";
@Bean
public Queue fanoutQueue1() {
// return new Queue(FANOUT_QUEUE_NAME_1);//預設情況,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(FANOUT_QUEUE_NAME_1).build();
}
@Bean
public Queue fanoutQueue2() {
// return new Queue(FANOUT_QUEUE_NAME_1);//預設情況,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(FANOUT_QUEUE_NAME_2).build();
}
@Bean
public FanoutExchange fanoutExchange() {
// return new FanoutExchange(FANOUT_EXCHANGE_NAME);//預設情況下,durable為true,auto-delete為false
return (FanoutExchange) ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE_NAME).durable(true).build();
}
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
訊息生產者:
@Component(value = "fanout-sender")
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String name){
String content = "hello : " + name + ",當前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
amqpTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE_NAME, "", content);
}
}
消費者1號:
@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_1})
@Component("fanout-receiver1")
public class Receiver1 {
@RabbitHandler
public void handler(String content){
System.out.println("Fanout.Receiver1接收到:" + content);
}
}
消費者2號:
@RabbitListener(queues = {FanoutConfig.FANOUT_QUEUE_NAME_2})
@Component("fanout-receiver2")
public class Receiver2 {
@RabbitHandler
public void handler(String content){
System.out.println("Fanout.Receiver2接收到:" + content);
}
}
Fanout控制器:
@RestController
public class FanoutController {
@Autowired
@Qualifier("fanout-sender")
private Sender sender;
@RequestMapping("/fanout")
public String hello(String name){
sender.send(name);
return "success";
}
}
輸入url:http://localhost:8081/fanout?name=Cay 觀看控制檯輸出:
6.2、路由模式(Direct)
-
RabbitMQ預設自帶Exchange,該Exchange的名字為空字串,當前也可以自己指定名字;
-
在預設的Exchange下,不需要將Exchange與Queue繫結, RabbitMQ會自動繫結;而如果使用自定義的Exchange,則需要在將Exchange繫結到Queue的時候需要指定一個RouteKey;
-
在訊息傳遞時需要一個RouteKey;
-
所有傳送到Direct Exchange的訊息會被轉發到RouteKey中指定的Queue。
-
如果vhost中不存在RouteKey中指定的佇列名,則該訊息會被拋棄。
程式碼示例:
DirectConfig配置類程式碼,配置兩個佇列,通過兩個不同的routeKey繫結到同一個Exchange上:
@Configuration
public class DirectConfig {
public static final String DIRECT_QUEUE_NAME_1 = "direct-queue-1";
public static final String DIRECT_QUEUE_NAME_2 = "direct-queue-2";
public static final String DIRECT_EXCHANGE_NAME = "direct-exchange";
public static final String ROUTE_KEY_1 = "direct.route.key.1";
public static final String ROUTE_KEY_2 = "direct.route.key.2";
@Bean
public Queue directQueue1() {
// return new Queue(DIRECT_QUEUE_NAME_1);//預設情況,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(DIRECT_QUEUE_NAME_1).build();
}
@Bean
public Queue directQueue2() {
// return new Queue(DIRECT_QUEUE_NAME_2);//預設情況,durable為true,exclusive為false,auto-delete為false
return QueueBuilder.durable(DIRECT_QUEUE_NAME_2).build();
}
@Bean
public DirectExchange directExchange() {
// return new DirectExchange(DIRECT_EXCHANGE_NAME_1);//預設情況下,durable為true,auto-delete為false
return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
}
@Bean
public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(directExchange).with(ROUTE_KEY_1);
}
@Bean
public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(directExchange).with(ROUTE_KEY_2);
}
}
訊息生產者:
@Component("direct-sender")
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(Integer selector) {
String content = "hello,我是%d號,當前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String routeKey = "";
if (selector.intValue() == 1) {
content = String.format(content, 1);
routeKey = DirectConfig.ROUTE_KEY_1;
} else if (selector.intValue() == 2) {
content = String.format(content, 2);
routeKey = DirectConfig.ROUTE_KEY_2;
}
amqpTemplate.convertAndSend(DirectConfig.DIRECT_EXCHANGE_NAME, routeKey, content);
}
}
消費者1號:
@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_1})
@Component("direct-receiver1")
public class Receiver1 {
@RabbitHandler
public void handler(String content){
System.out.println("Direct.Receiver1接收到:" + content);
}
}
消費者2號:
@RabbitListener(queues = {DirectConfig.DIRECT_QUEUE_NAME_2})
@Component("direct-receiver2")
public class Receiver2 {
@RabbitHandler
public void handler(String content){
System.out.println("Direct.Receiver2接收到:" + content);
}
}
Direct控制器:
@RestController
public class DirectController {
private static final Logger logger = LoggerFactory.getLogger(DirectController.class);
@Autowired
@Qualifier("direct-sender")
private Sender sender;
@RequestMapping("/direct")
public String hello(@RequestParam(defaultValue = "1") int selector){
logger.info("引數selector:" + selector);
sender.send(selector);
return "success";
}
}
輸入兩次不同的引數selector:
6.3、萬用字元模式(Topic)
任何傳送到Topic Exchange的訊息都會被轉發到所有關心RouteKey中指定的Queue上。
-
這種模式較為複雜,簡單來說,就是每個佇列都有其關心的主題,所有的訊息都帶有一個“標題”(RouteKey),Exchange會將訊息轉發到所有關注主題能與RouteKey模糊匹配的佇列。
-
這種模式需要RouteKey,也需要提前繫結Exchange與Queue。
-
在進行繫結時,要提供一個該佇列關心的主題,如“#.log.#”表示該佇列關心所有涉及log的訊息(一個RouteKey為”MQ.log.error”的訊息會被轉發到該佇列)。
-
“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,但是無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。
-
同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此訊息。
程式碼示例:
TopicConfig配置類,聲明瞭兩個佇列,分別對應兩個routeKey:topic.#和topic.*
@Configuration
public class TopicConfig {
public static final String TOPIC_QUEUE_NAME_1 = "topic-queue-1";
public static final String TOPIC_QUEUE_NAME_2 = "topic-queue-2";
public static final String TOPIC_EXCHANGE_NAME = "topic-exchange";
public static final String ROUTE_KEY_1 = "topic.#";
public static final String ROUTE_KEY_2 = "topic.*";
@Bean
public TopicExchange topicExchange() {
// return new TopicExchange(TOPIC_EXCHANGE_NAME);//預設情況下,durable為true,auto-delete為false
return (TopicExchange) ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE_NAME_1);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE_NAME_2);
}
@Bean
public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTE_KEY_1);
}
@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTE_KEY_2);
}
}
訊息生產者:
@Component("topic-sender")
public class Sender {
private static final String TOPIC_PREFIX = "topic.";
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String selector){
String content = "hello,當前時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
amqpTemplate.convertAndSend(TopicConfig.TOPIC_EXCHANGE_NAME, TOPIC_PREFIX + selector, content);
}
}
消費者1號:
@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_1})
@Component("topic-receiver1")
public class Receiver1 {
@RabbitHandler
public void handler(String content){
System.out.println("Topic.Receiver1接收到:" + content);
}
}
消費者2號:
@RabbitListener(queues = {TopicConfig.TOPIC_QUEUE_NAME_2})
@Component("topic-receiver2")
public class Receiver2 {
@RabbitHandler
public void handler(String content){
System.out.println("Topic.Receiver2接收到:" + content);
}
}
Topic控制器:
@RestController
@RequestMapping("/topic")
public class TopicController {
@Autowired
@Qualifier("topic-sender")
private Sender sender;
@RequestMapping("/send")
public String send(String routeKey){
sender.send(routeKey);
return "success";
}
}
通過url訪問不同的routeKey來檢視結果:
可以看到,如果引數為message的時候即routeKey為topic.message,兩個佇列都能接收到訊息,而如果引數為message.a的時候即routeKey為topic.message.a,只有佇列1能接收到訊息,而佇列2不能接收到訊息。
7、訊息確認與回撥
預設情況下,RabbitMQ傳送訊息以及接收訊息是自動確認的,意思也就是說,訊息傳送方傳送訊息的時候,認為訊息已經成功傳送到了RabbitMQ伺服器,而當訊息傳送給消費者後,RabbitMQ伺服器就立即自動確認,然後將訊息從佇列中刪除了。而這樣的自動機制會造成訊息的丟失,我們常常聽到“丟訊息”的字眼。
為了解決訊息的丟失,RabbitMQ便產生了手動確認的機制:
-
傳送者:
-
當訊息不能路由到任何佇列時,會進行確認失敗操作,如果傳送方設定了mandatory=true模式,則先會呼叫basic.return方法,然後呼叫basic.ack方法;
-
當訊息可以路由時,訊息被髮送到所有繫結的佇列時,進行訊息的確認basic.ack。
-
-
接收者:
-
當訊息成功被消費時,可以進行訊息的確認basic.ack;
-
當訊息不能正常被訊息時,可以進行訊息的反確認basic.nack 或者拒絕basic.reject。
-
當設定mandatory=true時,其中basic.ack和basic.nack會呼叫自定義的RabbitTemplate.ConfirmCallback介面的confirm方法。
public interface ConfirmCallback {
/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(CorrelationData correlationData, boolean ack, String cause);
}
而傳送者傳送訊息時無法路由後,會呼叫baisc.return方法,其確認機制由RabbitTemplate.ReturnCallback介面的returnedMessage方法實現。
public interface ReturnCallback {
/**
* Returned message callback.
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey);
}
7.1、使用Spring配置RabbitMQ的確認機制
- 修改publisher-confirms為true
<!--連線工廠 -->
<rabbit:connection-factory id="connectionFactory" host="{ip地址}" port="{埠}" username="{使用者名稱}" password="{密碼}" publisher-confirms="true"/>
- 修改訊息回撥方法confirm-callback和return-callback為bean的id
<!-- mandatory必須設定true,return callback才生效 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory="true"
/>
-
訊息回撥類
@Service
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//other code...
}
}
@Service
class returnCallBackListener implements RabbitTemplate.ReturnCallback{
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//other code...
}
}
- 修改訊息確認機制改成手動確認manual:
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual" >
<!-- other xml -->
</rabbit:listener-container>
7.2、使用SpringBoot配置RabbitMQ的確認機制
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
//other code...
//傳送者是否確認
cachingConnectionFactory.setPublisherConfirms(true);
return cachingConnectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
//other code...
//修改成手動確認
rabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return rabbitListenerContainerFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
//重點
rabbitTemplate.setMandatory(true);
//訊息回撥
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
});
return rabbitTemplate;
}
現在SpringBoot整合RabbitMQ的基礎寫得差不多了,還有很多功能需要大家去研究了。下期再見...
====================打個廣告,歡迎關注====================
QQ: |
412425870 |
微信公眾號:Cay課堂 |
|
csdn部落格: |
http://blog.csdn.net/caychen |
碼雲: |
https://gitee.com/caychen/ |
github: |
https://github.com/caychen |
點選群號或者掃描二維碼即可加入QQ群:328243383(1群) |
|
點選群號或者掃描二維碼即可加入QQ群: |
|