1. 程式人生 > 實用技巧 >Stream訊息驅動

Stream訊息驅動

學習地址:https://www.bilibili.com/video/BV18E411x7eT?p=83

Spring Cloud Stream

官網地址:https://spring.io/projects/spring-cloud-stream#overview

中文文件:https://m.wang1314.com/doc/webapp/topic/20971999.html

  1. Spring Cloud Stream是用於構建與共享訊息傳遞系統連線的高度可伸縮的事件驅動微服務框架,該框架提供了一個靈活的程式設計模型,它建立在已經建立和熟悉的Spring熟語和最佳實踐上,包括支援持久化的釋出/訂閱、消費組以及訊息分割槽這三個核心概念。

  2. 應用程式通過inputs或者outputs與Spring Cloud Stream中binder物件互動。通過我們配置來binding(繫結),而Spring Cloud Stream的binder物件負責與訊息中介軟體互動。所以,我們只需要搞清楚如何與Spring Cloud Stream互動就可以方便使用訊息驅動的方式。

  3. 通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動。Spring Cloud Stream為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用了釋出-訂閱、消費組、分割槽的三個核心概念。

  4. 目前僅支援RabbitMQ、Kafka.

  5. 遮蔽底層訊息中介軟體的差異,降低切換版本,統一訊息的程式設計模型

設計思想

  1. 生產者/消費者之間靠訊息媒介傳遞資訊內容:Message

  2. 訊息必須走特定的通道:訊息通道MessageChannel

  3. 訊息通道里的訊息如何被消費呢,誰負責收發處理

    • 訊息通道MessageChannel的子介面SubscribableChannel,由MessageHandler訊息處理器訂閱

Stream原理

  1. 比方說我們用到了RabbitMQ和Kafka,由於這兩個訊息中介軟體的架構上的不同,
    像RabbitMQ有exchange, kafka有 Topic和Partitions分割槽,這些中介軟體的差異性導致我們實際專案開發給我們造成了一定的困擾, 我們如果用了兩個訊息佇列的其中一種, 後面的業務需求,我想往另外一種訊息佇列進行遷移,這時候無疑就是一個災難性的, 一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式。

  1. 在沒有繫結器這個概念的情況下,我們的SpringBoot應用要直接與訊息中介軟體進行資訊互動的時候,由於各訊息中介軟體構建的初衷不同,它們的實現細節上會有較大的差異性通過定義繫結器作為中間層,完美地實現了應用程式與訊息中介軟體細節之間的隔離。通過嚮應用程式暴露統一的Channel通道, 使得應用程式不需要再考慮各種不同的訊息中介軟體實現。

  2. 通過定義繫結器Bindr作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離。

INPUT對應於消費者
OUTPUT對應於生產者

Stream中的訊息通訊方式遵循了釋出-訂閱模式
Topic主題進行廣播
在RabbitMQ就是Exchange
在kafka中就是Topic

Stream標準流程


  1. Binder
    • 很方便的連線中介軟體,遮蔽差異
  2. Channel
    • 通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,通過對Channel對佇列進行配置
  3. Source和Sink
    • 簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream釋出訊息就是輸出,接受訊息就是輸入

編碼API和常用註解

訊息驅動之生產者

RabbitMQ環境已經OK

cloud-stream-rabbitmq-provider8801

  1. 建module

  2. 寫POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud2020</artifactId>
        <groupId>com.nuc.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.nuc.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
  1. 寫YML
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要繫結的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
          binder: defaultRabbit  # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: send-8801.com  # 在資訊列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
  1. 主啟動類
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}
  1. 業務類

service

public interface IMessageProvider {
    public String send();
}
@EnableBinding(Source.class)//定義訊息的推送管道
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output;//訊息傳送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: \t"+serial);
        return null;
    }
}

controller

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}
  1. 測試

    啟動7001eureka

    啟動rabbitmq

    啟動8801

http://localhost:8801/sendMessage

訊息驅動之消費者

cloud-stream-rabbitmq-consumer8802

  1. 建module

  2. 寫POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud2020</artifactId>
        <groupId>com.nuc.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8802</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.nuc.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
  1. 寫YML
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置要繫結的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
          binder: defaultRabbit  # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: receive-8802.com  # 在資訊列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址
  1. 主啟動類
@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}
  1. 業務類
@RestController
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消費者1號,-->接收訊息:"+message.getPayload()+"\t port: "+serverPort);
    }
}
  1. 測試

    測試8801傳送8802接收訊息


分組消費與持久化

cloud-stream-rabbitmq-consumer8803

依照8802,clone出來一份執行8803

啟動
RabbitMQ
7001服務註冊
8801訊息生產
8802訊息消費
8803訊息消費

分組解決重複消費問題

分散式微服務應用為了實現高可用和負載均衡,實際上都會部署多個例項,啟動了兩個消費微服務(8802/8803)多數情況,產者傳送訊息給某個具體微服務時只希望被消費一次,按照上面我們啟動兩個應用的例子,雖然它們同屬一個應用,但是這個訊息出現了被重複消費兩次的情況。為了解決這個問題,在Spring Cloud Stream中提供了消費組的概念。

目前是8802/8803同時都收到了,存在重複消費問題

  1. 比如在如下場景中,訂單系統我們做叢集部署,都會從RabbitMQ中獲取訂單資訊,那如果一個訂單同時被兩個服務獲取到,那麼就會造成資料錯誤,我們得避免這種情況。這時我們就可以使用Stream中的訊息分組來解決

  1. 注意在Stream中處於同一個group中的多個消費者是競爭關係,就能夠保證訊息只會被其中一個應用消費一次。不同組是可以全面消費的(重複消費),

  2. 同一組內會發生競爭關係,只有其中一個可以消費。

  3. 原理:微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關係,只有其中一個可以消費。

8802/8803都變成不同組,group兩個不同

設定分組

修改8002YML

spring:
  cloud:
      bindings: 
          group: nucA

修改8003YML

spring:
  cloud:
      bindings: 
          group: nucB


8802/8803都變成相同組,group兩個相同

修改8002YML

spring:
  cloud:
      bindings: 
          group: nucA

修改8003YML

spring:
  cloud:
      bindings: 
          group: nucA

8802/8803實現了輪詢分組,每次只有一個消費者 8801模組的發的訊息只能被8802或8803其中一個接收到,這樣避免了重複消費




訊息持久化問題

停止8802/8803並去除掉8802的分組group:nucA

8803的分組group:nucA沒有去掉

8801先發送4條資訊到rabbitmq

先啟動8802,無分組屬性配置,後臺沒有打出來訊息

先啟動8803,有分組屬性配置,後臺打出來了MQ上的訊息



配置group屬性後具有持久化