1. 程式人生 > 實用技巧 >Spring Cloud:Stream基礎知識

Spring Cloud:Stream基礎知識

背景

  訊息中介軟體有多種,rabbitmq,rocketmq,activemq,kafka等。
  不同的訊息中介軟體具體細節不一樣。那麼有沒有一種新的技術誕生,讓我們不再關注具體MQ細節,我們只需要用一種適配繫結的方式,自動給我們在各種MQ內切換。
  一句話:遮蔽底層訊息中介軟體的差異,降低切換成本,統一訊息的程式設計模型。Spring Cloud Stream 因此誕生。
  官方定義 Spring Cloud Stream是一個構建訊息驅動微服務的框架。
  應用程式使用inputs或者outputs來與springcloud stream中binder互動。
  通過我們配置來bingding(繫結),而stream的binder物件負責與訊息中介軟體互動。
  所以,我們只需要搞清楚如何與springcloud stream互動就可以方便使用訊息驅動方式。
  SpringCloud Stream為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用釋出-訂閱、消費組、分割槽三個核心概念。
  目前僅支援RabbitMQ和Kafka。
Stream處理架構:

通過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體之間的隔離。


stream標準流程套路:

Binder:很方便的連線中介軟體,遮蔽差異。
channel:通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,通過channel對佇列進行配置。
source和sink:簡單理解為訊息的輸入輸出。
編碼API和註解:

Stream訊息生產者

pom依賴

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

yml配置

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      bindings:
        output:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.10.132
                port: 5672
                username: guest
                password: guest

eureka:
  client:
    #是否將自己註冊到Eureka Server 預設為true
    register-with-eureka: true
    #是否從EurekaServer抓取已有的註冊資訊,預設為true,單節點無所謂,叢集必須設定true才能配合ribbon做負載均衡
    fetch-registry: true
    service-url:
      #設定eureka server互動的地址查詢服務和註冊服務都需要依賴這個地址
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: send-8801.com
    prefer-ip-address: true

傳送訊息的介面:

public interface IMessageProvider {
    public String send();
}

傳送訊息的實現:

@EnableBinding(Source.class)//定義訊息的推送管道
public class MessageProvider implements IMessageProvider {

    @Autowired
    private MessageChannel output;

    @Override
    public String send() {
        String serial = IdUtil.simpleUUID();
        System.out.println(serial+"============");
        output.send(MessageBuilder.withPayload(serial).build());
        return serial;
    }
}

controller

@RestController
public class SendController {

    @Autowired
    private IMessageProvider messageProvider;

    @GetMapping("sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

呼叫介面,觀察效果

Stream訊息消費者

配置yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.10.132
                port: 5672
                username: guest
                password: guest

eureka:
  client:
    #是否將自己註冊到Eureka Server 預設為true
    register-with-eureka: true
    #是否從EurekaServer抓取已有的註冊資訊,預設為true,單節點無所謂,叢集必須設定true才能配合ribbon做負載均衡
    fetch-registry: true
    service-url:
      #設定eureka server互動的地址查詢服務和註冊服務都需要依賴這個地址
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: receive-8802.com
    prefer-ip-address: true

接收訊息:

@EnableBinding(Sink.class)
@RestController
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(value = Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("訊息:"+message.getPayload()+"serverPort:"+serverPort);
    }

}

stream重複消費

下預設配置中,多個消費者存在,會存在重複消費問題
原因:預設分組group是不同的,組流水號不一樣,被認為是不同組,可以消費,所以要自定義配置分組。

yml配置:

  cloud:
    stream:
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
          group: wen.jie

通過配置後,兩個消費者被分配到一組,就不存在重複消費的問題。