RabbitMQ不講武德,發個訊息也這麼多花招
阿新 • • 發佈:2021-01-03
# 前言
```properties
本篇部落格已被收錄GitHub:https://zhouwenxing.github.io/
文中所涉及的原始碼也已被收錄GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模組)
```
使用訊息佇列必須要保證生產者傳送的訊息能被消費者所接收,那麼生產者如何接收訊息呢?下圖是 `RabbitMQ` 的工作模型:
![](https://img2020.cnblogs.com/blog/2232223/202101/2232223-20210103114009505-1976367881.png)
上圖中生產者會將訊息傳送到交換機 `Exchange` 上,再由 `Exchange` 傳送給不同的 `Queue` ,而 `Queue` 是用來儲存訊息佇列,那麼假如有多個生產者,那麼訊息傳送到交換機 `Exchange` 之後,應該如何和 `Queue` 之間建立繫結關係呢?
# 如何使用 RabbitMQ 傳送訊息
`RabbitMQ` 中提供了3種傳送訊息的路由方式。
## 直連 Direct 模式
通過指定一個精確的繫結鍵來實現 `Exchange`(交換機) 和 `Queue`(訊息佇列) 之間的繫結,也就是說,當建立了一個直連型別的交換機時,生產者在傳送訊息時攜帶的路由鍵(routing key),必須與某個繫結鍵(binding key)**完全匹配**時,這條訊息才會從交換機路由到滿足路由關係訊息佇列上,然後消費者根據各自監聽的佇列就可以獲取到訊息(如下如吐所示,`Queue1` 綁定了 `order` ,那麼這時候傳送訊息的路由鍵必須為 `order` 才能分配到 `Queue1` 上):
![](https://img2020.cnblogs.com/blog/2232223/202101/2232223-20210103114300335-1886467200.png)
## 主題 Topic 模式
`Direct` 模式會存在一定的侷限性,有時候我們需要按型別劃分,比如訂單類路由到一個佇列,產品類路由到另一個佇列,所以在 RabbitMQ 中,提供了主題模式來實現模糊匹配。使用主題型別連線方式支援兩種萬用字元:
直連方式只能精確匹配,有時候我們需要實現模糊匹配,那麼這時候就需要主題型別的連線方式,在 `RabbitMQ` 中,使用主題型別連線方式支援兩種萬用字元:
- #:表示 `0` 個或者多個單詞
- *:表示 `1` 個單詞
PS:使用萬用字元時,單詞指的是用英文符號的小數點 `.` 隔開的字元,如:`abc.def` 就表示有 `abc` 和 `def` 兩個單詞。
下圖所示中,因為 `Queue1` 綁定了 `order.#`,所以當傳送訊息的路由鍵為 `order` 或者 `order.xxx`時都可以使得訊息分配到 `Queue1` 上:
![](https://img2020.cnblogs.com/blog/2232223/202101/2232223-20210103114435678-277984429.png)
## 廣播 Fanout 模式
當我們定義了一個廣播型別的交換機時就不需要指定繫結鍵,而且生產者傳送訊息到交換機上時,也不需要攜帶路由鍵,此時當訊息到達交換機時,所有與其繫結的佇列都會收到訊息,這種模式的訊息傳送適用於訊息通知類需求。
如下如所示,`Queue1`,`Queue2`,`Queue3` 三個佇列都繫結到了一個 `Fanout` 交換機上,那麼當 `Fanout Exchange` 收到訊息時,會同時將訊息傳送給三個佇列:
![](https://img2020.cnblogs.com/blog/2232223/202101/2232223-20210103114523317-376991653.png)
在 `RabbitMQ` 提供的後臺管理系統中也能查詢到建立的交換機和佇列等資訊,並且可以通過管理後臺直接建立佇列和交換機:
![](https://img2020.cnblogs.com/blog/2232223/202101/2232223-20210103114709349-7206311.png)
## 訊息傳送實戰
下面通過一個 `SpringBoot` 例子來體會一下三種傳送訊息的方式。
- 1、`application.yml` 檔案中新增如下配置:
```yaml
spring:
rabbitmq:
host: ip
port: 5672
username: admin
password: 123456
```
- 2、新增一個 `RabbitConfig` 配置類(為了節省篇幅省略了包名和匯入 ),此類中聲明瞭三個交換機和三個佇列,並分別進行繫結:
```java
@Configuration
public class RabbitConfig {
//直連交換機
@Bean("directExchange")
public DirectExchange directExchange(){
return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
}
//主題交換機
@Bean("topicExchange")
public TopicExchange topicExchange(){
return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
}
//廣播交換機
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
}
@Bean("orderQueue")
public Queue orderQueue(){
return new Queue("LONGLY_WOLF_ORDER_QUEUE");
}
@Bean("userQueue")
public Queue userQueue(){
return new Queue("LONGLY_WOLF_USER_QUEUE");
}
@Bean("productQueue")
public Queue productQueue(){
return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
}
//Direct交換機和orderQueue繫結,繫結鍵為:order.detail
@Bean
public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
}
//Topic交換機和userQueue繫結,繫結鍵為:user.#
@Bean
public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
}
//Fanout交換機和productQueue繫結
@Bean
public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
```
- 3、新建一個消費者 `ExchangeConsumer` 類,不同的方法實現分別監聽不同的佇列:
```java
@Component
public class ExchangeConsumer {
/**
* 監聽綁定了direct交換機的的訊息佇列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
public void directConsumer(String msg){
System.out.println("direct交換機收到訊息:" + msg);
}
/**
* 監聽綁定了topic交換機的的訊息佇列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
public void topicConsumer(String msg){
System.out.println("topic交換機收到訊息:" + msg);
}
/**
* 監聽綁定了fanout交換機的的訊息佇列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
public void fanoutConsumer(String msg){
System.out.println("fanout交換機收到訊息:" + msg);
}
}
```
- 4、新增一個 `RabbitExchangeController` 類來作為生產者,進行訊息傳送:
```java
@RestController
@RequestMapping("/exchange")
public class RabbitExchangeController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/send/direct")
public String sendDirect(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
return "succ";
}
@GetMapping(value="/send/topic")
public String sendTopic(String routingKey,@RequestParam(value = "msg",defaultValue = "no topic message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
return "succ";
}
@GetMapping(value="/send/fanout")
public String sendFaout(String routingKey,@RequestParam(value = "msg",defaultValue = "no faout message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
return "succ";
}
}
```
- 5、啟動服務,當我們呼叫第一個介面時候,路由鍵和繫結鍵 `order.detail` 精確匹配時,`directConsumer` 就會收到訊息,同樣的,呼叫第二介面時,路由鍵滿足 `user.#` 時,`topicConsumer` 就會收到訊息,而只要呼叫第三個介面,不論是否指定路由鍵,`fanoutConsumer` 都會收到訊息。
# 訊息過期了怎麼辦
簡單的傳送訊息我們學會了,難道這就能讓我們就此止步了嗎?顯然是不能的,要玩就要玩高階點,所以接下來讓我們給訊息加點佐料。
## TTL(Time-To-Live)
`TTL` 即 一條訊息在佇列中的最大存活時間。在一條在佇列中超過配置的 `TTL` 的訊息稱為已死訊息。但是需要注意的是,已死訊息並不能保證會立即從佇列中刪除,但是能保證已死的訊息不會被投遞出去。
設定 `TTL` 的方式有兩種:
- 1、給佇列設定 `x-message-ttl`,此時所有被投遞到佇列中的訊息,都會在到達 `TTL` 時成為已死訊息。
這種情況就會出現當一條訊息同時路由到 `N` 個帶有 `TTL` 時間的佇列,而由於每個佇列的 `TTL` 不一定相同,所以同一條訊息在不同的佇列中可能會在不同時間死亡或者不會死亡(未設定 `TTL` ),所以一個佇列中的訊息死亡不會影響到其他佇列中的訊息。
- 2、單獨給某一條訊息設定過期時間。
此時需要注意的時,當訊息達到 `TTL` 時,可能不會馬上被丟棄,因為只有處於佇列頭部訊息過期後才會被丟棄,假如佇列頭部的訊息沒有設定 `TTL`,而第 `2` 條訊息設定了 `TTL`,那麼即使第 `2` 條訊息成為了已死訊息,也必須要等到佇列頭部的訊息被消費之後才會被丟棄,而已死訊息在被丟棄之前也會被計入統計資料(比如佇列中的訊息總數)。所以為了更好的利用 `TTL` 特性,建議讓消費者線上消費訊息,這樣才能確保訊息更快的被丟棄,防止訊息堆積。
PS:訊息過期和消費者傳遞之間可能存在自然的競爭條件。例如,訊息可能在傳送途中(未到達消費者)過期。
## 佇列的生存
和 `TTL` 針對訊息不同的是,我們可以通過設定過期時間屬性 `x-expires`` 來處理佇列,當在指定過期時間內內未使用佇列時,伺服器保證將刪除佇列(但是無法保證在過期時間過後佇列將以多快的速度被刪除)。
## TTL 和過期時間實戰
- 1、在上面定義的 `RabbitConfig` 類中,再新增一個 `TTL` 佇列並將其繫結到 `direct` 交換機上:
```java
@Bean("ttlQueue")
public Queue ttlQueue(){