SpringCloud-Stream 訊息驅動
阿新 • • 發佈:2020-04-28
# 一、概述
## 是什麼?
Spring Cloud Stream 是一個構建訊息微服務驅動的框架。可以遮蔽底層訊息中介軟體的差異,降低版本切換成本,統一訊息的程式設計模型,目前僅支援 RabbitMQ 和 Kafka。
## 設計思想
### 標準 MQ 的設計思想
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427105050.png)
生產者 / 消費者之間靠訊息媒介傳遞資訊內容,Message
訊息必須走特定的通道,MessageChannel
訊息通道里的訊息如何被消費呢,誰負責收發處理?訊息通道MessageChannel的子介面SubscribableChannel,由訊息處理器MessageHandler所訂閱
### Spring Cloud Stream 的設計思想
如果我們的專案中用到了 RabbitMQ 和 Kafka 兩種訊息中介軟體,由於它們的架構不同,對實際開發造成了一定困擾;或者用到了一種訊息中介軟體,隨著後面的業務需求需要向另一種訊息佇列遷移,這無疑是災難性的,會造成一大堆的改動,因為它們與系統耦合了,這時候 Spring Cloud Stream 就可以為我們提供一種解耦的方式。
Spring Cloud Stream 提供的解決方案是:**通過定義繫結器 Binder 作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離。**嚮應用程式暴露統一的 Channel 通道,使得應用程式不需要再考慮各種訊息中介軟體的實現。
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427111957.png)
inputs 對應消費者,outputs 對應生產者
**Stream中的訊息通訊方式遵循了釋出-訂閱模式,用 Topic 主題進行廣播(在RabbitMQ就是Exchange,在Kafka中就是Topic)**
## 工作流程
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427112924.png)
Binder:繫結器,很方便的連線中介軟體,遮蔽差異
Channel:通道,是佇列 Queue 的一種抽象,在訊息通訊系統中就是實現儲存與轉發的媒介,通過 Channel 對佇列進行配置
Source 和 Sink:簡單理解就是參照物是 Spring Cloud Stream 本身,從 Stream 釋出訊息就是輸出,接收訊息就是輸入
## 編碼 API 和常用註解
![](https://gitee.com/songjilong/FigureBed/raw/master/img/20200427113659.png)
# 二、基本使用
## 生產者
配置:
```yml
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 配置繫結的rabbitmq的服務資訊
defaultRabbit: # 表示定義的名稱,用於與binding整合
type: rabbit # 訊息元件的型別
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名字
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,如果文字就是“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
```
傳送訊息:
```java
@EnableBinding(Source.class)
@Slf4j
public class MessageProviderImpl implements IMessageProvider {
@Autowired
private MessageChannel output;
@Override
public void send() {
String serial = IdUtil.simpleUUID();
output.send(MessageBuilder.withPayload(serial).build());
log.info("流水號:" + serial);
}
}
```
## 消費者
配置與生產者一致,只需要把 output 改為 input
接收訊息:
```java
@Controller
@EnableBinding(Sink.class)
@Slf4j
public class MessageReceiveController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void receiveMessage(Message message){
log.info("receive -> " + serverPort + " -> " +message.getPayload());
}
}
```
# 三、解決訊息重複消費的問題
## 場景
舉個栗子,我們對訂單系統做了叢集部署,消費者從 RabbitMQ 中獲取訂單資訊,如果同一個訂單被不同的服務都獲取到了,就會造成資料錯誤,為了避免這種情況,我們可以使用 Stream 中的訊息分組來解決。
## 原理
在 Stream 中,處於同一個組的多個消費者是競爭關係,就可以保證訊息只被一個服務消費一次,而不同組是可以重複消費的。現在預設分組就是不同的,組流水號不一樣。
## 解決
將不想產生重複消費的服務分為同一個組即可
## 配置方式
```yml
spring:
cloud:
stream:
bindings:
input:
group: groupA
```
# 四、持久化
如果我們的消費者因為種種原因宕機了,生產者此時傳送了訊息,沒有配置 group 屬性的消費者重新上線後無法接收到之前的訊息,而配置了 group 的消費者仍會接收到訊息,這就是持久化