1. 程式人生 > 其它 >SpringCloud——Stream入門

SpringCloud——Stream入門

技術標籤:SpringCloudspring cloudstream

一個專案中可能存在多個MQ中介軟體,對於這些MQ存在不同的寫法和處理方式,如果我們需要進行一些改動則需要對這些MQ都瞭解,Stream可以幫我解決這類問題,我們只需要使用它來處理和呼叫我們MQ的一些訊息。

Stream訊息驅動

1.Stream用於構建與共享訊息傳遞系統連線的高度可伸縮的事件驅動微服務框。 目的遮蔽底層中介軟體的差異,降低切換成本,統一訊息的程式設計模型。目前只支撐kafka和rabbitmq。
2.Stream中訊息通訊放手遵循釋出-訂閱模式
3.Stream三大主要內容:
Binder:很方便的連線中介軟體,遮蔽差異。

Channel:通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,通過Channel對佇列進行配置。
Source和Sink:從Stream釋出訊息就是輸出,接受訊息就是輸入。
4.對於訊息進行分發和消費處理

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處配置要繫結的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type:
rabbit # 訊息元件型別 environment: # 設定rabbitmq的相關的環境配置 spring: rabbitmq: host: 192.168.20.129 port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱 destination:
charonExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain” binder: defaultRabbit # 設定要繫結的訊息服務的具體設定 eureka: client: # 客戶端進行Eureka註冊的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒) instance-id: send-8801.com # 在資訊列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址

服務端:

@EnableBinding(Source.class)  // 定義訊息的推送管道
public class MessageProviderImpl implements IMessageProvider {

    /**
     * 訊息傳送管道
     */
    @Resource
    MessageChannel  output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: "+serial);
        return serial;
    }
}

客戶端:

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消費者1號,----->接受到的訊息: "+message.getPayload()+"\t  port: "+serverPort);
    }
}

5.如果我們再配置檔案中沒有采用分組,那麼訊息存在**重複消費**的問題(rabbitmq會預設新增分組);我們需要在消費端的配置檔案中新增分組概念
在這裡插入圖片描述

對於訊息的消費:不同組可以全面消費(重複消費);同一個組內發生競爭關係只會被消費一次。(預設採用輪詢機制)。
6.如果我們消費服務沒有分組,那麼如果這個服務掛了,它將接收不到滯留訊息,訊息對於這個服務並沒有持久化;如果我們消費服務設定分組,哪怕它掛了重啟之後也可以消費那些滯留訊息。