分散式訊息通訊RabbitMQ工作模型
本章重點:
1.三種主要的交換機介紹
2.SpringBoot整合RabbitMQ三種交換機
3.死信佇列
4.優先順序佇列和訊息
5.服務端流控
6.消費端限流
RabbitMQ的特性
RabbitMQ使用Erlang語言編寫,使用Mnesia資料庫儲存訊息。
1.可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。 2.靈活的路由(Flexible Routing) 在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功 能,RabbitMQ 已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 繫結在 一起,也通過外掛機制實現自己的 Exchange 。 3.訊息叢集(Clustering) 多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。 4.高可用(Highly Available Queues) 佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下 佇列仍然可用。 5.多種協議(Multi-protocol) RabbitMQ 支援多種訊息佇列協議,比如 AMQP、STOMP、MQTT 等等。 6.多語言客戶端(Many Clients) RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby、PHP、C#、 JavaScript 等等。 7.管理介面(Management UI) RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息、叢集 中的節點。 8.外掛機制(Plugin System) RabbitMQ提供了許多外掛,以實現從多方面擴充套件,當然也可以編寫自己的外掛。
工作模型
RabbitMQ的術語介紹
Broker:即RabbitMQ的實體伺服器。提供一種傳輸服務,維護一條從生產者到消費者的傳輸線路, 保證訊息資料能按照指定的方式傳輸。 Exchange:訊息交換機。指定訊息按照什麼規則路由到哪個佇列Queue。 Queue:訊息佇列。訊息的載體,每條訊息都會被投送到一個或多個佇列中。 Binding:繫結。作用就是將Exchange和Queue按照某種路由規則繫結起來。 Routing Key:路由關鍵字。Exchange根據Routing Key進行訊息投遞。定義繫結時指定的關鍵字稱為 Binding Key。 Vhost:虛擬主機。一個Broker可以有多個虛擬主機,用作不同使用者的許可權分離。一個虛擬主機持有 一組Exchange、Queue和Binding。 Producer:訊息生產者。主要將訊息投遞到對應的Exchange上面。一般是獨立的程式。 Consumer:訊息消費者。訊息的接收者,一般是獨立的程式。 Connection: Producer 和 Consumer 與Broker之間的TCP長連線。 Channel:訊息通道,也稱通道。在客戶端的每個連線裡可以建立多個Channel,每個Channel代表一 個會話任務。在RabbitMQ Java Client API中,channel上定義了大量的程式設計介面。
三種主要的交換機
- Direct Exchange 直連交換機
- Topic Exchange 主題交換機
- Fanout Exchange 廣播交換機
RabbitMQ基本使用:
本文直接使用SpringBoot來進行開發.
訊息消費者:
專案結構:
新增pom依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
新增連線配置 application.properties
spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
建立配置類:RabbitConfig
在這裡建立了三種類型的交換機,四個佇列
package com.zbb.rabbitmq_consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置類,需要使用configuration註解
*/
@Configuration
public class RabbitConfig {
//定義三個交換機
/**
* 直連交換機
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("DIRECT_EXCHANGE");
}
/**
* 主題交換機
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("TOPIC_EXCHANGE");
}
/**
* 廣播交換機
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("FANOUT_EXCHANGE");
}
//定義四個佇列
@Bean
public Queue firsQueue(){
return new Queue("FIST_QUEUE");
}
@Bean
public Queue secondQueue(){
return new Queue("SECOND_QUEUE");
}
@Bean
public Queue thirdQueue(){
return new Queue("THIRD_QUEUE");
}
@Bean
public Queue fourthQueue(){
return new Queue("FOURTH_QUEUE");
}
//定義四個繫結關係
@Bean
public Binding bindFirst(@Qualifier("firsQueue") Queue queue,
@Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("zbb.test");
}
@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("*.zbb.*");
}
@Bean
public Binding bindThird(@Qualifier("thirdQueue") Queue queue,
@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindFourth(@Qualifier("fourthQueue") Queue queue,
@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
建立消費者,四個,分別監聽4個佇列
1、FirstConsumer
package com.zbb.rabbitmq_consumer.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "FIST_QUEUE")
public class FirstConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("消費者一收到訊息:"+msg);
}
}
2、SecondConsumer
package com.zbb.rabbitmq_consumer.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "SECOND_QUEUE")
public class SecondConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("消費者二收到訊息:"+msg);
}
}
3、ThirdConsumer
package com.zbb.rabbitmq_consumer.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "THIRD_QUEUE")
public class ThirdConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("消費者三收到訊息:"+msg);
}
}
4、FourthConsumer
package com.zbb.rabbitmq_consumer.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "FOURTH_QUEUE")
public class FourthConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("消費者四收到訊息:"+msg);
}
}
訊息生產者:
引入pom依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
建立生產者類,給三個交換機發送訊息
package com.zbb.rabbitmq_producer.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MyProducer {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(){
//直連
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "zbb.test",
"this is a direct msg");
//主題
rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","kaifa.zbb.IT",
"this is a Topic msg");
//廣播
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",
"this is a fanout msg");
}
}
修改測試類:
package com.zbb.rabbitmq_producer;
import com.zbb.rabbitmq_producer.producer.MyProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqProducerApplicationTests {
@Autowired
MyProducer myProducer;
@Test
public void contextLoads() {
myProducer.send();
}
}
-
我們先啟動訊息生產者,開啟管理介面可以看到四個佇列裡都有一條訊息堆積
2.現在去啟動訊息消費者之後會發現控制檯打印出訊息,並且管理介面發生改變:
這個就是RabbitMQ在java中的簡單使用,具體在專案中的使用需要結合業務來進行訊息的收發。
下面將介紹一下RabbitMQ中一個重要的東西:交換機Exchange
這裡只介紹三種主要的交換機。
Direct Exchange 直連交換機
直連型別的交換機與一個佇列繫結時,需要指定一個明確的繫結關鍵字(binding key)。
路由規則:傳送訊息到直連型別的交換機時,只有路由關鍵字(routing key)跟繫結關鍵字(binding key)完全匹配時,繫結的佇列才能收到訊息。
佇列:
@Bean
public Queue firsQueue(){
return new Queue("FIST_QUEUE");
}
交換機:
@Bean
public DirectExchange directExchange(){
return new DirectExchange("DIRECT_EXCHANGE");
}
繫結:
@Bean
public Binding bindFirst(@Qualifier("firsQueue") Queue queue,
@Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("zbb.test");
}
傳送訊息:
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "zbb.test",
"this is a direct msg");
Topic Exchange 主題交換機
定義:主題型別的交換機與一個佇列繫結時,可以指定按模式匹配的routing key。
萬用字元有兩個,*代表匹配一個單詞。#代表匹配零個或者多個單詞。單詞與單詞之間用 . 隔開。
路由規則:傳送訊息到主題型別的交換機時,routing key符合binding key的模式時,繫結的佇列才能收到訊息
佇列:
@Bean
public Queue secondQueue(){
return new Queue("SECOND_QUEUE");
}
交換機:
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("TOPIC_EXCHANGE");
}
繫結:
@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("*.zbb.*");
}
傳送訊息:
//主題
rabbitTemplate.convertAndSend("TOPIC_EXCHANGE","kaifa.zbb.IT",
"this is a Topic msg");
Fanout Exchange 廣播交換機,繫結兩個佇列thirdQueue fourthQueue
定義:廣播型別的交換機與一個佇列繫結時,不需要指定binding key。
路由規則:當訊息傳送到廣播型別的交換機時,不需要指定routing key,所有與之繫結的佇列都能收到訊息。
佇列:
@Bean
public Queue thirdQueue(){
return new Queue("THIRD_QUEUE");
}
@Bean
public Queue fourthQueue(){
return new Queue("FOURTH_QUEUE");
}
交換機:
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("FANOUT_EXCHANGE");
}
傳送訊息:
//廣播
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "",
"this is a fanout msg");
進階知識 1、TTL(Time To Live) a、訊息的過期時間
有兩種設定方式:
通過佇列屬性設定訊息過期時間:
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl",6000);
channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);
設定單條訊息的過期時間:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 持久化訊息
.deliveryMode
.contentEncoding("UTF-8")
.expiration("10000") // TTL
.build();
channel.basicPublish("", "TEST_TTL_QUEUE", properties, msg.getBytes());
b、佇列的過期時間:
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl",6000);
channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);
佇列的過期時間決定了在沒有任何消費者以後,佇列可以存活多久。
2、死信佇列
有三種情況訊息會進入DLX(Dead Letter Exchange)死信交換機。
- (NACK || Reject ) && requeue == false
- 訊息過期
- 佇列達到最大長度(先入隊的訊息會被髮送到DLX)
可以設定一個死信佇列(Dead Letter Queue)與DLX繫結,即可以儲存Dead Letter,消費者可以監聽這個佇列取走訊息。
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");
// 指定了這個佇列的死信交換機
channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments);
// 宣告死信交換機
channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null);
// 宣告死信佇列
channel.queueDeclare("DLX_QUEUE", false, false, false, null);
// 繫結
channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");
3、優先順序佇列
設定一個佇列的最大優先順序:
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-max-priority",10);
// 佇列最大優先順序
channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);
傳送訊息時指定訊息當前的優先順序:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.priority(5)
// 訊息優先順序
.build();
channel.basicPublish("", "ORIGIN_QUEUE", properties, msg.getBytes());
優先順序高的訊息可以優先被消費,但是:只有訊息堆積(訊息的傳送速度大於消費者的消費速度)的情況下優先順序 才有意義。
4、延遲佇列
RabbitMQ本身不支援延遲佇列。可以使用TTL結合DLX的方式來實現訊息的延遲投遞,即把DLX跟某個佇列繫結, 到了指定時間,訊息過期後,就會從DLX路由到這個佇列,消費者可以從這個佇列取走訊息。 另一種方式是使用rabbitmq-delayed-message-exchange外掛。
當然,將需要傳送的資訊儲存在資料庫,使用任務排程系統掃描然後傳送也是可以實現的。
5、RPC
RabbitMQ實現RPC的原理:服務端處理訊息後,把響應訊息傳送到一個響應佇列,客戶端再從響應佇列取到結 果。
其中的問題:Client收到訊息後,怎麼知道應答訊息是回覆哪一條訊息的?所以必須有一個唯一ID來關聯,就是 correlationId。
6、服務端流控(Flow Control)
RabbitMQ 會在啟動時檢測機器的實體記憶體數值。預設當 MQ 佔用 40% 以上記憶體時,MQ 會主動丟擲一個記憶體警告並阻塞所有連線(Connections)。
可以通過修改 rabbitmq.config 檔案來調整記憶體閾值,預設值是 0.4,如下 所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}]. 預設情況,如果剩餘磁碟空間在 1GB 以下,RabbitMQ 主動阻塞所有的生產者。這個閾值也是可調的。
注意佇列長度只在訊息堆積的情況下有意義,而且會刪除先入隊的訊息,不能實現服務端限流。
7、消費端限流
在AutoACK為false的情況下,如果一定數目的訊息(通過基於consumer或者channel設定Qos的值)未被確認 前,不進行消費新的訊息。
channel.basicQos(2);
// 如果超過2條訊息沒有傳送ACK,當前消費者不再接受佇列訊息
channel.basicConsume(QUEUE_NAME, false, consumer);
UI管理介面的使用
管理外掛提供了更簡單的管理方式。 啟用管理外掛
Windows啟用管理外掛
cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin
rabbitmq-plugins.bat enable rabbitmq_management
Linux啟用管理外掛
cd /usr/lib/rabbitmq/bin
./rabbitmq-plugins enable rabbitmq_management
管理介面訪問埠:
預設埠是15672,預設使用者guest,密碼guest。guest使用者預設只能在本機訪問。