11.SpringCloud Stream (訊息驅動)
阿新 • • 發佈:2021-01-02
1.訊息驅動概述
是什麼
遮蔽底層訊息中介軟體的差異,降低切換版本,統一訊息的程式設計模型官網https://spring.io/projects/spring-cloud-stream#overviewhttps://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/ Spring Cloud Stream中文指導手冊https://m.wang1314.com/doc/webapp/topic/20971999.html
設計思想
標準MQ
- 生產者/消費者之間靠訊息媒介傳遞資訊內容 Message
- 訊息必須走特定的通道 訊息通道MessageChannel
- 訊息通道里的訊息如何被消費呢,誰負責收發處理
為什麼用Cloud Stream
stream憑什麼可以統一底層差異
Binder
- INPUT對應於消費者
- OUTPUT對應於生產者
Stream中的訊息通訊方式遵循了釋出-訂閱模式
Topic主題進行廣播- 在RabbitMQ就是Exchange
- 在kafka中就是Topic
Spring Cloud Stream標準流程套路
- Binder
- Channel
- Source和Sink
編碼API和常用註解
2.案例說明
前提:RabbitMQ環境已經OK工程中新建三個子模組- cloud-stream-rabbitmq-provider8801,作為生產者進行發訊息模組
- cloud-stream-rabbitmq-consumer8802,作為訊息接收模組
- cloud-stream-rabbitmq-consumer8803,作為訊息接收模組
3.訊息驅動之生產者
新建模組 cloud-stream-rabbitmq-provider8801pomyml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springcloud2020</artifactId> <groupId>com.chl.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <dependencies> <!--springcloud-stream和rabbit的整合--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>com.chl.springcloud</groupId> <artifactId>cloud-api-commons</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: send-8801.com # 在資訊列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址
啟動類@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
業務類1.傳送訊息介面public interface IMessageProvider {
public String send();
}
2.傳送訊息介面實現類@EnableBinding(Source.class) //定義訊息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Autowired
private MessageChannel output; // 訊息傳送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
//發訊息出去 將訊息用builder一下,發出去
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}
3.Controller@RestController
public class SendMessageController {
@Autowired
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProvider.send();
}
}
測試1.啟動7001eureka2.啟動rabbitmq 訪問 http://localhost:15672/ 3.啟動88014.訪問 http://localhost:8801/sendMessage 4.訊息驅動之消費者
新建 cloud-stream-rabbitmq-consumer8802pom<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud2020</artifactId>
<groupId>com.chl.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<dependencies>
<!--springcloud-stream和rabbit的整合-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.chl.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
ymlserver:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
group: chlGroup1 #所屬分組
eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: receive-8802.com # 在資訊列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址
啟動類@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
業務類@Component
@EnableBinding(Sink.class) //開啟監聽 input
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)//監聽
public void input(Message<String> message) {
System.out.println("消費者2號,接受:"+message.getPayload()+"\t port:"+serverPort);
}
}
測試8801傳送8802接收訊息http://localhost:8801/sendMessage 5.分組消費與持久化
重複消費演示
依照8802,clone出來一份執行8803新建立 cloud-stream-rabbitmq-consumer8803啟動- RabbitMQ
- 服務註冊 7001
- 訊息生產 8801
- 訊息消費 8802 ,8803
執行後兩個問題
- 有重複消費問題
- 訊息持久化問題
目前是8802/8803同時都收到了,存在重複消費問題http://localhost:8801/sendMessage 如何解決分組和持久化屬性group
分組
原理:微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關係,只有其中一個可以消費。分為同組測試
8802/8803都變成不同組,group兩個不同group: chlGroup1、chlGroup28802 yml配置8803 yml配置
配置結果測試結果 還是重複消費
分為不同組測試
8802/8803都變成相同組,group兩個相同- group: chlGroup1
6.持久化
通過上述,解決了重複消費問題,再看看持久化停止8802/8803 並去除掉8802的分組 8803的保留分組- group: chlGroup1