Spring Cloud整合RabbitMQ的使用
同步 or 非同步
前言:我們現在有一個用微服務架構模式開發的系統,系統裡有一個商品服務和訂單服務,且它們都是同步通訊的。
目前我們商品服務和訂單服務之間的通訊方式是同步的,當業務擴大之後,如果還繼續使用同步的方式進行服務之間的通訊,會使得服務之間的耦合增大。例如我們登入操作可能需要同步呼叫使用者服務、積分服務、簡訊服務等等,而服務之間可能又依賴別的服務,那麼這樣一個登入過程就會耗費不少的時間,以致使用者的體驗降低。
那我們在微服務架構下要如何對服務之間的通訊進行解耦呢?這就需要使用到訊息中介軟體了,訊息中介軟體可以幫助我們將同步的通訊轉化為非同步通訊,服務之間只需要對訊息佇列進行訊息的釋出、訂閱即可,從而解耦服務之間的通訊依賴。
目前較為主流的訊息中介軟體:
- RabbitMQ
- Kafka
- ActiveMQ
非同步通訊特點:
- 客戶端請求不會阻塞程序,服務端的響應可以是非即時的
非同步的常見形態:
- 推送通知
- 請求/非同步響應
- 訊息佇列
MQ應用場景:
- 非同步處理
- 流量削峰
- 日誌處理
- 應用解耦
更多關於訊息中介軟體的描述,可以參考我另一篇文章:
RabbitMQ的基本使用
在上文 Spring Cloud Config - 統一配置中心 中,已經演示過使用Docker安裝RabbitMQ,所以這裡就不再浪費篇幅演示了。
直接進入正題,我們以訂單服務和商品服務示例,首先在訂單服務的專案中,加入mq的依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在配置檔案中增加RabbitMQ的相關配置項:
到訂單服務的專案中,新建一個message包,在該包中建立一個MqReceiver類,我們來看看RabbitMQ的基本操作。程式碼如下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收訊息,即消費者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 接收訊息並列印 * * @param message message */ @RabbitListener(queues = "myQueue") public void process(String message) { // @RabbitListener註解用於監聽RabbitMQ,queues指定監聽哪個佇列 log.info(message); } }
因為RabbitMQ上還沒有myQueue這個佇列,所以我們還得到RabbitMQ的管理介面上,建立這個佇列,如下:
然後新建一個測試類,用於傳送訊息到佇列中,程式碼如下:
package org.zero.springcloud.order.server;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @program: sell_order
* @description: 傳送訊息,即訊息釋出者
* @author: 01
* @create: 2018-08-21 22:28
**/
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqSenderTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void send() {
for (int i = 0; i < 100; i++) {
amqpTemplate.convertAndSend("myQueue", "第" + i + "條訊息");
}
}
}
執行該測試類,執行成功後到OrderApplication的控制檯上,看看是否接收並列印了接收到的訊息。正常情況應如下:
基本的消費者和釋出者的程式碼我們都已經編寫過,並且也測試成功了。但有個小問題,我們要監聽一個不存在的佇列時,需要手動去新建這個佇列,感覺每次都手動新建挺麻煩的。有沒有辦法當佇列不存在時,自動建立該佇列呢?答案是有的,依舊使用之前的那個註解,只不過這次的引數要換成queuesToDeclare
。示例程式碼如下:
package org.zero.springcloud.order.server.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @program: sell_order
* @description: 接收訊息,即消費者
* @author: 01
* @create: 2018-08-21 22:24
**/
@Slf4j
@Component
public class MqReceiver {
/**
* 接收並列印訊息
* 可以當佇列不存在時自動建立佇列
*
* @param message message
*/
@RabbitListener(queuesToDeclare = @Queue("myQueue"))
public void process2(String message) {
// @RabbitListener註解用於監聽RabbitMQ,queuesToDeclare可以建立指定的佇列
log.info(message);
}
}
以上我們通過示例簡單的介紹了訊息的收發及佇列的建立,本小節則介紹一下exchange 的自動繫結方式。當需要自動繫結 exchange 時,我們也可以通過 bindings 引數完成。示例程式碼如下:
package org.zero.springcloud.order.server.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @program: sell_order
* @description: 接收訊息,即消費者
* @author: 01
* @create: 2018-08-21 22:24
**/
@Slf4j
@Component
public class MqReceiver {
/**
* 接收並列印訊息
* 可以當佇列不存在時自動建立佇列,以及自動繫結指定的Exchange
* @param message message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("myQueue"),
exchange = @Exchange("myExchange")
))
public void process3(String message) {
// @RabbitListener註解用於監聽RabbitMQ,bindings可以建立指定的佇列及自動繫結Exchange
log.info(message);
}
}
訊息分組我們也是可以通過 bindings 引數完成,例如現在有一個數碼供應商服務和一個水果供應商服務,它們都監聽著同一個訂單服務的訊息佇列。但我希望數碼訂單的訊息被數碼供應商服務消費,而水果訂單的訊息被水果供應商服務消費。所以我們就需要用到訊息分組。示例程式碼如下:
package org.zero.springcloud.order.server.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @program: sell_order
* @description: 接收訊息,即消費者
* @author: 01
* @create: 2018-08-21 22:24
**/
@Slf4j
@Component
public class MqReceiver {
/**
* 數碼供應商服務 - 接收訊息
*
* @param message message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("computerOrder"),
exchange = @Exchange("myOrder"),
key = "computer" // 指定路由的key
))
public void processComputer(String message) {
log.info("computer message : {}", message);
}
/**
* 水果供應商服務 - 接收訊息
*
* @param message message
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("computerOrder"),
exchange = @Exchange("myOrder"),
key = "fruit" // 指定路由的key
))
public void processFruit(String message) {
log.info("fruit message : {}", message);
}
}
測試程式碼如下,通過指定key進行訊息的分組,將訊息傳送到數碼供應商服務:
package org.zero.springcloud.order.server;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @program: sell_order
* @description: 傳送訊息,即訊息釋出者
* @author: 01
* @create: 2018-08-21 22:28
**/
@RunWith(SpringRunner.class)
@SpringBootTest
public class MqSenderTest {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void sendOrder() {
for (int i = 0; i < 100; i++) {
// 第一個引數指定佇列,第二個引數來指定路由的key,第三個引數指定訊息
amqpTemplate.convertAndSend("myOrder", "computer", "第" + i + "條訊息");
}
}
}
重啟專案後,執行以上測試程式碼,控制檯輸出如下,可以看到只有數碼供應商服務才能夠接收到訊息,而水果供應商服務是接收不到的。這就完成了訊息分組:
Spring Cloud Stream的使用
Spring Cloud Stream 是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot 來建立獨立的,可用於生產的Spring 應用程式。他通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動。Spring Cloud Stream 為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用了釋出-訂閱、消費組、分割槽的三個核心概念。目前僅支援RabbitMQ、Kafka。
什麼是Spring Integration ? Integration 整合
企業應用整合(EAI)是整合應用之間資料和服務的一種應用技術。四種整合風格:
- 檔案傳輸:兩個系統生成檔案,檔案的有效負載就是由另一個系統處理的訊息。該類風格的例子之一是針對檔案輪詢目錄或FTP目錄,並處理該檔案。
- 共享資料庫:兩個系統查詢同一個資料庫以獲取要傳遞的資料。一個例子是你部署了兩個EAR應用,它們的實體類(JPA、Hibernate等)共用同一個表。
- 遠端過程呼叫:兩個系統都暴露另一個能呼叫的服務。該類例子有EJB服務,或SOAP和REST服務。
- 訊息:兩個系統連線到一個公用的訊息系統,互相交換資料,並利用訊息呼叫行為。該風格的例子就是眾所周知的中心輻射式的(hub-and-spoke)JMS架構。
Spring Integration作為一種企業級整合框架,遵從現代經典書籍《企業整合模式》,為開發者提供了一種便捷的實現模式。Spring Integration構建在Spring控制反轉設計模式之上,抽象了訊息源和目標,利用訊息傳送和訊息操作來整合應用環境下的各種元件。訊息和整合關注點都被框架處理,所以業務元件能更好地與基礎設施隔離,從而降低開發者所要面對的複雜的整合職責。
模型圖:
現在我們來看看Spring Cloud Stream的基本使用,到訂單服務專案上,增加如下依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
然後是在配置檔案中,配置rabbitmq的相關資訊,只不過我們之前已經配置過了所以不用配置了。
我們來看看如何使用Spring Cloud Stream傳送和接收訊息,首先建立一個介面,定義input和output方法。程式碼如下:
package org.zero.springcloud.order.server.message;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface StreamClient {
// 接收訊息、入口
@Input("myMessageInput")
SubscribableChannel input();
// 傳送訊息、
@Output("myMessageOutput")
MessageChannel output();
}
建立一個訊息接收者。程式碼如下:
package org.zero.springcloud.order.server.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
/**
* @program: sell_order
* @description: 訊息接收者
* @author: 01
* @create: 2018-08-22 22:16
**/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {
@StreamListener("myMessageOutput")
public void process(String message) {
log.info("message : {}", message);
}
}
訊息傳送者,這裡作為一個Controller存在。程式碼如下:
package org.zero.springcloud.order.server.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.zero.springcloud.order.server.message.StreamClient;
/**
* @program: sell_order
* @description: 訊息傳送者
* @author: 01
* @create: 2018-08-22 22:18
**/
@RestController
public class SendMessageController {
private final StreamClient streamClient;
@Autowired
public SendMessageController(StreamClient streamClient) {
this.streamClient = streamClient;
}
@GetMapping("/send/msg")
public void send() {
for (int i = 0; i < 100; i++) {
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload("這是第" + i + "條訊息");
streamClient.output().send(messageBuilder.build());
}
}
}
因為我們的微服務可能會部署多個例項,若有多個例項需要對訊息進行分組,否則所有的服務例項都會接收到相同的訊息。在配置檔案中,增加如下配置完成訊息的分組:
spring:
...
cloud:
...
stream:
bindings:
myMessageOutput:
group: order
...
重啟專案,訪問http://localhost:9080/send/msg
,控制檯輸出如下:
注:Spring Cloud Stream可以在專案啟動的時候自動建立佇列,在專案關閉的時候自動刪除佇列
在實際的開發中,我們一般傳送的訊息通常會是一個java物件而不是字串。所以我們來看看如何傳送物件,其實和傳送字串幾乎是一樣的。訊息傳送者程式碼如下:
package org.zero.springcloud.order.server.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.zero.springcloud.order.server.dto.OrderDTO;
import org.zero.springcloud.order.server.message.StreamClient;
/**
* @program: sell_order
* @description: 訊息傳送者
* @author: 01
* @create: 2018-08-22 22:18
**/
@RestController
public class SendMessageController {
private final StreamClient streamClient;
@Autowired
public SendMessageController(StreamClient streamClient) {
this.streamClient = streamClient;
}
/**
* 傳送OrderDTO物件
*/
@GetMapping("/send/msg")
public void send() {
OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderId("123465");
MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO);
streamClient.output().send(messageBuilder.build());
}
}
訊息接收者也只需要在方法引數上宣告這個物件的型別即可。程式碼如下:
package org.zero.springcloud.order.server.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;
import org.zero.springcloud.order.server.dto.OrderDTO;
/**
* @program: sell_order
* @description: 訊息接收者
* @author: 01
* @create: 2018-08-22 22:16
**/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {
/**
* 接收OrderDTO物件
* @param message message
*/
@StreamListener("myMessageOutput")
public void process(OrderDTO message) {
log.info("message : {}", message);
}
}
另外需要提到的一點是,預設情況下,java物件在訊息佇列中是以base64編碼存在的,我們也都知道base64不可讀。為了方便檢視堆積在訊息佇列裡的物件資料,我們希望java物件是以json格式的字串呈現,這樣就方便我們人類閱讀。至於這個問題,我們只需要在配置檔案中,增加一段content-type的配置即可。如下:
spring:
...
cloud:
...
stream:
bindings:
myMessageOutput:
group: order
content-type: application/json
...
重啟專案,訪問http://localhost:9080/send/msg
,控制檯輸出如下:
2018-08-22 23:32:33.704 INFO 12436 --- [nio-9080-exec-4] o.z.s.o.server.message.StreamReceiver
: message : OrderDTO(orderId=123465, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null,
orderAmount=null, orderStatus=null, payStatus=null, createTime=null, updateTime=null, orderDetailList=null)
當我們接收到訊息的時候,可能會需要返回一段特定的訊息,表示訊息已收到之類的。至於這個功能,我們通過@SendTo
註解即可完成。程式碼如下:
package org.zero.springcloud.order.server.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import org.zero.springcloud.order.server.dto.OrderDTO;
/**
* @program: sell_order
* @description: 訊息接收者
* @author: 01
* @create: 2018-08-22 22:16
**/
@Slf4j
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {
/**
* 接收OrderDTO物件
* @param message message
*/
@StreamListener("myMessageOutput")
@SendTo("myMessageInput")
public String process(OrderDTO message) {
log.info("message : {}", message);
return "success";
}
@StreamListener("myMessageInput")
public void success(String message) {
log.info("message : {}", message);
}
}
重啟專案,訪問http://localhost:9080/send/msg
,控制檯輸出如下:
Spring Cloud Stream 再一次簡化了我們在分散式環境下對訊息中介軟體的操作,配置好訊息中介軟體的連線地址及使用者密碼後,在開發的過程中,我們只需要關注input和output,對訊息中介軟體的操作基本是無感知的。