1. 程式人生 > 資訊 >第八屆中英國際顆粒技術論壇舉行:探討能源、環境、材料、醫療等前沿科技

第八屆中英國際顆粒技術論壇舉行:探討能源、環境、材料、醫療等前沿科技

概念

Spring Cloud Stream 是用於構建訊息驅動的微服務應用程式的框架。Spring Cloud Stream 構建在 Spring Boot 的基礎上,以建立獨立的、生產級的 Spring 應用程式,並使用 Spring Integration 提供與訊息代理的連線,它提供了來自多個供應商的中介軟體配置,引入釋出-訂閱、消費組和分割槽的概念。作用:遮蔽底層訊息中介軟體的差異,降低切換成本,統一訊息的程式設計模型,通過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離。Binder可以生成Binding,Binding用來繫結訊息容器的生產者和消費者,它有兩種型別,INPUT和OUTPUT,INPUT對應於消費者,OUTPUT對應於生產者。目前 Spring Cloud Stream 只支援 RabbitMQ 和 Kafka 的自動化配置。本次以RabbitMQ作為中介軟體

環境

  1. jdk11
  2. maven3.8.1
  3. SpringBoot 2.2.2
  4. SpringCloud Hoxton.SR1

案例

首先啟動RabbitMQ服務,新建作為生產者發訊息模組cloud-stream-rabbitmq-provider8801

pom

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基礎配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true
</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>

yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
      stream:
        binders: # 在此處配置要繫結的rabbitmq的服務資訊;
          defaultRabbit: # 表示定義的名稱,用於於binding整合
            type: rabbit # 訊息元件型別
            environment: # 設定rabbitmq的相關的環境配置
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings: # 服務的整合處理
          output: # 這個名字是一個通道的名稱
            destination: studyExchange # 表示要使用的Exchange名稱定義
            content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
            binder: defaultRabbit # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: send-8801.com  # 在資訊列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址

主啟動類

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
public class StreamMQMain8801
{
    public static void main(String[] args)
    {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

傳送訊息介面

public interface IMessageProvider
{
    public String send() ;
}
 

實現類

import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import javax.annotation.Resource;
import org.springframework.cloud.stream.messaging.Source;

import java.util.UUID;

@EnableBinding(Source.class) // 可以理解為是一個訊息的傳送管道的定義
public class MessageProviderImpl implements IMessageProvider
{
    @Resource
    private MessageChannel output; // 訊息的傳送管道

    @Override
    public String send()
    {
        String serial = UUID.randomUUID().toString();
        this.output.send(MessageBuilder.withPayload(serial).build()); // 建立併發送訊息
        System.out.println("***serial: "+serial);

        return serial;
    }
}

controller

import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

@RestController
public class SendMessageController
{
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

啟動eureka服務7001,8801,訪問http://localhost:15672/

訪問http://localhost:8801/sendMessage,測試通過

新建訊息接收模組cloud-stream-rabbitmq-consumer8802

pom

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--基礎配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
      stream:
        binders: # 在此處配置要繫結的rabbitmq的服務資訊;
          defaultRabbit: # 表示定義的名稱,用於於binding整合
            type: rabbit # 訊息元件型別
            environment: # 設定rabbitmq的相關的環境配置
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings: # 服務的整合處理
          input: # 這個名字是一個通道的名稱
            destination: studyExchange # 表示要使用的Exchange名稱定義
            content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain”
            binder: defaultRabbit # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 客戶端進行Eureka註冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
    instance-id: receive-8802.com  # 在資訊列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址

主啟動類

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8802
{
    public static void main(String[] args)
    {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

業務類

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
@Slf4j
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;

@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
log.info("消費者1號,===============接收到的訊息:" + message.getPayload()+"\t port: "+serverPort);
}
}

測試8801傳送資訊:http://localhost:8801/sendMessage

根據8002,複製一份8003,然後啟動eureka服務7001,訊息生產8801,訊息消費8802、8803

8801傳送資訊:http://localhost:8801/sendMessage,發現有重複消費問題,某種情況下只希望一個訊息消費接受到訊息;訊息持久化問題,假設8802出現問題,過一會在重新啟動,在這期間8801傳送訊息,8802並沒有接收到

解決辦法

微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關係,只有其中一個可以消費。
修改訊息消費8802、8803yml

 bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain”
          binder: {defaultRabbit} # 設定要繫結的訊息服務的具體設定
          group: fly

這時候8801再次傳送訊息,只會有一個微服務拿到。如果這時候8802或8803重啟服務,在這期間8801傳送訊息,服務重啟後仍會接收到訊息