1. 程式人生 > 實用技巧 >11.SpringCloud Stream (訊息驅動)

11.SpringCloud Stream (訊息驅動)

1.訊息驅動概述

是什麼

遮蔽底層訊息中介軟體的差異,降低切換版本,統一訊息的程式設計模型官網https://spring.io/projects/spring-cloud-stream#overview
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/ Spring Cloud Stream中文指導手冊https://m.wang1314.com/doc/webapp/topic/20971999.html

設計思想

標準MQ

  • 生產者/消費者之間靠訊息媒介傳遞資訊內容 Message
  • 訊息必須走特定的通道 訊息通道MessageChannel

  • 訊息通道里的訊息如何被消費呢,誰負責收發處理
訊息通道MessageChannel的子介面SubscribableChannel,由MessageHandler訊息處理器訂閱


為什麼用Cloud Stream


stream憑什麼可以統一底層差異



Binder


  • INPUT對應於消費者
  • OUTPUT對應於生產者

Stream中的訊息通訊方式遵循了釋出-訂閱模式

Topic主題進行廣播
  • 在RabbitMQ就是Exchange
  • 在kafka中就是Topic

Spring Cloud Stream標準流程套路


  • Binder
很方便的連線中介軟體,遮蔽差異
  • Channel
通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,通過對Channel對佇列進行配置
  • Source和Sink
簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream釋出訊息就是輸出,接受訊息就是輸入

編碼API和常用註解


2.案例說明

前提:RabbitMQ環境已經OK工程中新建三個子模組
  • cloud-stream-rabbitmq-provider8801,作為生產者進行發訊息模組
  • cloud-stream-rabbitmq-consumer8802,作為訊息接收模組
  • cloud-stream-rabbitmq-consumer8803,作為訊息接收模組

3.訊息驅動之生產者

新建模組 cloud-stream-rabbitmq-provider8801pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud2020</artifactId>
        <groupId>com.chl.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <dependencies>
        <!--springcloud-stream和rabbit的整合-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.chl.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
yml
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要繫結的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
          binder: defaultRabbit  # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: send-8801.com  # 在資訊列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
啟動類
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}
業務類1.傳送訊息介面
public interface IMessageProvider {
    public String send();
}
2.傳送訊息介面實現類
@EnableBinding(Source.class) //定義訊息的推送管道
public class MessageProviderImpl implements IMessageProvider {
    @Autowired
    private MessageChannel output; // 訊息傳送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        //發訊息出去  將訊息用builder一下,發出去
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: "+serial);
        return null;
    }
}
3.Controller
@RestController
public class SendMessageController {
    @Autowired
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}
測試1.啟動7001eureka2.啟動rabbitmq 訪問 http://localhost:15672/ 3.啟動88014.訪問 http://localhost:8801/sendMessage

4.訊息驅動之消費者

新建 cloud-stream-rabbitmq-consumer8802pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud2020</artifactId>
        <groupId>com.chl.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

    <dependencies>
        <!--springcloud-stream和rabbit的整合-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.chl.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>
yml
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要繫結的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
          binder: defaultRabbit  # 設定要繫結的訊息服務的具體設定
          group:  chlGroup1   #所屬分組

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: receive-8802.com  # 在資訊列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
啟動類
@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}
業務類
@Component
@EnableBinding(Sink.class) //開啟監聽 input
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)//監聽
    public void input(Message<String> message) {
        System.out.println("消費者2號,接受:"+message.getPayload()+"\t port:"+serverPort);
    }

}
測試8801傳送8802接收訊息http://localhost:8801/sendMessage


5.分組消費與持久化

重複消費演示

依照8802,clone出來一份執行8803新建立 cloud-stream-rabbitmq-consumer8803啟動
  • RabbitMQ
  • 服務註冊 7001
  • 訊息生產 8801
  • 訊息消費 8802 ,8803

執行後兩個問題
  • 有重複消費問題
  • 訊息持久化問題

目前是8802/8803同時都收到了,存在重複消費問題http://localhost:8801/sendMessage 如何解決分組和持久化屬性group

分組

原理:微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關係,只有其中一個可以消費。

分為同組測試

8802/8803都變成不同組,group兩個不同group: chlGroup1、chlGroup28802 yml配置
8803 yml配置
配置結果測試結果 還是重複消費

分為不同組測試

8802/8803都變成相同組,group兩個相同
  • group: chlGroup1
結論同一個組的多個微服務例項,每次只會有一個被消費

6.持久化

通過上述,解決了重複消費問題,再看看持久化停止8802/8803 並去除掉8802的分組 8803的保留分組
  • group: chlGroup1
結論同一個組的多個微服務例項,每次只會有一個被消費