【SpringBoot1.x】SpringBoot1.x 訊息
SpringBoot1.x 訊息
概述
大多應用中,可通過訊息服務中介軟體來提升系統非同步通訊、擴充套件解耦能力。
訊息服務有兩個重要概念,即訊息代理(message broker)和目的地(destnation),當訊息傳送者傳送訊息以後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。
而目的地也有兩種形式:
- 佇列(queue) 點對點訊息通訊
- 訊息傳送者傳送訊息,訊息代理將其放入一個佇列中,訊息接收者從佇列中獲取訊息內容,訊息讀取後會被移除佇列。
- 訊息只有唯一哥的傳送者和接受者,但是不能說只能有一個接收者。
- 主題(topic) 釋出/訂閱訊息
- 傳送者/釋出者傳送訊息到主題,多個接受者/訂閱者監聽訂閱這個主題,那麼就會在訊息到達時同時收到訊息。
訊息佇列的兩個前導概念:
- JMS(Java Message Service):
- 基於 JVM 訊息代理的規範
- ActiveMQ、HornetMQ 是 JMS 實現
- AMQP(Advanced Message Queuing Protocol):
- 高階訊息佇列協議,也是一個訊息代理的規範,相容 JMS
- RabbitMQ 是 AMQP 的實現
- 兩者的異同:
JMS | AMQP | |
---|---|---|
定義 | Java Api | 網路線級協議 |
跨語言 | 否 | 是 |
跨平臺 | 否 | 是 |
Model | Peer-2-Peer、Pub/Sub | direct exchange、fanout exchange、topic change、headers exchange、system exchange |
支援訊息型別 | TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage、Message (只有訊息頭和屬性) | byte[] ,當實際應用時,有複雜的訊息,可以將訊息序列化後傳送 |
綜合評價 | JMS 定義了 JAVAAPI 層面的標準,其對跨平臺的支援較差 | AMQP 定義了網路層的協議標準,具有跨平臺、跨語言特性 |
應用場景
非同步處理
場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩種 1.序列的方式;2.並行方式
序列方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件,再發送註冊簡訊。以上三個任務全部完成後,返回給客戶端。
並行方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件的同時,傳送註冊簡訊。以上三個任務完成後,返回給客戶端。與序列的差別是,並行的方式可以提高處理的時間。
引入訊息佇列後,將不是必須的業務邏輯,非同步處理。改造後的架構如下:
應用解耦
場景說明:使用者下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統呼叫庫存系統的介面。如下圖:
傳統模式的缺點:假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗,訂單系統與庫存系統耦合。
引入訊息佇列後,
訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功。
庫存系統:訂閱下單的訊息,採用拉/推的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作。
流量削峰
場景說明:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入訊息佇列。
引入訊息佇列後,
可以控制活動的人數,也可以緩解短時間內高流量壓垮應用。
日誌處理
場景說明:日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。
引入訊息佇列後,
日誌採集客戶端:負責日誌資料採集,定時寫受寫入 Kafka 佇列。
Kafka 訊息佇列:負責日誌資料的接收,儲存和轉發。
日誌處理應用:訂閱並消費 kafka 佇列中的日誌資料。
訊息通訊
場景說明:息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊。比如實現點對點訊息佇列,或者聊天室等。
引入訊息佇列後,
點對點通訊:客戶端A和客戶端B使用同一佇列,進行訊息通訊。
聊天室通訊:客戶端A,客戶端B,客戶端N訂閱同一主題,進行訊息釋出和接收,實現類似聊天室效果。
RabbitMQ
Spring 支援:
- spring-jms 提供了對 JMS 的支援
- spring-rabbit 提供了對 AMQP 的支援
- 需要 ConnectionFactory 的實現來連線訊息代理
- 提供 JmsTemplate、RabbitTemplate 來發送訊息
- @JmsListener(JMS)、@RabbitListener(AMQP)註解在方法上監聽訊息代理髮布的訊息
- @EnableJms、@EnableRabbit 開啟支援
Spring Boot 自動配置:
- JmsAutoConfiguration
- RabbitAutoConfiguration
RabbitMQ 是部署最廣泛的開源訊息代理,它是由 Erlang 開發的 AMQP 的開源實現。
基本概念
- Message 訊息。訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括:
- routing-key(路由鍵)
- priority(相對於其他訊息的優先權)
- delivery-mode(指出該訊息可能需要永續性儲存)
- Exchange 交換器。用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。
- Queue 訊息佇列。用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
- Binding 繫結。用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。Exchange 和 Queue 的繫結可以是多對多的關係。
- Connection 網路連線。比如一個 TCP 連線。
- Channel 通道。多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。
- Publisher 訊息的生產者。是一個向交換器釋出訊息的客戶端應用程式。
- Consumer 訊息的消費者。是一個從訊息佇列中取得訊息的客戶端應用程式。
- Virtual Host 虛擬主機。表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定。RabbitMQ 預設的 vhost 是
/
。 - Broker 訊息佇列伺服器實體。
這個概念之間的關係為:
執行機制
AMQP 中的訊息路由:AMQP 中訊息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產者把訊息釋出到 Exchange 上,訊息最終到達佇列並被消費者接收,而 Binding 決定交換器的訊息應該傳送到那個佇列。
Exchange 型別:
-
Direct Exchange 訊息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配,如果一個佇列繫結到交換機要求路由鍵為 “dog”,則只轉發 routing key 標記為 “dog” 的訊息,不會轉發 “dog.puppy”,也不會轉發 “dog.guard” 等等。它是完全匹配、單播的模式。
-
Fanout Exchange 每個發到 fanout 型別交換器的訊息都會分到所有繫結的佇列上去。fanout 交換器不處理路由鍵,只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被轉發到與該交換器繫結的所有佇列上。它是廣播的模式。
-
Topic Exchange 它通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到一個模式上。它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號
“#”
和符號“*”
。# 匹配 0 個或多個單詞,* 匹配 1 個單詞。
安裝測試
-
Docker 安裝 RabbitMQ:
docker pull rabbitmq:3.7.28-management
-
啟動 RabbitMQ:
docker run --name rabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq:3.7.28-management
-
進入管理頁面
http://localhost:15672/
,輸入 使用者名稱 guest,密碼 guest -
在管理頁面新增 Exchanges 和 Queues
-
引入 spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
編寫配置檔案:
spring.rabbitmq.username=guest spring.rabbitmq.password=guest
-
主配置類新增上
@EnableRabbit
,編寫測試檔案:@RunWith(SpringRunner.class) @SpringBootTest public class IntegrationMessageApplicationTests { @Autowired RabbitTemplate rabbitTemplate; /** * 傳送訊息 * 單播,一對一的 */ @Test public void testDirectExchangeSend() { // 方式1:使用特定的路由金鑰將訊息傳送到特定的交換機。 // 需要構造一個 Message(byte[] body, MessageProperties messageProperties) ,定義訊息體內容和訊息頭 // rabbitTemplate.send(String exchange, String routingKey, Message message); // 方式2:將 Java物件 轉換為 Amqp Message,會自動序列化,然後使用特定的路由金鑰將其傳送到特定的交換機。 // rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message); HashMap<String, Object> map = new HashMap<>(); map.put("msg", "第一個訊息"); map.put("data", Arrays.asList("HelloWorld", 1024, true, new Book(12315125, "RabbitMQ 實戰", "parzulpan"))); rabbitTemplate.convertAndSend("exchange.direct", "parzulpan.news", map); // 物件以預設 jdk 序列化的形式傳送 } /** * 接收訊息 */ @Test public void testDirectExchangeReceive() { // 方式1 // rabbitTemplate.receive(String queueName) // 方式2 // rabbitTemplate.receiveAndConvert(String queueName) Object receive = rabbitTemplate.receiveAndConvert("parzulpan.news"); System.out.println(receive.getClass()); System.out.println(receive); } /** * 傳送訊息 * 廣播,一對多的 */ @Test public void testFanoutExchangeSend() { rabbitTemplate.convertAndSend("exchange.fanout", "", new Book(124123561, "RabbitMQ 原始碼剖析", "parzulpan")); } /** * 接收訊息 */ @Test public void testFanoutExchangeReceive() { Object receive = rabbitTemplate.receiveAndConvert("parzulpan"); System.out.println(receive.getClass()); System.out.println(receive); } /** * 傳送訊息 * 模式匹配 */ @Test public void testTopicExchangeSend() { rabbitTemplate.convertAndSend("exchange.topic", "parzulpan.#", new Book(1541351332, "RabbitMQ 優化", "parzulpan")); } /** * 接收訊息 */ @Test public void testTopicExchangeReceive() { Object receive1 = rabbitTemplate.receiveAndConvert("parzulpan"); System.out.println(receive1.getClass()); System.out.println(receive1); Object receive2 = rabbitTemplate.receiveAndConvert("parzulpan.emps"); System.out.println(receive2.getClass()); System.out.println(receive2); } }
-
為了 Json 格式序列化,可以自定義訊息轉換器
/** * @Author : parzulpan * @Time : 2021-01 * @Desc : 自定義 AMQP 配置類 */ @Configuration public class CustomAMQPConfig { /** * 自定義訊息轉換器 */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
訊息監聽
可以在業務層方法上監聽訊息佇列的內容:
/**
* @Author : parzulpan
* @Time : 2021-01
* @Desc : 資料業務類
*/
@Service
public class BookService {
@RabbitListener(queues = {"parzulpan", "parzulpan.emps"})
public void receive(Book book) {
System.out.println("收到訊息: " + book);
}
}
當監聽的佇列收到訊息時就會執行方法。
AmqpAdmin 使用
之前的安裝測試,都是在管理頁面新增的 Exchanges 和 Queues 等,也可以通過 AmqpAdmin 建立和刪除 Exchanges、Queues、Binding 等。
@RunWith(SpringRunner.class)
@SpringBootTest
public class IntegrationMessageApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void testAmqpAdmin() {
// 建立 Exchange
amqpAdmin.declareExchange(new DirectExchange("AmqpAdminExchange.direct", true, false));
// 建立 Queue
amqpAdmin.declareQueue(new Queue("AmqpAdmin.queue", true));
// 建立 Binding
amqpAdmin.declareBinding(new Binding("AmqpAdmin.queue", Binding.DestinationType.QUEUE,
"AmqpAdminExchange.direct","AmqpAdmin.parzulpan", null));
}
@Test
public void testAmqpAdminDelete() {
// 建立 Exchange
amqpAdmin.deleteExchange("AmqpAdminExchange.direct");
// 建立 Queue
amqpAdmin.deleteQueue("AmqpAdmin.queue");
}
}