1. 程式人生 > >基於SpringBoot使用RabbitMQ以及原理詳解

基於SpringBoot使用RabbitMQ以及原理詳解

RabbitMQ 使用與詳解

RabbitMQ參考中文文件

1. RabbitMQ原理詳解

  • Producer(生產者),產生訊息並向RabbitMq傳送訊息

  • Consumer(消費者),等待RabbitMq訊息到來並處理訊息

  • Queue(佇列), 依存於RabbitMQ內部, 雖然訊息通過RabbitMQ在你的應用中傳遞,但是它們只能儲存在queue中

  • message acknowledgment,我們可以要求消費者在消費完訊息後傳送一個回執給RabbitMQ,RabbitMQ收到訊息回執(Message acknowledgment)後才將該訊息從Queue中移除

  • message durability,我們可以將Queue與Message都設定為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ訊息不會丟失。

  • Prefetch Count,如果有多個消費者同時訂閱同一個Queue中的訊息,Queue中的訊息會被平攤給多個消費者。這時如果每個訊息的處理時間不同,就有可能會導致某些消費者一直在忙,而另外一些消費者很快就處理完手頭工作並一直空閒的情況。我們可以通過設定prefetchCount來限制Queue每次傳送給每個消費者的訊息數,比如我們設定prefetchCount=1,則Queue每次給每個消費者傳送一條訊息;消費者處理完這條訊息後Queue會再給該消費者傳送一條訊息。

  • Exchange(交換器),生產者將訊息傳送到Exchange(交換器,下圖中的X),由Exchange將訊息路由到一個或多個Queue中(或者丟棄)

  • routing key,生產者在將訊息傳送給Exchange的時候,一般會指定一個routing key,來指定這個訊息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效,RabbitMQ為routing key設定的長度限制為255 bytes

  • Binding,RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將訊息路由到指定的Queue了

  • binding key,在繫結(Binding)Exchange與Queue的同時,一般會指定一個binding key,binding key 並不是在所有情況下都生效,它依賴於Exchange Type,比如fanout型別的Exchange就會無視binding key,而是將訊息路由到所有繫結到該Exchange的Queue。

  • Exchange Type,常見的有fanout、direct、topic、headers這四種

    fanout

    fanout型別的Exchange路由規則非常簡單,它會把所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中

    direct

    把訊息路由到那些binding key與routing key完全匹配的Queue中

    topic

​ 把訊息路由到那些binding key與routing key模糊匹配的Queue中

匹配規則:

  1. routing key為一個句點號“. ”分隔的字串(我們將被句點號“. ”分隔開的每一段獨立的字串稱為一個單詞),如“aa.bb.cc
  2. binding key與routing key一樣也是句點號“. ”分隔的字串
  3. binding key中可以存在兩種特殊字元“”與“#”,用於做模糊匹配,其中“”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

header

headers型別的Exchange不依賴於routingkey與binding key的匹配規則來路由訊息,而是根據傳送的訊息內容中的					headers屬性進行匹配。

2. 執行RabbitMQ

使用docker執行,要使用管理頁面用management的版本

docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management

管理頁面預設使用者名稱和密碼都是guest

3. 建立QUEUE

點選Queues,Add a new queue

填入queue名稱儲存即可

4. 建立Exchange

點選Exchanges,Add a new exchange

輸入Echange名稱,選擇type

儲存即可

5. 繫結queue和exchange

點選剛才建立的exchange,Bindings下面填入queue的名稱和Routing Key即可

6. 建立springboot程式來收發訊息

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.mt.demo</groupId>
	<artifactId>spring-boot-rabbitmq-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring-boot-rabbitmq-demo</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.6.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.16.14</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

application.yml

spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true #生產者可以判斷訊息是否傳送到了broker
    publisher-returns: true #生產者可以判斷訊息是否傳送到了queue
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
server:
  port: 10001

先在RabbitMQ管理頁面上建立hello的佇列,並且使用繫結到topic交換器上

建立一個消費者

@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener {

    @RabbitHandler
    public void process(String hello) {
        log.info("Receiver: {}", hello);
    }
}

建立一個生產者

@GetMapping("/send")
public void send(@RequestParam String topic, @RequestParam String route, @RequestParam String msg) {
    log.info("send topic[{}], msg: {}", topic, msg);

    rabbitTemplate.convertAndSend(topic, route, msg);
}

如果再建立一個消費者繫結同樣的佇列,則可以看到兩個消費者交替收到訊息

@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener2 {

    @RabbitHandler
    public void process(String hello) {
        log.info("Receiver2: {}", hello);
    }
}

如果再建立一個queue和前一個使用一樣的bindingkey,則傳送的訊息會同是傳送進兩個queue

配置RabbitTemplate,加入訊息確認機制回撥

@Autowired
private ReturnCallBackListener returnCallBackListener;

@Autowired
private ConfirmCallbackListener confirmCallbackListener;

@Bean
public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setConfirmCallback(confirmCallbackListener);
    /**
    * 當mandatory標誌位設定為true時
    * 如果exchange根據自身型別和訊息routingKey無法找到一個合適的queue儲存訊息
    * 那麼broker會呼叫basic.return方法將訊息返還給生產者
    * 當mandatory設定為false時,出現上述情況broker會直接將訊息丟棄
    */
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback(returnCallBackListener);

    return rabbitTemplate;
}

ConfirmCallback: ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥

ReturnCallback:ReturnCallback介面用於實現訊息傳送到RabbitMQ交換器,但無相應佇列與交換器繫結時的回撥