基於SpringBoot使用RabbitMQ以及原理詳解
RabbitMQ 使用與詳解
RabbitMQ參考中文文件
1. RabbitMQ原理詳解
-
Producer(生產者),產生訊息並向RabbitMq傳送訊息
-
Consumer(消費者),等待RabbitMq訊息到來並處理訊息
-
Queue(佇列), 依存於RabbitMQ內部, 雖然訊息通過RabbitMQ在你的應用中傳遞,但是它們只能儲存在queue中
-
message acknowledgment,我們可以要求消費者在消費完訊息後傳送一個回執給RabbitMQ,RabbitMQ收到訊息回執(Message acknowledgment)後才將該訊息從Queue中移除
-
message durability,我們可以將Queue與Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ訊息不會丟失。
-
Prefetch Count,如果有多個消費者同時訂閱同一個Queue中的訊息,Queue中的訊息會被平攤給多個消費者。這時如果每個訊息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作並一直空閒的情況。我們可以通過設定prefetchCount來限制Queue每次傳送給每個消費者的訊息數,比如我們設定prefetchCount=1,則Queue每次給每個消費者傳送一條訊息;消費者處理完這條訊息後Queue會再給該消費者傳送一條訊息。
-
Exchange(交換器),生產者將訊息傳送到Exchange(交換器,下圖中的X),由Exchange將訊息路由到一個或多個Queue中(或者丟棄)
-
routing key,生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效,RabbitMQ為routing key設定的長度限制為255 bytes
-
Binding,RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將訊息路由到指定的Queue了
-
binding key,在繫結(Binding)Exchange與Queue的同時,一般會指定一個binding key,binding key 並不是在所有情況下都生效,它依賴於Exchange Type,比如fanout型別的Exchange就會無視binding key,而是將訊息路由到所有繫結到該Exchange的Queue。
-
Exchange Type,常見的有fanout、direct、topic、headers這四種
fanout
fanout型別的Exchange路由規則非常簡單,它會把所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中
direct
把訊息路由到那些binding key與routing key完全匹配的Queue中
topic
把訊息路由到那些binding key與routing key模糊匹配的Queue中
匹配規則:
- routing key為一個句點號“. ”分隔的字串(我們將被句點號“. ”分隔開的每一段獨立的字串稱為一個單詞),如“aa.bb.cc”
- binding key與routing key一樣也是句點號“. ”分隔的字串
- binding key中可以存在兩種特殊字元“”與“#”,用於做模糊匹配,其中“”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)
header
headers型別的Exchange不依賴於routingkey與binding key的匹配規則來路由訊息,而是根據傳送的訊息內容中的 headers屬性進行匹配。
2. 執行RabbitMQ
使用docker執行,要使用管理頁面用management的版本
docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management
管理頁面預設使用者名稱和密碼都是guest
3. 建立QUEUE
點選Queues,Add a new queue
填入queue名稱儲存即可
4. 建立Exchange
點選Exchanges,Add a new exchange
輸入Echange名稱,選擇type
儲存即可
5. 繫結queue和exchange
點選剛才建立的exchange,Bindings下面填入queue的名稱和Routing Key即可
6. 建立springboot程式來收發訊息
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mt.demo</groupId>
<artifactId>spring-boot-rabbitmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-rabbitmq-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
application:
name: rabbitmq-demo
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true #生產者可以判斷訊息是否傳送到了broker
publisher-returns: true #生產者可以判斷訊息是否傳送到了queue
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
server:
port: 10001
先在RabbitMQ管理頁面上建立hello的佇列,並且使用繫結到topic交換器上
建立一個消費者
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener {
@RabbitHandler
public void process(String hello) {
log.info("Receiver: {}", hello);
}
}
建立一個生產者
@GetMapping("/send")
public void send(@RequestParam String topic, @RequestParam String route, @RequestParam String msg) {
log.info("send topic[{}], msg: {}", topic, msg);
rabbitTemplate.convertAndSend(topic, route, msg);
}
如果再建立一個消費者繫結同樣的佇列,則可以看到兩個消費者交替收到訊息
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener2 {
@RabbitHandler
public void process(String hello) {
log.info("Receiver2: {}", hello);
}
}
如果再建立一個queue和前一個使用一樣的bindingkey,則傳送的訊息會同是傳送進兩個queue
配置RabbitTemplate,加入訊息確認機制回撥
@Autowired
private ReturnCallBackListener returnCallBackListener;
@Autowired
private ConfirmCallbackListener confirmCallbackListener;
@Bean
public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallbackListener);
/**
* 當mandatory標誌位設定為true時
* 如果exchange根據自身型別和訊息routingKey無法找到一個合適的queue儲存訊息
* 那麼broker會呼叫basic.return方法將訊息返還給生產者
* 當mandatory設定為false時,出現上述情況broker會直接將訊息丟棄
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallBackListener);
return rabbitTemplate;
}
ConfirmCallback: ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥
ReturnCallback:ReturnCallback介面用於實現訊息傳送到RabbitMQ交換器,但無相應佇列與交換器繫結時的回撥