第八屆中英國際顆粒技術論壇舉行:探討能源、環境、材料、醫療等前沿科技
概念
Spring Cloud Stream 是用於構建訊息驅動的微服務應用程式的框架。Spring Cloud Stream 構建在 Spring Boot 的基礎上,以建立獨立的、生產級的 Spring 應用程式,並使用 Spring Integration 提供與訊息代理的連線,它提供了來自多個供應商的中介軟體配置,引入釋出-訂閱、消費組和分割槽的概念。作用:遮蔽底層訊息中介軟體的差異,降低切換成本,統一訊息的程式設計模型,通過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離。Binder可以生成Binding,Binding用來繫結訊息容器的生產者和消費者,它有兩種型別,INPUT和OUTPUT,INPUT對應於消費者,OUTPUT對應於生產者。目前 Spring Cloud Stream 只支援 RabbitMQ 和 Kafka 的自動化配置。本次以RabbitMQ作為中介軟體
環境
- jdk11
- maven3.8.1
- SpringBoot 2.2.2
- SpringCloud Hoxton.SR1
案例
首先啟動RabbitMQ服務,新建作為生產者發訊息模組cloud-stream-rabbitmq-provider8801
pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
yml
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要繫結的rabbitmq的服務資訊; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 訊息元件型別 environment: # 設定rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain” binder: defaultRabbit # 設定要繫結的訊息服務的具體設定 eureka: client: # 客戶端進行Eureka註冊的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒) instance-id: send-8801.com # 在資訊列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
主啟動類
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class,args); } }
傳送訊息介面
public interface IMessageProvider { public String send() ; }
實現類
import com.atguigu.springcloud.service.IMessageProvider; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.MessageChannel; import org.springframework.integration.support.MessageBuilder; import javax.annotation.Resource; import org.springframework.cloud.stream.messaging.Source; import java.util.UUID; @EnableBinding(Source.class) // 可以理解為是一個訊息的傳送管道的定義 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 訊息的傳送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); this.output.send(MessageBuilder.withPayload(serial).build()); // 建立併發送訊息 System.out.println("***serial: "+serial); return serial; } }
controller
import com.atguigu.springcloud.service.IMessageProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.integration.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.UUID; @RestController public class SendMessageController { @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); } }
啟動eureka服務7001,8801,訪問http://localhost:15672/
訪問http://localhost:8801/sendMessage,測試通過
新建訊息接收模組cloud-stream-rabbitmq-consumer8802
pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基礎配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
yml
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要繫結的rabbitmq的服務資訊; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 訊息元件型別 environment: # 設定rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain” binder: defaultRabbit # 設定要繫結的訊息服務的具體設定 eureka: client: # 客戶端進行Eureka註冊的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒) lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒) instance-id: receive-8802.com # 在資訊列表時顯示主機名稱 prefer-ip-address: true # 訪問的路徑變為IP地址
主啟動類
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class,args); } }
業務類
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
@Slf4j
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
log.info("消費者1號,===============接收到的訊息:" + message.getPayload()+"\t port: "+serverPort);
}
}
測試8801傳送資訊:http://localhost:8801/sendMessage
根據8002,複製一份8003,然後啟動eureka服務7001,訊息生產8801,訊息消費8802、8803
8801傳送資訊:http://localhost:8801/sendMessage,發現有重複消費問題,某種情況下只希望一個訊息消費接受到訊息;訊息持久化問題,假設8802出現問題,過一會在重新啟動,在這期間8801傳送訊息,8802並沒有接收到
解決辦法
微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。不同的組是可以消費的,同一個組內會發生競爭關係,只有其中一個可以消費。
修改訊息消費8802、8803yml
bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain” binder: {defaultRabbit} # 設定要繫結的訊息服務的具體設定 group: fly
這時候8801再次傳送訊息,只會有一個微服務拿到。如果這時候8802或8803重啟服務,在這期間8801傳送訊息,服務重啟後仍會接收到訊息